use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use sync::poison::{self, LockResult};
+use sys::time::SteadyTime;
use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex;
use time::Duration;
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
- // Note that this method is *not* public, and this is quite intentional
- // because we're not quite sure about the semantics of relative vs absolute
- // durations or how the timing guarantees play into what the system APIs
- // provide. There are also additional concerns about the unix-specific
- // implementation which may need to be addressed.
- #[allow(dead_code)]
- fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
+ #[unstable]
+ pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
-> LockResult<(MutexGuard<'a, T>, bool)> {
unsafe {
let me: &'static Condvar = &*(self as *const _);
}
}
+ /// Wait on this condition variable for a notification, timing out after a
+ /// specified duration.
+ ///
+ /// The semantics of this function are equivalent to `wait_timeout` except
+ /// that the implementation will repeatedly wait while the duration has not
+ /// passed and the provided function returns `false`.
+ #[unstable]
+ pub fn wait_timeout_with<'a, T, F>(&self,
+ guard: MutexGuard<'a, T>,
+ dur: Duration,
+ f: F)
+ -> LockResult<(MutexGuard<'a, T>, bool)>
+ where F: FnMut(LockResult<&mut T>) -> bool {
+ unsafe {
+ let me: &'static Condvar = &*(self as *const _);
+ me.inner.wait_timeout_with(guard, dur, f)
+ }
+ }
+
/// Wake up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
/// be woken up from its call to `wait` or `wait_timeout`. Calls to
/// `notify_one` are not buffered in any way.
///
- /// To wake up all threads, see `notify_one()`.
+ /// To wake up all threads, see `notify_all()`.
#[stable]
pub fn notify_one(&self) { unsafe { self.inner.inner.notify_one() } }
/// specified duration.
///
/// See `Condvar::wait_timeout`.
- #[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
- fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
- -> LockResult<(MutexGuard<'a, T>, bool)> {
+ #[unstable = "may be merged with Condvar in the future"]
+ pub fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
+ -> LockResult<(MutexGuard<'a, T>, bool)> {
let (poisoned, success) = unsafe {
let lock = mutex::guard_lock(&guard);
self.verify(lock);
}
}
+ /// Wait on this condition variable for a notification, timing out after a
+ /// specified duration.
+ ///
+ /// The implementation will repeatedly wait while the duration has not
+ /// passed and the function returns `false`.
+ ///
+ /// See `Condvar::wait_timeout_with`.
+ #[unstable = "may be merged with Condvar in the future"]
+ pub fn wait_timeout_with<'a, T, F>(&'static self,
+ guard: MutexGuard<'a, T>,
+ dur: Duration,
+ mut f: F)
+ -> LockResult<(MutexGuard<'a, T>, bool)>
+ where F: FnMut(LockResult<&mut T>) -> bool {
+ // This could be made more efficient by pushing the implementation into sys::condvar
+ let start = SteadyTime::now();
+ let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
+ while !f(guard_result
+ .as_mut()
+ .map(|g| &mut **g)
+ .map_err(|e| poison::new_poison_error(&mut **e.get_mut()))) {
+ let now = SteadyTime::now();
+ let consumed = &now - &start;
+ let guard = guard_result.unwrap_or_else(|e| e.into_inner());
+ let (new_guard_result, no_timeout) = match self.wait_timeout(guard, dur - consumed) {
+ Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout),
+ Err(err) => {
+ let (new_guard, no_timeout) = err.into_inner();
+ (Err(poison::new_poison_error(new_guard)), no_timeout)
+ }
+ };
+ guard_result = new_guard_result;
+ if !no_timeout {
+ let result = f(guard_result
+ .as_mut()
+ .map(|g| &mut **g)
+ .map_err(|e| poison::new_poison_error(&mut **e.get_mut())));
+ return poison::map_result(guard_result, |g| (g, result));
+ }
+ }
+
+ poison::map_result(guard_result, |g| (g, true))
+ }
+
/// Wake up one blocked thread on this condvar.
///
/// See `Condvar::notify_one`.
use super::{StaticCondvar, CONDVAR_INIT};
use sync::mpsc::channel;
use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
+ use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use thread::Thread;
use time::Duration;
unsafe { C.destroy(); M.destroy(); }
}
+ #[test]
+ fn wait_timeout_with() {
+ static C: StaticCondvar = CONDVAR_INIT;
+ static M: StaticMutex = MUTEX_INIT;
+ static S: AtomicUsize = ATOMIC_USIZE_INIT;
+
+ let g = M.lock().unwrap();
+ let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap();
+ assert!(!success);
+
+ let (tx, rx) = channel();
+ let _t = Thread::scoped(move || {
+ rx.recv().unwrap();
+ let g = M.lock().unwrap();
+ S.store(1, Ordering::SeqCst);
+ C.notify_one();
+ drop(g);
+
+ rx.recv().unwrap();
+ let g = M.lock().unwrap();
+ S.store(2, Ordering::SeqCst);
+ C.notify_one();
+ drop(g);
+
+ rx.recv().unwrap();
+ let _g = M.lock().unwrap();
+ S.store(3, Ordering::SeqCst);
+ C.notify_one();
+ });
+
+ let mut state = 0;
+ let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| {
+ assert_eq!(state, S.load(Ordering::SeqCst));
+ tx.send(()).unwrap();
+ state += 1;
+ match state {
+ 1|2 => false,
+ _ => true,
+ }
+ }).unwrap();
+ assert!(success);
+ }
+
#[test]
#[should_fail]
fn two_mutexes() {