]> git.lizzy.rs Git - rust.git/blob - library/std/src/sys/unix/thread_parker/pthread.rs
Auto merge of #100539 - joboet:horizon_timeout_clock, r=thomcc
[rust.git] / library / std / src / sys / unix / thread_parker / pthread.rs
1 //! Thread parking without `futex` using the `pthread` synchronization primitives.
2
3 use crate::cell::UnsafeCell;
4 use crate::marker::PhantomPinned;
5 use crate::pin::Pin;
6 use crate::ptr::addr_of_mut;
7 use crate::sync::atomic::AtomicUsize;
8 use crate::sync::atomic::Ordering::SeqCst;
9 use crate::sys::time::TIMESPEC_MAX;
10 use crate::time::Duration;
11
12 const EMPTY: usize = 0;
13 const PARKED: usize = 1;
14 const NOTIFIED: usize = 2;
15
16 unsafe fn lock(lock: *mut libc::pthread_mutex_t) {
17     let r = libc::pthread_mutex_lock(lock);
18     debug_assert_eq!(r, 0);
19 }
20
21 unsafe fn unlock(lock: *mut libc::pthread_mutex_t) {
22     let r = libc::pthread_mutex_unlock(lock);
23     debug_assert_eq!(r, 0);
24 }
25
26 unsafe fn notify_one(cond: *mut libc::pthread_cond_t) {
27     let r = libc::pthread_cond_signal(cond);
28     debug_assert_eq!(r, 0);
29 }
30
31 unsafe fn wait(cond: *mut libc::pthread_cond_t, lock: *mut libc::pthread_mutex_t) {
32     let r = libc::pthread_cond_wait(cond, lock);
33     debug_assert_eq!(r, 0);
34 }
35
36 unsafe fn wait_timeout(
37     cond: *mut libc::pthread_cond_t,
38     lock: *mut libc::pthread_mutex_t,
39     dur: Duration,
40 ) {
41     // Use the system clock on systems that do not support pthread_condattr_setclock.
42     // This unfortunately results in problems when the system time changes.
43     #[cfg(any(
44         target_os = "macos",
45         target_os = "ios",
46         target_os = "watchos",
47         target_os = "espidf",
48         target_os = "horizon",
49     ))]
50     let (now, dur) = {
51         use crate::cmp::min;
52         use crate::sys::time::SystemTime;
53
54         // OSX implementation of `pthread_cond_timedwait` is buggy
55         // with super long durations. When duration is greater than
56         // 0x100_0000_0000_0000 seconds, `pthread_cond_timedwait`
57         // in macOS Sierra return error 316.
58         //
59         // This program demonstrates the issue:
60         // https://gist.github.com/stepancheg/198db4623a20aad2ad7cddb8fda4a63c
61         //
62         // To work around this issue, and possible bugs of other OSes, timeout
63         // is clamped to 1000 years, which is allowable per the API of `park_timeout`
64         // because of spurious wakeups.
65         let dur = min(dur, Duration::from_secs(1000 * 365 * 86400));
66         let now = SystemTime::now().t;
67         (now, dur)
68     };
69     // Use the monotonic clock on other systems.
70     #[cfg(not(any(
71         target_os = "macos",
72         target_os = "ios",
73         target_os = "watchos",
74         target_os = "espidf",
75         target_os = "horizon",
76     )))]
77     let (now, dur) = {
78         use crate::sys::time::Timespec;
79
80         (Timespec::now(libc::CLOCK_MONOTONIC), dur)
81     };
82
83     let timeout =
84         now.checked_add_duration(&dur).and_then(|t| t.to_timespec()).unwrap_or(TIMESPEC_MAX);
85     let r = libc::pthread_cond_timedwait(cond, lock, &timeout);
86     debug_assert!(r == libc::ETIMEDOUT || r == 0);
87 }
88
89 pub struct Parker {
90     state: AtomicUsize,
91     lock: UnsafeCell<libc::pthread_mutex_t>,
92     cvar: UnsafeCell<libc::pthread_cond_t>,
93     // The `pthread` primitives require a stable address, so make this struct `!Unpin`.
94     _pinned: PhantomPinned,
95 }
96
97 impl Parker {
98     /// Construct the UNIX parker in-place.
99     ///
100     /// # Safety
101     /// The constructed parker must never be moved.
102     pub unsafe fn new(parker: *mut Parker) {
103         // Use the default mutex implementation to allow for simpler initialization.
104         // This could lead to undefined behaviour when deadlocking. This is avoided
105         // by not deadlocking. Note in particular the unlocking operation before any
106         // panic, as code after the panic could try to park again.
107         addr_of_mut!((*parker).state).write(AtomicUsize::new(EMPTY));
108         addr_of_mut!((*parker).lock).write(UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER));
109
110         cfg_if::cfg_if! {
111             if #[cfg(any(
112                 target_os = "macos",
113                 target_os = "ios",
114                 target_os = "watchos",
115                 target_os = "l4re",
116                 target_os = "android",
117                 target_os = "redox"
118             ))] {
119                 addr_of_mut!((*parker).cvar).write(UnsafeCell::new(libc::PTHREAD_COND_INITIALIZER));
120             } else if #[cfg(any(target_os = "espidf", target_os = "horizon"))] {
121                 let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), crate::ptr::null());
122                 assert_eq!(r, 0);
123             } else {
124                 use crate::mem::MaybeUninit;
125                 let mut attr = MaybeUninit::<libc::pthread_condattr_t>::uninit();
126                 let r = libc::pthread_condattr_init(attr.as_mut_ptr());
127                 assert_eq!(r, 0);
128                 let r = libc::pthread_condattr_setclock(attr.as_mut_ptr(), libc::CLOCK_MONOTONIC);
129                 assert_eq!(r, 0);
130                 let r = libc::pthread_cond_init(addr_of_mut!((*parker).cvar).cast(), attr.as_ptr());
131                 assert_eq!(r, 0);
132                 let r = libc::pthread_condattr_destroy(attr.as_mut_ptr());
133                 assert_eq!(r, 0);
134             }
135         }
136     }
137
138     // This implementation doesn't require `unsafe`, but other implementations
139     // may assume this is only called by the thread that owns the Parker.
140     pub unsafe fn park(self: Pin<&Self>) {
141         // If we were previously notified then we consume this notification and
142         // return quickly.
143         if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
144             return;
145         }
146
147         // Otherwise we need to coordinate going to sleep
148         lock(self.lock.get());
149         match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
150             Ok(_) => {}
151             Err(NOTIFIED) => {
152                 // We must read here, even though we know it will be `NOTIFIED`.
153                 // This is because `unpark` may have been called again since we read
154                 // `NOTIFIED` in the `compare_exchange` above. We must perform an
155                 // acquire operation that synchronizes with that `unpark` to observe
156                 // any writes it made before the call to unpark. To do that we must
157                 // read from the write it made to `state`.
158                 let old = self.state.swap(EMPTY, SeqCst);
159
160                 unlock(self.lock.get());
161
162                 assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
163                 return;
164             } // should consume this notification, so prohibit spurious wakeups in next park.
165             Err(_) => {
166                 unlock(self.lock.get());
167
168                 panic!("inconsistent park state")
169             }
170         }
171
172         loop {
173             wait(self.cvar.get(), self.lock.get());
174
175             match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
176                 Ok(_) => break, // got a notification
177                 Err(_) => {}    // spurious wakeup, go back to sleep
178             }
179         }
180
181         unlock(self.lock.get());
182     }
183
184     // This implementation doesn't require `unsafe`, but other implementations
185     // may assume this is only called by the thread that owns the Parker. Use
186     // `Pin` to guarantee a stable address for the mutex and condition variable.
187     pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
188         // Like `park` above we have a fast path for an already-notified thread, and
189         // afterwards we start coordinating for a sleep.
190         // return quickly.
191         if self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
192             return;
193         }
194
195         lock(self.lock.get());
196         match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
197             Ok(_) => {}
198             Err(NOTIFIED) => {
199                 // We must read again here, see `park`.
200                 let old = self.state.swap(EMPTY, SeqCst);
201                 unlock(self.lock.get());
202
203                 assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
204                 return;
205             } // should consume this notification, so prohibit spurious wakeups in next park.
206             Err(_) => {
207                 unlock(self.lock.get());
208                 panic!("inconsistent park_timeout state")
209             }
210         }
211
212         // Wait with a timeout, and if we spuriously wake up or otherwise wake up
213         // from a notification we just want to unconditionally set the state back to
214         // empty, either consuming a notification or un-flagging ourselves as
215         // parked.
216         wait_timeout(self.cvar.get(), self.lock.get(), dur);
217
218         match self.state.swap(EMPTY, SeqCst) {
219             NOTIFIED => unlock(self.lock.get()), // got a notification, hurray!
220             PARKED => unlock(self.lock.get()),   // no notification, alas
221             n => {
222                 unlock(self.lock.get());
223                 panic!("inconsistent park_timeout state: {n}")
224             }
225         }
226     }
227
228     pub fn unpark(self: Pin<&Self>) {
229         // To ensure the unparked thread will observe any writes we made
230         // before this call, we must perform a release operation that `park`
231         // can synchronize with. To do that we must write `NOTIFIED` even if
232         // `state` is already `NOTIFIED`. That is why this must be a swap
233         // rather than a compare-and-swap that returns if it reads `NOTIFIED`
234         // on failure.
235         match self.state.swap(NOTIFIED, SeqCst) {
236             EMPTY => return,    // no one was waiting
237             NOTIFIED => return, // already unparked
238             PARKED => {}        // gotta go wake someone up
239             _ => panic!("inconsistent state in unpark"),
240         }
241
242         // There is a period between when the parked thread sets `state` to
243         // `PARKED` (or last checked `state` in the case of a spurious wake
244         // up) and when it actually waits on `cvar`. If we were to notify
245         // during this period it would be ignored and then when the parked
246         // thread went to sleep it would never wake up. Fortunately, it has
247         // `lock` locked at this stage so we can acquire `lock` to wait until
248         // it is ready to receive the notification.
249         //
250         // Releasing `lock` before the call to `notify_one` means that when the
251         // parked thread wakes it doesn't get woken only to have to wait for us
252         // to release `lock`.
253         unsafe {
254             lock(self.lock.get());
255             unlock(self.lock.get());
256             notify_one(self.cvar.get());
257         }
258     }
259 }
260
261 impl Drop for Parker {
262     fn drop(&mut self) {
263         unsafe {
264             libc::pthread_cond_destroy(self.cvar.get_mut());
265             libc::pthread_mutex_destroy(self.lock.get_mut());
266         }
267     }
268 }
269
270 unsafe impl Sync for Parker {}
271 unsafe impl Send for Parker {}