From 8d6edc9f8f71075c55fad3c5ca5a7e28b5e01400 Mon Sep 17 00:00:00 2001
From: Jethro Beekman <jethro@fortanix.com>
Date: Wed, 5 Sep 2018 16:19:35 -0700
Subject: [PATCH] SGX target: implement synchronization primitives and
 threading

---
 src/libstd/io/lazy.rs                   |   1 +
 src/libstd/sys/sgx/abi/mod.rs           |   5 +-
 src/libstd/sys/sgx/abi/usercalls/mod.rs |  12 +
 src/libstd/sys/sgx/alloc.rs             |  15 +-
 src/libstd/sys/sgx/condvar.rs           |  23 +-
 src/libstd/sys/sgx/mod.rs               |   1 +
 src/libstd/sys/sgx/mutex.rs             | 122 ++++--
 src/libstd/sys/sgx/os.rs                |   2 +-
 src/libstd/sys/sgx/rwlock.rs            | 106 +++--
 src/libstd/sys/sgx/thread.rs            |  66 ++-
 src/libstd/sys/sgx/waitqueue.rs         | 552 ++++++++++++++++++++++++
 src/libstd/sys_common/condvar.rs        |   1 +
 src/libstd/sys_common/mutex.rs          |   1 +
 src/libstd/sys_common/rwlock.rs         |   1 +
 14 files changed, 832 insertions(+), 76 deletions(-)
 create mode 100644 src/libstd/sys/sgx/waitqueue.rs

diff --git a/src/libstd/io/lazy.rs b/src/libstd/io/lazy.rs
index 24965ff6931..c2aaeb98907 100644
--- a/src/libstd/io/lazy.rs
+++ b/src/libstd/io/lazy.rs
@@ -26,6 +26,7 @@ const fn done<T>() -> *mut Arc<T> { 1_usize as *mut _ }
 unsafe impl<T> Sync for Lazy<T> {}
 
 impl<T> Lazy<T> {
+    #[unstable(feature = "sys_internals", issue = "0")] // FIXME: min_const_fn
     pub const fn new() -> Lazy<T> {
         Lazy {
             lock: Mutex::new(),
diff --git a/src/libstd/sys/sgx/abi/mod.rs b/src/libstd/sys/sgx/abi/mod.rs
index 61204904962..069cca3b98e 100644
--- a/src/libstd/sys/sgx/abi/mod.rs
+++ b/src/libstd/sys/sgx/abi/mod.rs
@@ -20,7 +20,7 @@ pub mod thread;
 pub mod tls;
 
 #[macro_use]
-mod usercalls;
+pub mod usercalls;
 
 global_asm!(concat!(usercalls_asm!(), include_str!("entry.S")));
 
@@ -59,14 +59,13 @@
 // (main function exists). If this is a library, the crate author should be
 // able to specify this
 #[no_mangle]
-#[allow(unreachable_code)]
 extern "C" fn entry(p1: u64, p2: u64, p3: u64, secondary: bool, p4: u64, p5: u64) -> (u64, u64) {
     // FIXME: how to support TLS in library mode?
     let tls = Box::new(tls::Tls::new());
     let _tls_guard = unsafe { tls.activate() };
 
     if secondary {
-        unimplemented!("thread entrypoint");
+        super::thread::Thread::entry();
 
         (0, 0)
     } else {
diff --git a/src/libstd/sys/sgx/abi/usercalls/mod.rs b/src/libstd/sys/sgx/abi/usercalls/mod.rs
index f7a9c3da3b2..cf422e3e6aa 100644
--- a/src/libstd/sys/sgx/abi/usercalls/mod.rs
+++ b/src/libstd/sys/sgx/abi/usercalls/mod.rs
@@ -16,10 +16,22 @@
 #[macro_use]
 mod raw;
 
+pub fn launch_thread() -> IoResult<()> {
+    unsafe { raw::launch_thread().from_sgx_result() }
+}
+
 pub fn exit(panic: bool) -> ! {
     unsafe { raw::exit(panic) }
 }
 
+pub fn wait(event_mask: u64, timeout: u64) -> IoResult<u64> {
+    unsafe { raw::wait(event_mask, timeout).from_sgx_result() }
+}
+
+pub fn send(event_set: u64, tcs: Option<Tcs>) -> IoResult<()> {
+    unsafe { raw::send(event_set, tcs).from_sgx_result() }
+}
+
 pub fn alloc(size: usize, alignment: usize) -> IoResult<*mut u8> {
     unsafe { raw::alloc(size, alignment).from_sgx_result() }
 }
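The new `launch_thread`, `wait`, and `send` wrappers all follow one pattern: forward to the raw usercall and convert its return convention into an `IoResult` via `from_sgx_result`. The `raw` module is not shown in this patch, so here is a minimal standalone sketch of that conversion pattern; the name `from_raw_result` and the negative-errno convention are illustrative assumptions, not the actual `raw` API:

```rust
use std::io;

// Assumed convention for illustration only: a raw call returns a negative
// errno on failure and a non-negative value on success.
fn from_raw_result(ret: i64) -> io::Result<u64> {
    if ret < 0 {
        Err(io::Error::from_raw_os_error(-ret as i32))
    } else {
        Ok(ret as u64)
    }
}

fn main() {
    // A would-block result, like `wait` with a zero timeout might produce
    // (11 = EAGAIN on Linux, which maps to ErrorKind::WouldBlock):
    let err = from_raw_result(-11).unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
}
```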
diff --git a/src/libstd/sys/sgx/alloc.rs b/src/libstd/sys/sgx/alloc.rs
index a31f93ae493..83c20ace89b 100644
--- a/src/libstd/sys/sgx/alloc.rs
+++ b/src/libstd/sys/sgx/alloc.rs
@@ -12,28 +12,31 @@
 
 use alloc::{GlobalAlloc, Layout, System};
 
-// FIXME: protect this value for concurrent access
-static mut DLMALLOC: dlmalloc::Dlmalloc = dlmalloc::DLMALLOC_INIT;
+use super::waitqueue::SpinMutex;
+
+// Using a SpinMutex because we never want to exit the enclave waiting for the
+// allocator.
+static DLMALLOC: SpinMutex<dlmalloc::Dlmalloc> = SpinMutex::new(dlmalloc::DLMALLOC_INIT);
 
 #[stable(feature = "alloc_system_type", since = "1.28.0")]
 unsafe impl GlobalAlloc for System {
     #[inline]
     unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
-        DLMALLOC.malloc(layout.size(), layout.align())
+        DLMALLOC.lock().malloc(layout.size(), layout.align())
     }
 
     #[inline]
     unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 {
-        DLMALLOC.calloc(layout.size(), layout.align())
+        DLMALLOC.lock().calloc(layout.size(), layout.align())
     }
 
     #[inline]
     unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
-        DLMALLOC.free(ptr, layout.size(), layout.align())
+        DLMALLOC.lock().free(ptr, layout.size(), layout.align())
     }
 
     #[inline]
     unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 {
-        DLMALLOC.realloc(ptr, layout.size(), layout.align(), new_size)
+        DLMALLOC.lock().realloc(ptr, layout.size(), layout.align(), new_size)
     }
 }
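The spinlock is the key choice here: a regular mutex would have to issue a `wait` usercall (an enclave exit) when contended, while a spinlock stays inside the enclave. A self-contained sketch of the same shape — a global allocator whose state sits behind a tiny spinlock — using a toy bump allocator in place of dlmalloc; all names below are illustrative:

```rust
use std::alloc::{GlobalAlloc, Layout};
use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};

struct SpinLock<T> {
    locked: AtomicBool,
    value: UnsafeCell<T>,
}

unsafe impl<T: Send> Sync for SpinLock<T> {}

impl<T> SpinLock<T> {
    const fn new(value: T) -> Self {
        SpinLock { locked: AtomicBool::new(false), value: UnsafeCell::new(value) }
    }

    fn with<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
        // Spin until we win the flag; we never block in the OS.
        while self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            std::hint::spin_loop();
        }
        let r = f(unsafe { &mut *self.value.get() });
        self.locked.store(false, Ordering::Release);
        r
    }
}

// Toy bump allocator standing in for dlmalloc: carves from a fixed arena,
// never frees.
struct Bump {
    arena: [u8; 64 * 1024],
    next: usize,
}

struct BumpAlloc(SpinLock<Bump>);

unsafe impl GlobalAlloc for BumpAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        self.0.with(|b| {
            let base = b.arena.as_mut_ptr() as usize;
            let start = (base + b.next + layout.align() - 1) & !(layout.align() - 1);
            let end = start + layout.size();
            if end > base + b.arena.len() {
                return ptr::null_mut();
            }
            b.next = end - base;
            start as *mut u8
        })
    }

    unsafe fn dealloc(&self, _ptr: *mut u8, _layout: Layout) {
        // A bump allocator cannot free individual allocations.
    }
}

// Mirrors the `static DLMALLOC: SpinMutex<...>` above; a `const fn new`
// makes compile-time initialization of the static possible.
static TOY_HEAP: BumpAlloc =
    BumpAlloc(SpinLock::new(Bump { arena: [0; 64 * 1024], next: 0 }));
```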
diff --git a/src/libstd/sys/sgx/condvar.rs b/src/libstd/sys/sgx/condvar.rs
index 2097280a064..d3e8165f3df 100644
--- a/src/libstd/sys/sgx/condvar.rs
+++ b/src/libstd/sys/sgx/condvar.rs
@@ -11,11 +11,16 @@
 use sys::mutex::Mutex;
 use time::Duration;
 
-pub struct Condvar { }
+use super::waitqueue::{WaitVariable, WaitQueue, SpinMutex};
+
+pub struct Condvar {
+    inner: SpinMutex<WaitVariable<()>>,
+}
 
 impl Condvar {
+    #[unstable(feature = "sgx_internals", issue = "0")] // FIXME: min_const_fn
     pub const fn new() -> Condvar {
-        Condvar { }
+        Condvar { inner: SpinMutex::new(WaitVariable::new(())) }
     }
 
     #[inline]
@@ -23,21 +28,25 @@ pub unsafe fn init(&mut self) {}
 
     #[inline]
     pub unsafe fn notify_one(&self) {
+        let _ = WaitQueue::notify_one(self.inner.lock());
     }
 
     #[inline]
     pub unsafe fn notify_all(&self) {
+        let _ = WaitQueue::notify_all(self.inner.lock());
     }
 
-    pub unsafe fn wait(&self, _mutex: &Mutex) {
-        panic!("can't block with web assembly")
+    pub unsafe fn wait(&self, mutex: &Mutex) {
+        let guard = self.inner.lock();
+        mutex.unlock();
+        WaitQueue::wait(guard);
+        mutex.lock()
     }
 
     pub unsafe fn wait_timeout(&self, _mutex: &Mutex, _dur: Duration) -> bool {
-        panic!("can't block with web assembly");
+        panic!("timeout not supported in SGX");
     }
 
     #[inline]
-    pub unsafe fn destroy(&self) {
-    }
+    pub unsafe fn destroy(&self) {}
 }
diff --git a/src/libstd/sys/sgx/mod.rs b/src/libstd/sys/sgx/mod.rs
index f38c69e90c7..68f7479d7cd 100644
--- a/src/libstd/sys/sgx/mod.rs
+++ b/src/libstd/sys/sgx/mod.rs
@@ -18,6 +18,7 @@
 use sync::atomic::{AtomicBool, Ordering};
 
 pub mod abi;
+mod waitqueue;
 
 pub mod alloc;
 pub mod args;
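The new `Condvar::wait` is the textbook sequence: take the internal spinlock, release the user-visible mutex, block on the wait queue, and reacquire the mutex on wakeup. Because a wakeup can race with the unlock, callers must still re-check their predicate in a loop. A small user-level illustration of that discipline using the public `std::sync::Condvar` (which is ultimately backed by this implementation on SGX):

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let pair = Arc::new((Mutex::new(false), Condvar::new()));
    let pair2 = Arc::clone(&pair);

    thread::spawn(move || {
        let (lock, cvar) = &*pair2;
        *lock.lock().unwrap() = true;
        cvar.notify_one(); // wakes the waiter, which then reacquires the mutex
    });

    let (lock, cvar) = &*pair;
    let mut ready = lock.lock().unwrap();
    // The loop guards against spurious wakeups: the predicate, not the
    // wakeup itself, is authoritative.
    while !*ready {
        ready = cvar.wait(ready).unwrap();
    }
}
```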
diff --git a/src/libstd/sys/sgx/mutex.rs b/src/libstd/sys/sgx/mutex.rs
index ffaa4014e14..663361162bc 100644
--- a/src/libstd/sys/sgx/mutex.rs
+++ b/src/libstd/sys/sgx/mutex.rs
@@ -8,71 +8,145 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use cell::UnsafeCell;
+use fortanix_sgx_abi::Tcs;
+
+use super::abi::thread;
+
+use super::waitqueue::{WaitVariable, WaitQueue, SpinMutex, NotifiedTcs, try_lock_or_false};
 
 pub struct Mutex {
-    locked: UnsafeCell<bool>,
+    inner: SpinMutex<WaitVariable<bool>>,
 }
 
-unsafe impl Send for Mutex {}
-unsafe impl Sync for Mutex {} // FIXME
-
+// Implementation according to “Operating Systems: Three Easy Pieces”, chapter 28
 impl Mutex {
+    #[unstable(feature = "sgx_internals", issue = "0")] // FIXME: min_const_fn
     pub const fn new() -> Mutex {
-        Mutex { locked: UnsafeCell::new(false) }
+        Mutex { inner: SpinMutex::new(WaitVariable::new(false)) }
     }
 
     #[inline]
-    pub unsafe fn init(&mut self) {
-    }
+    pub unsafe fn init(&mut self) {}
 
     #[inline]
     pub unsafe fn lock(&self) {
-        let locked = self.locked.get();
-        assert!(!*locked, "cannot recursively acquire mutex");
-        *locked = true;
+        let mut guard = self.inner.lock();
+        if *guard.lock_var() {
+            // Another thread has the lock, wait
+            WaitQueue::wait(guard)
+            // Another thread has passed the lock to us
+        } else {
+            // We are just now obtaining the lock
+            *guard.lock_var_mut() = true;
+        }
     }
 
     #[inline]
     pub unsafe fn unlock(&self) {
-        *self.locked.get() = false;
+        let guard = self.inner.lock();
+        if let Err(mut guard) = WaitQueue::notify_one(guard) {
+            // No other waiters, unlock
+            *guard.lock_var_mut() = false;
+        } else {
+            // There was a thread waiting, just pass the lock
+        }
     }
 
     #[inline]
     pub unsafe fn try_lock(&self) -> bool {
-        let locked = self.locked.get();
-        if *locked {
+        let mut guard = try_lock_or_false!(self.inner);
+        if *guard.lock_var() {
+            // Another thread has the lock
             false
         } else {
-            *locked = true;
+            // We are just now obtaining the lock
+            *guard.lock_var_mut() = true;
             true
         }
     }
 
     #[inline]
-    pub unsafe fn destroy(&self) {
-    }
+    pub unsafe fn destroy(&self) {}
+}
+
+struct ReentrantLock {
+    owner: Option<Tcs>,
+    count: usize
 }
 
-// FIXME
 pub struct ReentrantMutex {
+    inner: SpinMutex<WaitVariable<ReentrantLock>>,
 }
 
 impl ReentrantMutex {
-    pub unsafe fn uninitialized() -> ReentrantMutex {
-        ReentrantMutex { }
+    #[unstable(feature = "sgx_internals", issue = "0")] // FIXME: min_const_fn
+    pub const fn uninitialized() -> ReentrantMutex {
+        ReentrantMutex {
+            inner: SpinMutex::new(WaitVariable::new(ReentrantLock { owner: None, count: 0 }))
+        }
     }
 
+    #[inline]
     pub unsafe fn init(&mut self) {}
 
-    pub unsafe fn lock(&self) {}
+    #[inline]
+    pub unsafe fn lock(&self) {
+        let mut guard = self.inner.lock();
+        match guard.lock_var().owner {
+            Some(tcs) if tcs != thread::current() => {
+                // Another thread has the lock, wait
+                WaitQueue::wait(guard);
+                // Another thread has passed the lock to us
+            },
+            _ => {
+                // We are just now obtaining the lock
+                guard.lock_var_mut().owner = Some(thread::current());
+                guard.lock_var_mut().count += 1;
+            },
+        }
+    }
 
     #[inline]
-    pub unsafe fn try_lock(&self) -> bool {
-        true
+    pub unsafe fn unlock(&self) {
+        let mut guard = self.inner.lock();
+        if guard.lock_var().count > 1 {
+            guard.lock_var_mut().count -= 1;
+        } else {
+            match WaitQueue::notify_one(guard) {
+                Err(mut guard) => {
+                    // No other waiters, unlock
+                    guard.lock_var_mut().count = 0;
+                    guard.lock_var_mut().owner = None;
+                },
+                Ok(mut guard) => {
+                    // There was a thread waiting, just pass the lock
+                    if let NotifiedTcs::Single(tcs) = guard.notified_tcs() {
+                        guard.lock_var_mut().owner = Some(tcs)
+                    } else {
+                        unreachable!() // called notify_one
+                    }
+                }
+            }
+        }
     }
 
-    pub unsafe fn unlock(&self) {}
+    #[inline]
+    pub unsafe fn try_lock(&self) -> bool {
+        let mut guard = try_lock_or_false!(self.inner);
+        match guard.lock_var().owner {
+            Some(tcs) if tcs != thread::current() => {
+                // Another thread has the lock
+                false
+            },
+            _ => {
+                // We are just now obtaining the lock
+                guard.lock_var_mut().owner = Some(thread::current());
+                guard.lock_var_mut().count += 1;
+                true
+            },
+        }
+    }
 
+    #[inline]
     pub unsafe fn destroy(&self) {}
 }
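Note the asymmetry in `unlock`: when a waiter exists, the lock variable is never set back to `false` — ownership is handed directly to the woken thread, which is why the waiter in `lock` returns without touching the flag. A minimal single-threaded model of that hand-off protocol, with illustrative types standing in for the real `WaitQueue`:

```rust
use std::collections::VecDeque;

struct HandoffMutex {
    locked: bool,
    waiters: VecDeque<u32>, // thread ids standing in for queued TCSes
}

impl HandoffMutex {
    /// Returns true if the lock was acquired; false means the caller
    /// must block until it is woken as the new owner.
    fn lock(&mut self, tid: u32) -> bool {
        if self.locked {
            self.waiters.push_back(tid);
            false
        } else {
            self.locked = true;
            true
        }
    }

    /// Returns the thread to wake, if any. When a waiter is woken,
    /// `locked` intentionally stays true: the lock is passed, not released.
    fn unlock(&mut self) -> Option<u32> {
        match self.waiters.pop_front() {
            next @ Some(_) => next,
            None => {
                self.locked = false;
                None
            }
        }
    }
}

fn main() {
    let mut m = HandoffMutex { locked: false, waiters: VecDeque::new() };
    assert!(m.lock(1));              // thread 1 acquires
    assert!(!m.lock(2));             // thread 2 queues and would block
    assert_eq!(m.unlock(), Some(2)); // lock handed straight to thread 2
    assert!(m.locked);               // still locked: thread 2 owns it now
}
```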
diff --git a/src/libstd/sys/sgx/os.rs b/src/libstd/sys/sgx/os.rs
index 38d82efaf17..cb25338ed46 100644
--- a/src/libstd/sys/sgx/os.rs
+++ b/src/libstd/sys/sgx/os.rs
@@ -92,7 +92,7 @@ pub fn env() -> Env {
 }
 
 pub fn getenv(_k: &OsStr) -> io::Result<Option<OsString>> {
-    unsupported()
+    Ok(None)
 }
 
 pub fn setenv(_k: &OsStr, _v: &OsStr) -> io::Result<()> {
diff --git a/src/libstd/sys/sgx/rwlock.rs b/src/libstd/sys/sgx/rwlock.rs
index 2c0b1a45206..7b6970b825f 100644
--- a/src/libstd/sys/sgx/rwlock.rs
+++ b/src/libstd/sys/sgx/rwlock.rs
@@ -8,75 +8,127 @@
 // option. This file may not be copied, modified, or distributed
 // except according to those terms.
 
-use cell::UnsafeCell;
+use num::NonZeroUsize;
+
+use super::waitqueue::{WaitVariable, WaitQueue, SpinMutex, NotifiedTcs, try_lock_or_false};
 
 pub struct RWLock {
-    mode: UnsafeCell<isize>,
+    readers: SpinMutex<WaitVariable<Option<NonZeroUsize>>>,
+    writer: SpinMutex<WaitVariable<bool>>,
 }
 
-unsafe impl Send for RWLock {}
-unsafe impl Sync for RWLock {} // FIXME
+//unsafe impl Send for RWLock {}
+//unsafe impl Sync for RWLock {} // FIXME
 
 impl RWLock {
+    #[unstable(feature = "sgx_internals", issue = "0")] // FIXME: min_const_fn
     pub const fn new() -> RWLock {
         RWLock {
-            mode: UnsafeCell::new(0),
+            readers: SpinMutex::new(WaitVariable::new(None)),
+            writer: SpinMutex::new(WaitVariable::new(false))
         }
     }
 
     #[inline]
     pub unsafe fn read(&self) {
-        let mode = self.mode.get();
-        if *mode >= 0 {
-            *mode += 1;
+        let mut rguard = self.readers.lock();
+        let wguard = self.writer.lock();
+        if *wguard.lock_var() || !wguard.queue_empty() {
+            // Another thread has or is waiting for the write lock, wait
+            drop(wguard);
+            WaitQueue::wait(rguard);
+            // Another thread has passed the lock to us
         } else {
-            rtabort!("rwlock locked for writing");
+            // No waiting writers, acquire the read lock
+            *rguard.lock_var_mut() =
+                NonZeroUsize::new(rguard.lock_var().map_or(0, |n| n.get()) + 1);
         }
     }
 
     #[inline]
     pub unsafe fn try_read(&self) -> bool {
-        let mode = self.mode.get();
-        if *mode >= 0 {
-            *mode += 1;
-            true
-        } else {
+        let mut rguard = try_lock_or_false!(self.readers);
+        let wguard = try_lock_or_false!(self.writer);
+        if *wguard.lock_var() || !wguard.queue_empty() {
+            // Another thread has or is waiting for the write lock
             false
+        } else {
+            // No waiting writers, acquire the read lock
+            *rguard.lock_var_mut() =
+                NonZeroUsize::new(rguard.lock_var().map_or(0, |n| n.get()) + 1);
+            true
         }
     }
 
     #[inline]
     pub unsafe fn write(&self) {
-        let mode = self.mode.get();
-        if *mode == 0 {
-            *mode = -1;
+        let rguard = self.readers.lock();
+        let mut wguard = self.writer.lock();
+        if *wguard.lock_var() || rguard.lock_var().is_some() {
+            // Another thread has the lock, wait
+            drop(rguard);
+            WaitQueue::wait(wguard);
+            // Another thread has passed the lock to us
         } else {
-            rtabort!("rwlock locked for reading")
+            // We are just now obtaining the lock
+            *wguard.lock_var_mut() = true;
         }
     }
 
     #[inline]
     pub unsafe fn try_write(&self) -> bool {
-        let mode = self.mode.get();
-        if *mode == 0 {
-            *mode = -1;
-            true
-        } else {
+        let rguard = try_lock_or_false!(self.readers);
+        let mut wguard = try_lock_or_false!(self.writer);
+        if *wguard.lock_var() || rguard.lock_var().is_some() {
+            // Another thread has the lock
             false
+        } else {
+            // We are just now obtaining the lock
+            *wguard.lock_var_mut() = true;
+            true
         }
     }
 
     #[inline]
     pub unsafe fn read_unlock(&self) {
-        *self.mode.get() -= 1;
+        let mut rguard = self.readers.lock();
+        let wguard = self.writer.lock();
+        *rguard.lock_var_mut() = NonZeroUsize::new(rguard.lock_var().unwrap().get() - 1);
+        if rguard.lock_var().is_some() {
+            // There are other active readers
+        } else {
+            if let Ok(mut wguard) = WaitQueue::notify_one(wguard) {
+                // A writer was waiting, pass the lock
+                *wguard.lock_var_mut() = true;
+            } else {
+                // No writers were waiting, the lock is released
+                assert!(rguard.queue_empty());
+            }
+        }
     }
 
     #[inline]
     pub unsafe fn write_unlock(&self) {
-        *self.mode.get() += 1;
+        let rguard = self.readers.lock();
+        let wguard = self.writer.lock();
+        if let Err(mut wguard) = WaitQueue::notify_one(wguard) {
+            // No writers waiting, release the write lock
+            *wguard.lock_var_mut() = false;
+            if let Ok(mut rguard) = WaitQueue::notify_all(rguard) {
+                // One or more readers were waiting, pass the lock to them
+                if let NotifiedTcs::All { count } = rguard.notified_tcs() {
+                    *rguard.lock_var_mut() = Some(count)
+                } else {
+                    unreachable!() // called notify_all
+                }
+            } else {
+                // No readers waiting, the lock is released
+            }
+        } else {
+            // There was a thread waiting for write, just pass the lock
+        }
    }
 
     #[inline]
-    pub unsafe fn destroy(&self) {
-    }
+    pub unsafe fn destroy(&self) {}
 }
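Two details of this `RWLock` are worth calling out: the reader count is an `Option<NonZeroUsize>`, so `None` doubles as "no readers", and `read` blocks not only while a writer holds the lock but also while one is queued, giving writers preference. The counting idiom in isolation, as a standalone sketch:

```rust
use std::num::NonZeroUsize;

// `None` means no active readers; `Some(n)` means n active readers.
fn add_reader(readers: &mut Option<NonZeroUsize>) {
    *readers = NonZeroUsize::new(readers.map_or(0, |n| n.get()) + 1);
}

fn remove_reader(readers: &mut Option<NonZeroUsize>) {
    let n = readers.expect("read_unlock called with no readers").get();
    // Becomes `None` again exactly when the last reader leaves.
    *readers = NonZeroUsize::new(n - 1);
}

fn main() {
    let mut readers = None;
    add_reader(&mut readers);
    add_reader(&mut readers);
    assert_eq!(readers.map(|n| n.get()), Some(2));
    remove_reader(&mut readers);
    remove_reader(&mut readers);
    assert_eq!(readers, None); // last reader out; a writer may now be woken
}
```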
diff --git a/src/libstd/sys/sgx/thread.rs b/src/libstd/sys/sgx/thread.rs
index ff8df12302c..9de12a5e6f1 100644
--- a/src/libstd/sys/sgx/thread.rs
+++ b/src/libstd/sys/sgx/thread.rs
@@ -11,35 +11,85 @@
 use boxed::FnBox;
 use ffi::CStr;
 use io;
-use sys::{unsupported, Void};
 use time::Duration;
 
-pub struct Thread(Void);
+use super::abi::usercalls;
+
+pub struct Thread(task_queue::JoinHandle);
 
 pub const DEFAULT_MIN_STACK_SIZE: usize = 4096;
 
+mod task_queue {
+    use sync::{Mutex, MutexGuard, Once};
+    use sync::mpsc;
+    use boxed::FnBox;
+
+    pub type JoinHandle = mpsc::Receiver<()>;
+
+    pub(super) struct Task {
+        p: Box<FnBox()>,
+        done: mpsc::Sender<()>,
+    }
+
+    impl Task {
+        pub(super) fn new(p: Box<FnBox()>) -> (Task, JoinHandle) {
+            let (done, recv) = mpsc::channel();
+            (Task { p, done }, recv)
+        }
+
+        pub(super) fn run(self) {
+            (self.p)();
+            let _ = self.done.send(());
+        }
+    }
+
+    static TASK_QUEUE_INIT: Once = Once::new();
+    static mut TASK_QUEUE: Option<Mutex<Vec<Task>>> = None;
+
+    pub(super) fn lock() -> MutexGuard<'static, Vec<Task>> {
+        unsafe {
+            TASK_QUEUE_INIT.call_once(|| TASK_QUEUE = Some(Default::default()) );
+            TASK_QUEUE.as_ref().unwrap().lock().unwrap()
+        }
+    }
+}
+
 impl Thread {
     // unsafe: see thread::Builder::spawn_unchecked for safety requirements
-    pub unsafe fn new(_stack: usize, _p: Box<FnBox()>)
+    pub unsafe fn new(_stack: usize, p: Box<FnBox()>)
         -> io::Result<Thread>
     {
-        unsupported()
+        let mut queue_lock = task_queue::lock();
+        usercalls::launch_thread()?;
+        let (task, handle) = task_queue::Task::new(p);
+        queue_lock.push(task);
+        Ok(Thread(handle))
+    }
+
+    pub(super) fn entry() {
+        let mut guard = task_queue::lock();
+        let task = guard.pop().expect("Thread started but no tasks pending");
+        drop(guard); // make sure to not hold the task queue lock longer than necessary
+        task.run()
     }
 
     pub fn yield_now() {
-        // do nothing
+        assert_eq!(
+            usercalls::wait(0, usercalls::WAIT_NO).unwrap_err().kind(),
+            io::ErrorKind::WouldBlock
+        );
     }
 
     pub fn set_name(_name: &CStr) {
-        // nope
+        // FIXME: could store this pointer in TLS somewhere
     }
 
     pub fn sleep(_dur: Duration) {
-        panic!("can't sleep");
+        panic!("can't sleep"); // FIXME
     }
 
     pub fn join(self) {
-        match self.0 {}
+        let _ = self.0.recv();
     }
 }
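`launch_thread` can only start a bare entry point in a fresh TCS; it cannot carry the user's closure across the enclave boundary. Hence the rendezvous through a global task queue: `new` pushes the closure plus a completion channel, the fresh thread's `entry` pops and runs one task, and `join` is simply a blocking `recv` on that channel. A standalone model of the same pairing on ordinary `std` threads — names are illustrative, and a modern `const` `Mutex::new` (Rust 1.63+) replaces the `Once` dance above:

```rust
use std::sync::{mpsc, Mutex};
use std::thread;

type Task = Box<dyn FnOnce() + Send>;

// Stand-in for the enclave-global queue guarded by TASK_QUEUE_INIT above.
static TASK_QUEUE: Mutex<Vec<Task>> = Mutex::new(Vec::new());

// Spawner side: enqueue the closure, then start a "bare" thread that knows
// nothing about it (the analogue of the launch_thread usercall).
fn spawn_like(p: Task) -> mpsc::Receiver<()> {
    let (done, join_handle) = mpsc::channel();
    TASK_QUEUE.lock().unwrap().push(Box::new(move || {
        p();
        let _ = done.send(()); // what Task::run's completion send does
    }));
    thread::spawn(entry_like);
    join_handle
}

// New-thread side: the analogue of Thread::entry.
fn entry_like() {
    let task = TASK_QUEUE.lock().unwrap().pop().expect("thread started but no tasks pending");
    // The queue lock (a temporary guard) is already released before the
    // task runs, mirroring the explicit drop(guard) in Thread::entry.
    task();
}

fn main() {
    let handle = spawn_like(Box::new(|| println!("hello from a queued task")));
    handle.recv().unwrap(); // Thread::join: block until the task signals completion
}
```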
diff --git a/src/libstd/sys/sgx/waitqueue.rs b/src/libstd/sys/sgx/waitqueue.rs
new file mode 100644
index 00000000000..ec1135ba30c
--- /dev/null
+++ b/src/libstd/sys/sgx/waitqueue.rs
@@ -0,0 +1,552 @@
+// Copyright 2018 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+/// A simple queue implementation for synchronization primitives.
+///
+/// This queue is used to implement condition variables and mutexes.
+///
+/// Users of this API are expected to use the `WaitVariable<T>` type. Since
+/// that type is not `Sync`, it needs to be protected by e.g. a `SpinMutex` to
+/// allow shared access.
+///
+/// Since userspace may send spurious wake-ups, the wakeup event state is
+/// recorded in the enclave. The wakeup event state is protected by a spinlock.
+/// The queue and associated wait state are stored in a `WaitVariable`.
+
+use ops::{Deref, DerefMut};
+use num::NonZeroUsize;
+
+use fortanix_sgx_abi::{Tcs, EV_UNPARK, WAIT_INDEFINITE};
+use super::abi::usercalls;
+use super::abi::thread;
+
+use self::unsafe_list::{UnsafeList, UnsafeListEntry};
+pub use self::spin_mutex::{SpinMutex, SpinMutexGuard, try_lock_or_false};
+
+/// A queue entry in a `WaitQueue`.
+struct WaitEntry {
+    /// TCS address of the thread that is waiting
+    tcs: Tcs,
+    /// Whether this thread has been notified to be awoken
+    wake: bool
+}
+
+/// Data stored with a `WaitQueue` alongside it. This ensures accesses to the
+/// queue and the data are synchronized, since the type itself is not `Sync`.
+///
+/// Consumers of this API should use a synchronization primitive for shared
+/// access, such as `SpinMutex`.
+#[derive(Default)]
+pub struct WaitVariable<T> {
+    queue: WaitQueue,
+    lock: T
+}
+
+impl<T> WaitVariable<T> {
+    #[unstable(feature = "sgx_internals", issue = "0")] // FIXME: min_const_fn
+    pub const fn new(var: T) -> Self {
+        WaitVariable {
+            queue: WaitQueue::new(),
+            lock: var
+        }
+    }
+
+    pub fn queue_empty(&self) -> bool {
+        self.queue.is_empty()
+    }
+
+    pub fn lock_var(&self) -> &T {
+        &self.lock
+    }
+
+    pub fn lock_var_mut(&mut self) -> &mut T {
+        &mut self.lock
+    }
+}
+
+#[derive(Copy, Clone)]
+pub enum NotifiedTcs {
+    Single(Tcs),
+    All { count: NonZeroUsize }
+}
+
+/// An RAII guard that will notify a set of target threads as well as unlock
+/// a mutex on drop.
+pub struct WaitGuard<'a, T: 'a> {
+    mutex_guard: Option<SpinMutexGuard<'a, WaitVariable<T>>>,
+    notified_tcs: NotifiedTcs
+}
+
+/// A queue of threads that are waiting on some synchronization primitive.
+///
+/// `UnsafeList` entries are allocated on the waiting thread's stack. This
+/// avoids any global locking that might happen in the heap allocator. This is
+/// safe because the waiting thread will not return from that stack frame until
+/// after it is notified. The notifying thread ensures to clean up any
+/// references to the list entries before sending the wakeup event.
+pub struct WaitQueue {
+    // We use an inner Mutex here to protect the data in the face of spurious
+    // wakeups.
+    inner: UnsafeList<SpinMutex<WaitEntry>>,
+}
+unsafe impl Send for WaitQueue {}
+
+impl Default for WaitQueue {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl<'a, T> WaitGuard<'a, T> {
+    /// Returns which TCSes will be notified when this guard drops.
+    pub fn notified_tcs(&self) -> NotifiedTcs {
+        self.notified_tcs
+    }
+}
+
+impl<'a, T> Deref for WaitGuard<'a, T> {
+    type Target = SpinMutexGuard<'a, WaitVariable<T>>;
+
+    fn deref(&self) -> &Self::Target {
+        self.mutex_guard.as_ref().unwrap()
+    }
+}
+
+impl<'a, T> DerefMut for WaitGuard<'a, T> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        self.mutex_guard.as_mut().unwrap()
+    }
+}
+
+impl<'a, T> Drop for WaitGuard<'a, T> {
+    fn drop(&mut self) {
+        drop(self.mutex_guard.take());
+        let target_tcs = match self.notified_tcs {
+            NotifiedTcs::Single(tcs) => Some(tcs),
+            NotifiedTcs::All { .. } => None
+        };
+        usercalls::send(EV_UNPARK, target_tcs).unwrap();
+    }
+}
+
+impl WaitQueue {
+    #[unstable(feature = "sgx_internals", issue = "0")] // FIXME: min_const_fn
+    pub const fn new() -> Self {
+        WaitQueue {
+            inner: UnsafeList::new()
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    /// Add the calling thread to the `WaitVariable`'s wait queue, then wait
+    /// until a wakeup event.
+    ///
+    /// This function does not return until this thread has been awoken.
+    pub fn wait<T>(mut guard: SpinMutexGuard<WaitVariable<T>>) {
+        unsafe {
+            let mut entry = UnsafeListEntry::new(SpinMutex::new(WaitEntry {
+                tcs: thread::current(),
+                wake: false
+            }));
+            let entry = guard.queue.inner.push(&mut entry);
+            drop(guard);
+            while !entry.lock().wake {
+                assert_eq!(
+                    usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap() & EV_UNPARK,
+                    EV_UNPARK
+                );
+            }
+        }
+    }
+
+    /// Either find the next waiter on the wait queue, or return the mutex
+    /// guard unchanged.
+    ///
+    /// If a waiter is found, a `WaitGuard` is returned which will notify the
+    /// waiter when it is dropped.
+    pub fn notify_one<T>(mut guard: SpinMutexGuard<WaitVariable<T>>)
+        -> Result<WaitGuard<T>, SpinMutexGuard<WaitVariable<T>>>
+    {
+        unsafe {
+            if let Some(entry) = guard.queue.inner.pop() {
+                let mut entry_guard = entry.lock();
+                let tcs = entry_guard.tcs;
+                entry_guard.wake = true;
+                drop(entry);
+                Ok(WaitGuard {
+                    mutex_guard: Some(guard),
+                    notified_tcs: NotifiedTcs::Single(tcs)
+                })
+            } else {
+                Err(guard)
+            }
+        }
+    }
+
+    /// Either find any and all waiters on the wait queue, or return the mutex
+    /// guard unchanged.
+    ///
+    /// If at least one waiter is found, a `WaitGuard` is returned which will
+    /// notify all waiters when it is dropped.
+    pub fn notify_all<T>(mut guard: SpinMutexGuard<WaitVariable<T>>)
+        -> Result<WaitGuard<T>, SpinMutexGuard<WaitVariable<T>>>
+    {
+        unsafe {
+            let mut count = 0;
+            while let Some(entry) = guard.queue.inner.pop() {
+                count += 1;
+                let mut entry_guard = entry.lock();
+                entry_guard.wake = true;
+            }
+            if let Some(count) = NonZeroUsize::new(count) {
+                Ok(WaitGuard {
+                    mutex_guard: Some(guard),
+                    notified_tcs: NotifiedTcs::All { count }
+                })
+            } else {
+                Err(guard)
+            }
+        }
+    }
+}
+
+/// A doubly-linked list where callers are in charge of memory allocation
+/// of the nodes in the list.
+mod unsafe_list {
+    use ptr::NonNull;
+    use mem;
+
+    pub struct UnsafeListEntry<T> {
+        next: NonNull<UnsafeListEntry<T>>,
+        prev: NonNull<UnsafeListEntry<T>>,
+        value: Option<T>
+    }
+
+    impl<T> UnsafeListEntry<T> {
+        fn dummy() -> Self {
+            UnsafeListEntry {
+                next: NonNull::dangling(),
+                prev: NonNull::dangling(),
+                value: None
+            }
+        }
+
+        pub fn new(value: T) -> Self {
+            UnsafeListEntry {
+                value: Some(value),
+                ..Self::dummy()
+            }
+        }
+    }
+
+    pub struct UnsafeList<T> {
+        head_tail: NonNull<UnsafeListEntry<T>>,
+        head_tail_entry: Option<UnsafeListEntry<T>>,
+    }
+
+    impl<T> UnsafeList<T> {
+        #[unstable(feature = "sgx_internals", issue = "0")] // FIXME: min_const_fn
+        pub const fn new() -> Self {
+            unsafe {
+                UnsafeList {
+                    head_tail: NonNull::new_unchecked(1 as _),
+                    head_tail_entry: None
+                }
+            }
+        }
+
+        unsafe fn init(&mut self) {
+            if self.head_tail_entry.is_none() {
+                self.head_tail_entry = Some(UnsafeListEntry::dummy());
+                self.head_tail = NonNull::new_unchecked(self.head_tail_entry.as_mut().unwrap());
+                self.head_tail.as_mut().next = self.head_tail;
+                self.head_tail.as_mut().prev = self.head_tail;
+            }
+        }
+
+        pub fn is_empty(&self) -> bool {
+            unsafe {
+                if self.head_tail_entry.is_some() {
+                    let first = self.head_tail.as_ref().next;
+                    if first == self.head_tail {
+                        // ,-------> /---------\ next ---,
+                        // |         |head_tail|         |
+                        // `--- prev \---------/ <-------`
+                        assert_eq!(self.head_tail.as_ref().prev, first);
+                        true
+                    } else {
+                        false
+                    }
+                } else {
+                    true
+                }
+            }
+        }
+
+        /// Pushes an entry onto the back of the list.
+        ///
+        /// # Safety
+        ///
+        /// The entry must remain allocated until the entry is removed from the
+        /// list AND the caller who popped is done using the entry.
+        pub unsafe fn push<'a>(&mut self, entry: &'a mut UnsafeListEntry<T>) -> &'a T {
+            self.init();
+
+            // BEFORE:
+            //     /---------\ next ---> /---------\
+            // ... |prev_tail|           |head_tail| ...
+            //     \---------/ <--- prev \---------/
+            //
+            // AFTER:
+            //     /---------\ next ---> /-----\ next ---> /---------\
+            // ... |prev_tail|           |entry|           |head_tail| ...
+            //     \---------/ <--- prev \-----/ <--- prev \---------/
+            let mut entry = NonNull::new_unchecked(entry);
+            let mut prev_tail = mem::replace(&mut self.head_tail.as_mut().prev, entry);
+            entry.as_mut().prev = prev_tail;
+            entry.as_mut().next = self.head_tail;
+            prev_tail.as_mut().next = entry;
+            (*entry.as_ptr()).value.as_ref().unwrap()
+        }
+
+        /// Pops an entry from the front of the list.
+        ///
+        /// # Safety
+        ///
+        /// The caller must make sure to synchronize ending the borrow of the
+        /// return value and deallocation of the containing entry.
+        pub unsafe fn pop<'a>(&mut self) -> Option<&'a T> {
+            self.init();
+
+            if self.is_empty() {
+                None
+            } else {
+                // BEFORE:
+                //     /---------\ next ---> /-----\ next ---> /------\
+                // ... |head_tail|           |first|           |second| ...
+                //     \---------/ <--- prev \-----/ <--- prev \------/
+                //
+                // AFTER:
+                //     /---------\ next ---> /------\
+                // ... |head_tail|           |second| ...
+                //     \---------/ <--- prev \------/
+                let mut first = self.head_tail.as_mut().next;
+                let mut second = first.as_mut().next;
+                self.head_tail.as_mut().next = second;
+                second.as_mut().prev = self.head_tail;
+                first.as_mut().next = NonNull::dangling();
+                first.as_mut().prev = NonNull::dangling();
+                Some((*first.as_ptr()).value.as_ref().unwrap())
+            }
+        }
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use super::*;
+        use cell::Cell;
+
+        unsafe fn assert_empty<T>(list: &mut UnsafeList<T>) {
+            assert!(list.pop().is_none(), "assertion failed: list is not empty");
+        }
+
+        #[test]
+        fn init_empty() {
+            unsafe {
+                assert_empty(&mut UnsafeList::<i32>::new());
+            }
+        }
+
+        #[test]
+        fn push_pop() {
+            unsafe {
+                let mut node = UnsafeListEntry::new(1234);
+                let mut list = UnsafeList::new();
+                assert_eq!(list.push(&mut node), &1234);
+                assert_eq!(list.pop().unwrap(), &1234);
+                assert_empty(&mut list);
+            }
+        }
+
+        #[test]
+        fn complex_pushes_pops() {
+            unsafe {
+                let mut node1 = UnsafeListEntry::new(1234);
+                let mut node2 = UnsafeListEntry::new(4567);
+                let mut node3 = UnsafeListEntry::new(9999);
+                let mut node4 = UnsafeListEntry::new(8642);
+                let mut list = UnsafeList::new();
+                list.push(&mut node1);
+                list.push(&mut node2);
+                assert_eq!(list.pop().unwrap(), &1234);
+                list.push(&mut node3);
+                assert_eq!(list.pop().unwrap(), &4567);
+                assert_eq!(list.pop().unwrap(), &9999);
+                assert_empty(&mut list);
+                list.push(&mut node4);
+                assert_eq!(list.pop().unwrap(), &8642);
+                assert_empty(&mut list);
+            }
+        }
+
+        #[test]
+        fn cell() {
+            unsafe {
+                let mut node = UnsafeListEntry::new(Cell::new(0));
+                let mut list = UnsafeList::new();
+                let noderef = list.push(&mut node);
+                assert_eq!(noderef.get(), 0);
+                list.pop().unwrap().set(1);
+                assert_empty(&mut list);
+                assert_eq!(noderef.get(), 1);
+            }
+        }
+    }
+}
+
+/// Trivial spinlock-based implementation of `sync::Mutex`.
+// FIXME: Perhaps use Intel TSX to avoid locking?
+mod spin_mutex {
+    use cell::UnsafeCell;
+    use sync::atomic::{AtomicBool, Ordering, spin_loop_hint};
+    use ops::{Deref, DerefMut};
+
+    #[derive(Default)]
+    pub struct SpinMutex<T> {
+        value: UnsafeCell<T>,
+        lock: AtomicBool,
+    }
+
+    unsafe impl<T: Send> Send for SpinMutex<T> {}
+    unsafe impl<T: Send> Sync for SpinMutex<T> {}
+
+    pub struct SpinMutexGuard<'a, T: 'a> {
+        mutex: &'a SpinMutex<T>,
+    }
+
+    impl<'a, T> !Send for SpinMutexGuard<'a, T> {}
+    unsafe impl<'a, T: Sync> Sync for SpinMutexGuard<'a, T> {}
+
+    impl<T> SpinMutex<T> {
+        pub const fn new(value: T) -> Self {
+            SpinMutex {
+                value: UnsafeCell::new(value),
+                lock: AtomicBool::new(false)
+            }
+        }
+
+        #[inline(always)]
+        pub fn lock(&self) -> SpinMutexGuard<T> {
+            loop {
+                match self.try_lock() {
+                    None => while self.lock.load(Ordering::Relaxed) {
+                        spin_loop_hint()
+                    },
+                    Some(guard) => return guard
+                }
+            }
+        }
+
+        #[inline(always)]
+        pub fn try_lock(&self) -> Option<SpinMutexGuard<T>> {
+            if !self.lock.compare_and_swap(false, true, Ordering::Acquire) {
+                Some(SpinMutexGuard {
+                    mutex: self,
+                })
+            } else {
+                None
+            }
+        }
+    }
+
+    pub macro try_lock_or_false {
+        ($e:expr) => {
+            if let Some(v) = $e.try_lock() {
+                v
+            } else {
+                return false
+            }
+        }
+    }
+
+    impl<'a, T> Deref for SpinMutexGuard<'a, T> {
+        type Target = T;
+
+        fn deref(&self) -> &T {
+            unsafe {
+                &*self.mutex.value.get()
+            }
+        }
+    }
+
+    impl<'a, T> DerefMut for SpinMutexGuard<'a, T> {
+        fn deref_mut(&mut self) -> &mut T {
+            unsafe {
+                &mut*self.mutex.value.get()
+            }
+        }
+    }
+
+    impl<'a, T> Drop for SpinMutexGuard<'a, T> {
+        fn drop(&mut self) {
+            self.mutex.lock.store(false, Ordering::Release)
+        }
+    }
+
+    #[cfg(test)]
+    mod tests {
+        #![allow(deprecated)]
+
+        use super::*;
+        use sync::Arc;
+        use thread;
+
+        #[test]
+        fn sleep() {
+            let mutex = Arc::new(SpinMutex::<i32>::default());
+            let mutex2 = mutex.clone();
+            let guard = mutex.lock();
+            let t1 = thread::spawn(move || {
+                *mutex2.lock() = 1;
+            });
+            thread::sleep_ms(50);
+            assert_eq!(*guard, 0);
+            drop(guard);
+            t1.join().unwrap();
+            assert_eq!(*mutex.lock(), 1);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use sync::Arc;
+    use thread;
+
+    #[test]
+    fn queue() {
+        let wq = Arc::new(SpinMutex::<WaitVariable<()>>::default());
+        let wq2 = wq.clone();
+
+        let locked = wq.lock();
+
+        let t1 = thread::spawn(move || {
+            assert!(WaitQueue::notify_one(wq2.lock()).is_ok())
+        });
+
+        WaitQueue::wait(locked);
+
+        t1.join().unwrap();
+    }
+}
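The core protocol of `WaitQueue::wait` above is: publish a stack-allocated entry under the spinlock, block on `wait(EV_UNPARK, WAIT_INDEFINITE)`, and re-check the per-waiter `wake` flag in a loop, because userspace may deliver spurious wakeups. The same flag-plus-loop discipline can be sketched with `std`'s park/unpark, which has the same "wakeups may be spurious" contract:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Analogue of WaitEntry::wake: the flag, not the wakeup, is authoritative.
    let woken = Arc::new(AtomicBool::new(false));
    let woken2 = Arc::clone(&woken);

    let waiter = thread::spawn(move || {
        while !woken2.load(Ordering::Acquire) {
            // park(), like the EV_UNPARK wait usercall, may return
            // spuriously, so it sits inside the re-check loop.
            thread::park();
        }
    });

    // Notifier: set the flag first, then send the wakeup — the same order
    // the WaitGuard enforces by flipping `wake` before its Drop sends
    // EV_UNPARK.
    woken.store(true, Ordering::Release);
    waiter.thread().unpark();
    waiter.join().unwrap();
}
```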
diff --git a/src/libstd/sys_common/condvar.rs b/src/libstd/sys_common/condvar.rs
index b6f29dd5fc3..16bf0803a8d 100644
--- a/src/libstd/sys_common/condvar.rs
+++ b/src/libstd/sys_common/condvar.rs
@@ -25,6 +25,7 @@ impl Condvar {
     ///
     /// Behavior is undefined if the condition variable is moved after it is
     /// first used with any of the functions below.
+    #[unstable(feature = "sys_internals", issue = "0")] // FIXME: min_const_fn
     pub const fn new() -> Condvar { Condvar(imp::Condvar::new()) }
 
     /// Prepares the condition variable for use.
diff --git a/src/libstd/sys_common/mutex.rs b/src/libstd/sys_common/mutex.rs
index c6d531c7a1a..87684237638 100644
--- a/src/libstd/sys_common/mutex.rs
+++ b/src/libstd/sys_common/mutex.rs
@@ -27,6 +27,7 @@ impl Mutex {
     /// Also, until `init` is called, behavior is undefined if this
     /// mutex is ever used reentrantly, i.e., `raw_lock` or `try_lock`
     /// are called by the thread currently holding the lock.
+    #[unstable(feature = "sys_internals", issue = "0")] // FIXME: min_const_fn
     pub const fn new() -> Mutex { Mutex(imp::Mutex::new()) }
 
     /// Prepare the mutex for use.
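All three `sys_common` constructors gain the same `#[unstable]` escape hatch: the `// FIXME: min_const_fn` notes tie them to the then-in-progress `min_const_fn` work, yet the functions must stay `const` because lock statics are built in `static` initializers, where no runtime code can run. The requirement in miniature, as a standalone sketch:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

struct RawLock {
    locked: AtomicBool,
}

impl RawLock {
    // Must be `const fn` so the lock can be stored in a `static`.
    const fn new() -> RawLock {
        RawLock { locked: AtomicBool::new(false) }
    }
}

// Initialized entirely at compile time; no life-before-main code runs.
static GLOBAL_LOCK: RawLock = RawLock::new();

fn main() {
    assert!(!GLOBAL_LOCK.locked.load(Ordering::Relaxed));
}
```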
diff --git a/src/libstd/sys_common/rwlock.rs b/src/libstd/sys_common/rwlock.rs
index 71a4f01ec4c..a430c254d3c 100644
--- a/src/libstd/sys_common/rwlock.rs
+++ b/src/libstd/sys_common/rwlock.rs
@@ -22,6 +22,7 @@ impl RWLock {
     ///
     /// Behavior is undefined if the reader-writer lock is moved after it is
     /// first used with any of the functions below.
+    #[unstable(feature = "sys_internals", issue = "0")] // FIXME: min_const_fn
     pub const fn new() -> RWLock { RWLock(imp::RWLock::new()) }
 
     /// Acquires shared access to the underlying lock, blocking the current
-- 
2.44.0