]> git.lizzy.rs Git - rust.git/commitdiff
std: rewrite SGX thread parker
authorjoboet <jonasboettiger@icloud.com>
Wed, 22 Jun 2022 14:42:49 +0000 (16:42 +0200)
committerjoboet <jonasboettiger@icloud.com>
Wed, 22 Jun 2022 14:42:49 +0000 (16:42 +0200)
library/std/src/sys/sgx/mod.rs
library/std/src/sys/sgx/thread_parker.rs [new file with mode: 0644]
library/std/src/sys_common/thread_parker/mod.rs

index 696400670e04d1f8f543b258602a258191e6158c..65c1d0afe460877a7b388af8e8d87efa106af41a 100644 (file)
@@ -33,6 +33,7 @@
 pub mod stdio;
 pub mod thread;
 pub mod thread_local_key;
+pub mod thread_parker;
 pub mod time;
 
 mod condvar;
diff --git a/library/std/src/sys/sgx/thread_parker.rs b/library/std/src/sys/sgx/thread_parker.rs
new file mode 100644 (file)
index 0000000..f768abd
--- /dev/null
@@ -0,0 +1,93 @@
+//! Thread parking based on SGX events.
+
+use super::abi::{thread, usercalls};
+use crate::io::ErrorKind;
+use crate::pin::Pin;
+use crate::ptr::{self, NonNull};
+use crate::sync::atomic::AtomicPtr;
+use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
+use crate::time::Duration;
+use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE};
+
+const EMPTY: *mut u8 = ptr::invalid_mut(0);
+/// The TCS structure must be page-aligned, so this cannot be a valid pointer
+const NOTIFIED: *mut u8 = ptr::invalid_mut(1);
+
+pub struct Parker {
+    state: AtomicPtr<u8>,
+}
+
+impl Parker {
+    /// Construct the thread parker. The UNIX parker implementation
+    /// requires this to happen in-place.
+    pub unsafe fn new(parker: *mut Parker) {
+        unsafe { parker.write(Parker::new_internal()) }
+    }
+
+    pub(super) fn new_internal() -> Parker {
+        Parker { state: AtomicPtr::new(EMPTY) }
+    }
+
+    // This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
+    pub unsafe fn park(self: Pin<&Self>) {
+        let tcs = thread::current().as_ptr();
+
+        if self.state.load(Acquire) != NOTIFIED {
+            if self.state.compare_exchange(EMPTY, tcs, Acquire, Acquire).is_ok() {
+                // Loop to guard against spurious wakeups.
+                loop {
+                    let event = usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap();
+                    assert!(event & EV_UNPARK == EV_UNPARK);
+                    if self.state.load(Acquire) == NOTIFIED {
+                        break;
+                    }
+                }
+            }
+        }
+
+        // At this point, the token was definately read with acquire ordering,
+        // so this can be a store.
+        self.state.store(EMPTY, Relaxed);
+    }
+
+    // This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
+    pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
+        let timeout = u128::min(dur.as_nanos(), WAIT_INDEFINITE as u128 - 1) as u64;
+        let tcs = thread::current().as_ptr();
+
+        if self.state.load(Acquire) != NOTIFIED {
+            if self.state.compare_exchange(EMPTY, tcs, Acquire, Acquire).is_ok() {
+                match usercalls::wait(EV_UNPARK, timeout) {
+                    Ok(event) => assert!(event & EV_UNPARK == EV_UNPARK),
+                    Err(e) => {
+                        assert!(matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock))
+                    }
+                }
+
+                // Swap to provide acquire ordering even if the timeout occurred
+                // before the token was set. This situation can result in spurious
+                // wakeups on the next call to `park_timeout`, but it is better to let
+                // those be handled by the user than do some perhaps unnecessary, but
+                // always expensive guarding.
+                self.state.swap(EMPTY, Acquire);
+                return;
+            }
+        }
+
+        // The token was already read with `acquire` ordering, this can be a store.
+        self.state.store(EMPTY, Relaxed);
+    }
+
+    // This implementation doesn't require `Pin`, but other implementations do.
+    pub fn unpark(self: Pin<&Self>) {
+        let state = self.state.swap(NOTIFIED, Release);
+
+        if !matches!(state, EMPTY | NOTIFIED) {
+            // There is a thread waiting, wake it up.
+            let tcs = NonNull::new(state).unwrap();
+            // This will fail if the thread has already terminated by the time the signal is send,
+            // but that is OK.
+            let _ = usercalls::send(EV_UNPARK, Some(tcs));
+        }
+    }
+}
index c789a388e05adf77e5992c424a1db6d25813616c..505f26a4001aef007be1a941a660114e77ea4469 100644 (file)
@@ -13,6 +13,8 @@
         pub use crate::sys::thread_parker::Parker;
     } else if #[cfg(target_family = "unix")] {
         pub use crate::sys::thread_parker::Parker;
+    } else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
+        pub use crate::sys::thread_parker::Parker;
     } else {
         mod generic;
         pub use generic::Parker;