**The implementation is a direct adaptation of libcxx's condition_variable implementation.**
I also added a wait_timeout_with method, which matches the second overload in C++'s condition_variable. The implementation right now is kind of dumb but it works. There is an outstanding issue with it: as is it doesn't support the use case where a user doesn't care about poisoning and wants to continue through poison.
r? @alexcrichton @aturon
use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use sync::poison::{self, LockResult};
+use sys::time::SteadyTime;
use sys_common::condvar as sys;
use sys_common::mutex as sys_mutex;
use time::Duration;
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
- // Note that this method is *not* public, and this is quite intentional
- // because we're not quite sure about the semantics of relative vs absolute
- // durations or how the timing guarantees play into what the system APIs
- // provide. There are also additional concerns about the unix-specific
- // implementation which may need to be addressed.
- #[allow(dead_code)]
- fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
+ #[unstable]
+ pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
-> LockResult<(MutexGuard<'a, T>, bool)> {
unsafe {
let me: &'static Condvar = &*(self as *const _);
}
}
+ /// Wait on this condition variable for a notification, timing out after a
+ /// specified duration.
+ ///
+ /// The semantics of this function are equivalent to `wait_timeout` except
+ /// that the implementation will repeatedly wait while the duration has not
+ /// passed and the provided function returns `false`.
+ #[unstable]
+ pub fn wait_timeout_with<'a, T, F>(&self,
+ guard: MutexGuard<'a, T>,
+ dur: Duration,
+ f: F)
+ -> LockResult<(MutexGuard<'a, T>, bool)>
+ where F: FnMut(LockResult<&mut T>) -> bool {
+ unsafe {
+ let me: &'static Condvar = &*(self as *const _);
+ me.inner.wait_timeout_with(guard, dur, f)
+ }
+ }
+
/// Wake up one blocked thread on this condvar.
///
/// If there is a blocked thread on this condition variable, then it will
/// specified duration.
///
/// See `Condvar::wait_timeout`.
- #[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
- fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
- -> LockResult<(MutexGuard<'a, T>, bool)> {
+ #[unstable = "may be merged with Condvar in the future"]
+ pub fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
+ -> LockResult<(MutexGuard<'a, T>, bool)> {
let (poisoned, success) = unsafe {
let lock = mutex::guard_lock(&guard);
self.verify(lock);
}
}
+ /// Wait on this condition variable for a notification, timing out after a
+ /// specified duration.
+ ///
+ /// The implementation will repeatedly wait while the duration has not
+ /// passed and the function returns `false`.
+ ///
+ /// See `Condvar::wait_timeout_with`.
+ #[unstable = "may be merged with Condvar in the future"]
+ pub fn wait_timeout_with<'a, T, F>(&'static self,
+ guard: MutexGuard<'a, T>,
+ dur: Duration,
+ mut f: F)
+ -> LockResult<(MutexGuard<'a, T>, bool)>
+ where F: FnMut(LockResult<&mut T>) -> bool {
+ // This could be made more efficient by pushing the implementation into sys::condvar
+ let start = SteadyTime::now();
+ let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
+ while !f(guard_result
+ .as_mut()
+ .map(|g| &mut **g)
+ .map_err(|e| poison::new_poison_error(&mut **e.get_mut()))) {
+ let now = SteadyTime::now();
+ let consumed = &now - &start;
+ let guard = guard_result.unwrap_or_else(|e| e.into_inner());
+ let (new_guard_result, no_timeout) = match self.wait_timeout(guard, dur - consumed) {
+ Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout),
+ Err(err) => {
+ let (new_guard, no_timeout) = err.into_inner();
+ (Err(poison::new_poison_error(new_guard)), no_timeout)
+ }
+ };
+ guard_result = new_guard_result;
+ if !no_timeout {
+ let result = f(guard_result
+ .as_mut()
+ .map(|g| &mut **g)
+ .map_err(|e| poison::new_poison_error(&mut **e.get_mut())));
+ return poison::map_result(guard_result, |g| (g, result));
+ }
+ }
+
+ poison::map_result(guard_result, |g| (g, true))
+ }
+
/// Wake up one blocked thread on this condvar.
///
/// See `Condvar::notify_one`.
use super::{StaticCondvar, CONDVAR_INIT};
use sync::mpsc::channel;
use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
+ use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
use thread::Thread;
use time::Duration;
unsafe { C.destroy(); M.destroy(); }
}
+ #[test]
+ fn wait_timeout_with() {
+ static C: StaticCondvar = CONDVAR_INIT;
+ static M: StaticMutex = MUTEX_INIT;
+ static S: AtomicUsize = ATOMIC_USIZE_INIT;
+
+ let g = M.lock().unwrap();
+ let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap();
+ assert!(!success);
+
+ let (tx, rx) = channel();
+ let _t = Thread::scoped(move || {
+ rx.recv().unwrap();
+ let g = M.lock().unwrap();
+ S.store(1, Ordering::SeqCst);
+ C.notify_one();
+ drop(g);
+
+ rx.recv().unwrap();
+ let g = M.lock().unwrap();
+ S.store(2, Ordering::SeqCst);
+ C.notify_one();
+ drop(g);
+
+ rx.recv().unwrap();
+ let _g = M.lock().unwrap();
+ S.store(3, Ordering::SeqCst);
+ C.notify_one();
+ });
+
+ let mut state = 0;
+ let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| {
+ assert_eq!(state, S.load(Ordering::SeqCst));
+ tx.send(()).unwrap();
+ state += 1;
+ match state {
+ 1|2 => false,
+ _ => true,
+ }
+ }).unwrap();
+ assert!(success);
+ }
+
#[test]
#[should_fail]
fn two_mutexes() {
impl<T> PoisonError<T> {
/// Consumes this error indicating that a lock is poisoned, returning the
/// underlying guard to allow access regardless.
- #[stable]
+ #[deprecated="renamed to into_inner"]
pub fn into_guard(self) -> T { self.guard }
+
+ /// Consumes this error indicating that a lock is poisoned, returning the
+ /// underlying guard to allow access regardless.
+ #[unstable]
+ pub fn into_inner(self) -> T { self.guard }
+
+ /// Reaches into this error indicating that a lock is poisoned, returning a
+ /// reference to the underlying guard to allow access regardless.
+ #[unstable]
+ pub fn get_ref(&self) -> &T { &self.guard }
+
+ /// Reaches into this error indicating that a lock is poisoned, returning a
+ /// mutable reference to the underlying guard to allow access regardless.
+ #[unstable]
+ pub fn get_mut(&mut self) -> &mut T { &mut self.guard }
}
impl<T> FromError<PoisonError<T>> for TryLockError<T> {
use cell::UnsafeCell;
use libc;
+use std::option::Option::{Some, None};
use sys::mutex::{self, Mutex};
+use sys::time;
use sys::sync as ffi;
use time::Duration;
+use num::{Int, NumCast};
pub struct Condvar { inner: UnsafeCell<ffi::pthread_cond_t> }
debug_assert_eq!(r, 0);
}
+ // This implementation is modeled after libcxx's condition_variable
+ // https://github.com/llvm-mirror/libcxx/blob/release_35/src/condition_variable.cpp#L46
+ // https://github.com/llvm-mirror/libcxx/blob/release_35/include/__mutex_base#L367
pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
- assert!(dur >= Duration::nanoseconds(0));
+ if dur <= Duration::zero() {
+ return false;
+ }
- // First, figure out what time it currently is
- let mut tv = libc::timeval { tv_sec: 0, tv_usec: 0 };
- let r = ffi::gettimeofday(&mut tv, 0 as *mut _);
+ // First, figure out what time it currently is, in both system and stable time.
+ // pthread_cond_timedwait uses system time, but we want to report timeout based on stable
+ // time.
+ let mut sys_now = libc::timeval { tv_sec: 0, tv_usec: 0 };
+ let stable_now = time::SteadyTime::now();
+ let r = ffi::gettimeofday(&mut sys_now, 0 as *mut _);
debug_assert_eq!(r, 0);
- // Offset that time with the specified duration
- let abs = Duration::seconds(tv.tv_sec as i64) +
- Duration::microseconds(tv.tv_usec as i64) +
- dur;
- let ns = abs.num_nanoseconds().unwrap() as u64;
- let timeout = libc::timespec {
- tv_sec: (ns / 1000000000) as libc::time_t,
- tv_nsec: (ns % 1000000000) as libc::c_long,
+ let seconds = NumCast::from(dur.num_seconds());
+ let timeout = match seconds.and_then(|s| sys_now.tv_sec.checked_add(s)) {
+ Some(sec) => {
+ libc::timespec {
+ tv_sec: sec,
+ tv_nsec: (dur - Duration::seconds(dur.num_seconds()))
+ .num_nanoseconds().unwrap() as libc::c_long,
+ }
+ }
+ None => {
+ libc::timespec {
+ tv_sec: Int::max_value(),
+ tv_nsec: 1_000_000_000 - 1,
+ }
+ }
};
// And wait!
- let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex),
- &timeout);
- if r != 0 {
- debug_assert_eq!(r as int, libc::ETIMEDOUT as int);
- false
- } else {
- true
- }
+ let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex), &timeout);
+ debug_assert!(r == libc::ETIMEDOUT || r == 0);
+
+ // ETIMEDOUT is not a totally reliable method of determining timeout due to clock shifts,
+ // so do the check ourselves
+ &time::SteadyTime::now() - &stable_now < dur
}
#[inline]
pub mod tcp;
pub mod thread;
pub mod thread_local;
+pub mod time;
pub mod timer;
pub mod tty;
pub mod udp;
--- /dev/null
+// Copyright 2015 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+pub use self::inner::SteadyTime;
+
+#[cfg(any(target_os = "macos", target_os = "ios"))]
+mod inner {
+ use libc;
+ use time::Duration;
+ use ops::Sub;
+ use sync::{Once, ONCE_INIT};
+
+ pub struct SteadyTime {
+ t: u64
+ }
+
+ extern {
+ pub fn mach_absolute_time() -> u64;
+ pub fn mach_timebase_info(info: *mut libc::mach_timebase_info) -> libc::c_int;
+ }
+
+ impl SteadyTime {
+ pub fn now() -> SteadyTime {
+ SteadyTime {
+ t: unsafe { mach_absolute_time() },
+ }
+ }
+
+ pub fn ns(&self) -> u64 {
+ let info = info();
+ self.t * info.numer as u64 / info.denom as u64
+ }
+ }
+
+ fn info() -> &'static libc::mach_timebase_info {
+ static mut INFO: libc::mach_timebase_info = libc::mach_timebase_info {
+ numer: 0,
+ denom: 0,
+ };
+ static ONCE: Once = ONCE_INIT;
+
+ unsafe {
+ ONCE.call_once(|| {
+ mach_timebase_info(&mut INFO);
+ });
+ &INFO
+ }
+ }
+
+ impl<'a> Sub for &'a SteadyTime {
+ type Output = Duration;
+
+ fn sub(self, other: &SteadyTime) -> Duration {
+ unsafe {
+ let info = info();
+ let diff = self.t as i64 - other.t as i64;
+ Duration::nanoseconds(diff * info.numer as i64 / info.denom as i64)
+ }
+ }
+ }
+}
+
+#[cfg(not(any(target_os = "macos", target_os = "ios")))]
+mod inner {
+ use libc;
+ use time::Duration;
+ use ops::Sub;
+
+ const NSEC_PER_SEC: i64 = 1_000_000_000;
+
+ pub struct SteadyTime {
+ t: libc::timespec,
+ }
+
+ // Apparently android provides this in some other library?
+ #[cfg(not(target_os = "android"))]
+ #[link(name = "rt")]
+ extern {}
+
+ extern {
+ fn clock_gettime(clk_id: libc::c_int, tp: *mut libc::timespec) -> libc::c_int;
+ }
+
+ impl SteadyTime {
+ pub fn now() -> SteadyTime {
+ let mut t = SteadyTime {
+ t: libc::timespec {
+ tv_sec: 0,
+ tv_nsec: 0,
+ }
+ };
+ unsafe {
+ assert_eq!(0, clock_gettime(libc::CLOCK_MONOTONIC, &mut t.t));
+ }
+ t
+ }
+
+ pub fn ns(&self) -> u64 {
+ self.t.tv_sec as u64 * NSEC_PER_SEC as u64 + self.t.tv_nsec as u64
+ }
+ }
+
+ impl<'a> Sub for &'a SteadyTime {
+ type Output = Duration;
+
+ fn sub(self, other: &SteadyTime) -> Duration {
+ if self.t.tv_nsec >= other.t.tv_nsec {
+ Duration::seconds(self.t.tv_sec as i64 - other.t.tv_sec as i64) +
+ Duration::nanoseconds(self.t.tv_nsec as i64 - other.t.tv_nsec as i64)
+ } else {
+ Duration::seconds(self.t.tv_sec as i64 - 1 - other.t.tv_sec as i64) +
+ Duration::nanoseconds(self.t.tv_nsec as i64 + NSEC_PER_SEC -
+ other.t.tv_nsec as i64)
+ }
+ }
+ }
+}
pub mod sync;
pub mod stack_overflow;
pub mod tcp;
+pub mod time;
pub mod thread;
pub mod thread_local;
pub mod timer;
--- /dev/null
+// Copyright 2015 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+use libc;
+use ops::Sub;
+use time::Duration;
+use sync::{Once, ONCE_INIT};
+
+pub struct SteadyTime {
+ t: libc::LARGE_INTEGER,
+}
+
+impl SteadyTime {
+ pub fn now() -> SteadyTime {
+ let mut t = SteadyTime { t: 0 };
+ unsafe { libc::QueryPerformanceCounter(&mut t.t); }
+ t
+ }
+
+ pub fn ns(&self) -> u64 {
+ self.t as u64 * 1_000_000_000 / frequency() as u64
+ }
+}
+
+fn frequency() -> libc::LARGE_INTEGER {
+ static mut FREQUENCY: libc::LARGE_INTEGER = 0;
+ static ONCE: Once = ONCE_INIT;
+
+ unsafe {
+ ONCE.call_once(|| {
+ libc::QueryPerformanceFrequency(&mut FREQUENCY);
+ });
+ FREQUENCY
+ }
+}
+
+impl<'a> Sub for &'a SteadyTime {
+ type Output = Duration;
+
+ fn sub(self, other: &SteadyTime) -> Duration {
+ let diff = self.t as i64 - other.t as i64;
+ Duration::microseconds(diff * 1_000_000 / frequency() as i64)
+ }
+}
//! Temporal quantification.
-use libc;
+use sys::time::SteadyTime;
pub use self::duration::Duration;
/// in nanoseconds since an unspecified epoch.
// NB: this is intentionally not public, this is not ready to stabilize its api.
fn precise_time_ns() -> u64 {
- return os_precise_time_ns();
-
- #[cfg(windows)]
- fn os_precise_time_ns() -> u64 {
- let mut ticks_per_s = 0;
- assert_eq!(unsafe {
- libc::QueryPerformanceFrequency(&mut ticks_per_s)
- }, 1);
- let ticks_per_s = if ticks_per_s == 0 {1} else {ticks_per_s};
- let mut ticks = 0;
- assert_eq!(unsafe {
- libc::QueryPerformanceCounter(&mut ticks)
- }, 1);
-
- return (ticks as u64 * 1000000000) / (ticks_per_s as u64);
- }
-
- #[cfg(any(target_os = "macos", target_os = "ios"))]
- fn os_precise_time_ns() -> u64 {
- use sync;
-
- static mut TIMEBASE: libc::mach_timebase_info = libc::mach_timebase_info { numer: 0,
- denom: 0 };
- static ONCE: sync::Once = sync::ONCE_INIT;
- unsafe {
- ONCE.call_once(|| {
- imp::mach_timebase_info(&mut TIMEBASE);
- });
- let time = imp::mach_absolute_time();
- time * TIMEBASE.numer as u64 / TIMEBASE.denom as u64
- }
- }
-
- #[cfg(not(any(windows, target_os = "macos", target_os = "ios")))]
- fn os_precise_time_ns() -> u64 {
- let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 };
- unsafe {
- imp::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
- }
- return (ts.tv_sec as u64) * 1000000000 + (ts.tv_nsec as u64)
- }
-}
-
-#[cfg(all(unix, not(target_os = "macos"), not(target_os = "ios")))]
-mod imp {
- use libc::{c_int, timespec};
-
- // Apparently android provides this in some other library?
- #[cfg(not(target_os = "android"))]
- #[link(name = "rt")]
- extern {}
-
- extern {
- pub fn clock_gettime(clk_id: c_int, tp: *mut timespec) -> c_int;
- }
-
-}
-#[cfg(any(target_os = "macos", target_os = "ios"))]
-mod imp {
- use libc::{c_int, mach_timebase_info};
-
- extern {
- pub fn mach_absolute_time() -> u64;
- pub fn mach_timebase_info(info: *mut mach_timebase_info) -> c_int;
- }
+ SteadyTime::now().ns()
}