]> git.lizzy.rs Git - rust.git/commitdiff
Rewrite Condvar::wait_timeout and make it public
authorSteven Fackler <sfackler@gmail.com>
Wed, 14 Jan 2015 05:24:26 +0000 (21:24 -0800)
committerSteven Fackler <sfackler@gmail.com>
Fri, 16 Jan 2015 17:17:37 +0000 (09:17 -0800)
**The implementation is a direct adaptation of libcxx's
condition_variable implementation.**

pthread_cond_timedwait uses the non-monotonic system clock. It's
possible to change the clock to a monotonic via pthread_cond_attr, but
this is incompatible with static initialization. To deal with this, we
calculate the timeout using the system clock, and maintain a separate
record of the start and end times with a monotonic clock to be used for
calculation of the return value.

src/libstd/sync/condvar.rs
src/libstd/sync/poison.rs
src/libstd/sys/unix/condvar.rs
src/libstd/sys/unix/mod.rs
src/libstd/sys/unix/time.rs [new file with mode: 0644]
src/libstd/sys/windows/mod.rs
src/libstd/sys/windows/time.rs [new file with mode: 0644]
src/libstd/time/mod.rs

index bcd5f56a353792790e6467a5835e0987a03ba63a..b8b186f31e044089afe0e1fdfdb9d394489e757d 100644 (file)
@@ -12,6 +12,7 @@
 
 use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
 use sync::poison::{self, LockResult};
+use sys::time::SteadyTime;
 use sys_common::condvar as sys;
 use sys_common::mutex as sys_mutex;
 use time::Duration;
@@ -153,13 +154,8 @@ pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>)
     ///
     /// Like `wait`, the lock specified will be re-acquired when this function
     /// returns, regardless of whether the timeout elapsed or not.
-    // Note that this method is *not* public, and this is quite intentional
-    // because we're not quite sure about the semantics of relative vs absolute
-    // durations or how the timing guarantees play into what the system APIs
-    // provide. There are also additional concerns about the unix-specific
-    // implementation which may need to be addressed.
-    #[allow(dead_code)]
-    fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
+    #[unstable]
+    pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
                            -> LockResult<(MutexGuard<'a, T>, bool)> {
         unsafe {
             let me: &'static Condvar = &*(self as *const _);
@@ -167,6 +163,25 @@ fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
         }
     }
 
+    /// Wait on this condition variable for a notification, timing out after a
+    /// specified duration.
+    ///
+    /// The semantics of this function are equivalent to `wait_timeout` except
+    /// that the implementation will repeatedly wait while the duration has not
+    /// passed and the provided function returns `false`.
+    #[unstable]
+    pub fn wait_timeout_with<'a, T, F>(&self,
+                                       guard: MutexGuard<'a, T>,
+                                       dur: Duration,
+                                       f: F)
+                                       -> LockResult<(MutexGuard<'a, T>, bool)>
+            where F: FnMut(LockResult<&mut T>) -> bool {
+        unsafe {
+            let me: &'static Condvar = &*(self as *const _);
+            me.inner.wait_timeout_with(guard, dur, f)
+        }
+    }
+
     /// Wake up one blocked thread on this condvar.
     ///
     /// If there is a blocked thread on this condition variable, then it will
@@ -220,9 +235,9 @@ pub fn wait<'a, T>(&'static self, guard: MutexGuard<'a, T>)
     /// specified duration.
     ///
     /// See `Condvar::wait_timeout`.
-    #[allow(dead_code)] // may want to stabilize this later, see wait_timeout above
-    fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
-                           -> LockResult<(MutexGuard<'a, T>, bool)> {
+    #[unstable = "may be merged with Condvar in the future"]
+    pub fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
+                               -> LockResult<(MutexGuard<'a, T>, bool)> {
         let (poisoned, success) = unsafe {
             let lock = mutex::guard_lock(&guard);
             self.verify(lock);
@@ -236,6 +251,50 @@ fn wait_timeout<'a, T>(&'static self, guard: MutexGuard<'a, T>, dur: Duration)
         }
     }
 
+    /// Wait on this condition variable for a notification, timing out after a
+    /// specified duration.
+    ///
+    /// The implementation will repeatedly wait while the duration has not
+    /// passed and the function returns `false`.
+    ///
+    /// See `Condvar::wait_timeout_with`.
+    #[unstable = "may be merged with Condvar in the future"]
+    pub fn wait_timeout_with<'a, T, F>(&'static self,
+                                       guard: MutexGuard<'a, T>,
+                                       dur: Duration,
+                                       mut f: F)
+                                       -> LockResult<(MutexGuard<'a, T>, bool)>
+            where F: FnMut(LockResult<&mut T>) -> bool {
+        // This could be made more efficient by pushing the implementation into sys::condvar
+        let start = SteadyTime::now();
+        let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
+        while !f(guard_result
+                    .as_mut()
+                    .map(|g| &mut **g)
+                    .map_err(|e| poison::new_poison_error(&mut **e.get_mut()))) {
+            let now = SteadyTime::now();
+            let consumed = &now - &start;
+            let guard = guard_result.unwrap_or_else(|e| e.into_inner());
+            let (new_guard_result, no_timeout) = match self.wait_timeout(guard, dur - consumed) {
+                Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout),
+                Err(err) => {
+                    let (new_guard, no_timeout) = err.into_inner();
+                    (Err(poison::new_poison_error(new_guard)), no_timeout)
+                }
+            };
+            guard_result = new_guard_result;
+            if !no_timeout {
+                let result = f(guard_result
+                                    .as_mut()
+                                    .map(|g| &mut **g)
+                                    .map_err(|e| poison::new_poison_error(&mut **e.get_mut())));
+                return poison::map_result(guard_result, |g| (g, result));
+            }
+        }
+
+        poison::map_result(guard_result, |g| (g, true))
+    }
+
     /// Wake up one blocked thread on this condvar.
     ///
     /// See `Condvar::notify_one`.
@@ -285,6 +344,7 @@ mod tests {
     use super::{StaticCondvar, CONDVAR_INIT};
     use sync::mpsc::channel;
     use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
+    use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
     use thread::Thread;
     use time::Duration;
 
@@ -372,6 +432,49 @@ fn wait_timeout() {
         unsafe { C.destroy(); M.destroy(); }
     }
 
+    #[test]
+    fn wait_timeout_with() {
+        static C: StaticCondvar = CONDVAR_INIT;
+        static M: StaticMutex = MUTEX_INIT;
+        static S: AtomicUsize = ATOMIC_USIZE_INIT;
+
+        let g = M.lock().unwrap();
+        let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap();
+        assert!(!success);
+
+        let (tx, rx) = channel();
+        let _t = Thread::scoped(move || {
+            rx.recv().unwrap();
+            let g = M.lock().unwrap();
+            S.store(1, Ordering::SeqCst);
+            C.notify_one();
+            drop(g);
+
+            rx.recv().unwrap();
+            let g = M.lock().unwrap();
+            S.store(2, Ordering::SeqCst);
+            C.notify_one();
+            drop(g);
+
+            rx.recv().unwrap();
+            let _g = M.lock().unwrap();
+            S.store(3, Ordering::SeqCst);
+            C.notify_one();
+        });
+
+        let mut state = 0;
+        let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| {
+            assert_eq!(state, S.load(Ordering::SeqCst));
+            tx.send(()).unwrap();
+            state += 1;
+            match state {
+                1|2 => false,
+                _ => true,
+            }
+        }).unwrap();
+        assert!(success);
+    }
+
     #[test]
     #[should_fail]
     fn two_mutexes() {
index 385df45b400c41a8cebfeab79d47c92ac6c5aed5..cc8c331ef3997c3d2e34467de445907f3050d068 100644 (file)
@@ -99,8 +99,23 @@ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
 impl<T> PoisonError<T> {
     /// Consumes this error indicating that a lock is poisoned, returning the
     /// underlying guard to allow access regardless.
-    #[stable]
+    #[deprecated="renamed to into_inner"]
     pub fn into_guard(self) -> T { self.guard }
+
+    /// Consumes this error indicating that a lock is poisoned, returning the
+    /// underlying guard to allow access regardless.
+    #[unstable]
+    pub fn into_inner(self) -> T { self.guard }
+
+    /// Reaches into this error indicating that a lock is poisoned, returning a
+    /// reference to the underlying guard to allow access regardless.
+    #[unstable]
+    pub fn get_ref(&self) -> &T { &self.guard }
+
+    /// Reaches into this error indicating that a lock is poisoned, returning a
+    /// mutable reference to the underlying guard to allow access regardless.
+    #[unstable]
+    pub fn get_mut(&mut self) -> &mut T { &mut self.guard }
 }
 
 impl<T> FromError<PoisonError<T>> for TryLockError<T> {
index 52dd261824fd43021825913644b8b489a358389c..85a65bbef508d541b92ef25d9d01a7a5e586c649 100644 (file)
 
 use cell::UnsafeCell;
 use libc;
+use std::option::Option::{Some, None};
 use sys::mutex::{self, Mutex};
+use sys::time;
 use sys::sync as ffi;
 use time::Duration;
+use num::{Int, NumCast};
 
 pub struct Condvar { inner: UnsafeCell<ffi::pthread_cond_t> }
 
@@ -46,33 +49,46 @@ pub unsafe fn wait(&self, mutex: &Mutex) {
         debug_assert_eq!(r, 0);
     }
 
+    // This implementation is modeled after libcxx's condition_variable
+    // https://github.com/llvm-mirror/libcxx/blob/release_35/src/condition_variable.cpp#L46
+    // https://github.com/llvm-mirror/libcxx/blob/release_35/include/__mutex_base#L367
     pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool {
-        assert!(dur >= Duration::nanoseconds(0));
+        if dur <= Duration::zero() {
+            return false;
+        }
 
-        // First, figure out what time it currently is
-        let mut tv = libc::timeval { tv_sec: 0, tv_usec: 0 };
-        let r = ffi::gettimeofday(&mut tv, 0 as *mut _);
+        // First, figure out what time it currently is, in both system and stable time.
+        // pthread_cond_timedwait uses system time, but we want to report timeout based on stable
+        // time.
+        let mut sys_now = libc::timeval { tv_sec: 0, tv_usec: 0 };
+        let stable_now = time::SteadyTime::now();
+        let r = ffi::gettimeofday(&mut sys_now, 0 as *mut _);
         debug_assert_eq!(r, 0);
 
-        // Offset that time with the specified duration
-        let abs = Duration::seconds(tv.tv_sec as i64) +
-                  Duration::microseconds(tv.tv_usec as i64) +
-                  dur;
-        let ns = abs.num_nanoseconds().unwrap() as u64;
-        let timeout = libc::timespec {
-            tv_sec: (ns / 1000000000) as libc::time_t,
-            tv_nsec: (ns % 1000000000) as libc::c_long,
+        let seconds = NumCast::from(dur.num_seconds());
+        let timeout = match seconds.and_then(|s| sys_now.tv_sec.checked_add(s)) {
+            Some(sec) => {
+                libc::timespec {
+                    tv_sec: sec,
+                    tv_nsec: (dur - Duration::seconds(dur.num_seconds()))
+                        .num_nanoseconds().unwrap() as libc::c_long,
+                }
+            }
+            None => {
+                libc::timespec {
+                    tv_sec: Int::max_value(),
+                    tv_nsec: 1_000_000_000 - 1,
+                }
+            }
         };
 
         // And wait!
-        let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex),
-                                            &timeout);
-        if r != 0 {
-            debug_assert_eq!(r as int, libc::ETIMEDOUT as int);
-            false
-        } else {
-            true
-        }
+        let r = ffi::pthread_cond_timedwait(self.inner.get(), mutex::raw(mutex), &timeout);
+        debug_assert!(r == libc::ETIMEDOUT || r == 0);
+
+        // ETIMEDOUT is not a totally reliable method of determining timeout due to clock shifts,
+        // so do the check ourselves
+        &time::SteadyTime::now() - &stable_now < dur
     }
 
     #[inline]
index 6a408aa60f0bfb2aa8bbcbbf8fe3a691fb681e0e..bb98d1e052a1c1172e85b60267037c149228ae97 100644 (file)
@@ -52,6 +52,7 @@ macro_rules! helper_init { (static $name:ident: Helper<$m:ty>) => (
 pub mod tcp;
 pub mod thread;
 pub mod thread_local;
+pub mod time;
 pub mod timer;
 pub mod tty;
 pub mod udp;
diff --git a/src/libstd/sys/unix/time.rs b/src/libstd/sys/unix/time.rs
new file mode 100644 (file)
index 0000000..cc1e23f
--- /dev/null
@@ -0,0 +1,124 @@
+// Copyright 2015 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+pub use self::inner::SteadyTime;
+
+#[cfg(any(target_os = "macos", target_os = "ios"))]
+mod inner {
+    use libc;
+    use time::Duration;
+    use ops::Sub;
+    use sync::{Once, ONCE_INIT};
+
+    pub struct SteadyTime {
+        t: u64
+    }
+
+    extern {
+        pub fn mach_absolute_time() -> u64;
+        pub fn mach_timebase_info(info: *mut libc::mach_timebase_info) -> libc::c_int;
+    }
+
+    impl SteadyTime {
+        pub fn now() -> SteadyTime {
+            SteadyTime {
+                t: unsafe { mach_absolute_time() },
+            }
+        }
+
+        pub fn ns(&self) -> u64 {
+            let info = info();
+            self.t * info.numer as u64 / info.denom as u64
+        }
+    }
+
+    fn info() -> &'static libc::mach_timebase_info {
+        static mut INFO: libc::mach_timebase_info = libc::mach_timebase_info {
+            numer: 0,
+            denom: 0,
+        };
+        static ONCE: Once = ONCE_INIT;
+
+        unsafe {
+            ONCE.call_once(|| {
+                mach_timebase_info(&mut INFO);
+            });
+            &INFO
+        }
+    }
+
+    impl<'a> Sub for &'a SteadyTime {
+        type Output = Duration;
+
+        fn sub(self, other: &SteadyTime) -> Duration {
+            unsafe {
+                let info = info();
+                let diff = self.t as i64 - other.t as i64;
+                Duration::nanoseconds(diff * info.numer as i64 / info.denom as i64)
+            }
+        }
+    }
+}
+
+#[cfg(not(any(target_os = "macos", target_os = "ios")))]
+mod inner {
+    use libc;
+    use time::Duration;
+    use ops::Sub;
+
+    const NSEC_PER_SEC: i64 = 1_000_000_000;
+
+    pub struct SteadyTime {
+        t: libc::timespec,
+    }
+
+    // Apparently android provides this in some other library?
+    #[cfg(not(target_os = "android"))]
+    #[link(name = "rt")]
+    extern {}
+
+    extern {
+        fn clock_gettime(clk_id: libc::c_int, tp: *mut libc::timespec) -> libc::c_int;
+    }
+
+    impl SteadyTime {
+        pub fn now() -> SteadyTime {
+            let mut t = SteadyTime {
+                t: libc::timespec {
+                    tv_sec: 0,
+                    tv_nsec: 0,
+                }
+            };
+            unsafe {
+                assert_eq!(0, clock_gettime(libc::CLOCK_MONOTONIC, &mut t.t));
+            }
+            t
+        }
+
+        pub fn ns(&self) -> u64 {
+            self.t.tv_sec as u64 * NSEC_PER_SEC as u64 + self.t.tv_nsec as u64
+        }
+    }
+
+    impl<'a> Sub for &'a SteadyTime {
+        type Output = Duration;
+
+        fn sub(self, other: &SteadyTime) -> Duration {
+            if self.t.tv_nsec >= other.t.tv_nsec {
+                Duration::seconds(self.t.tv_sec as i64 - other.t.tv_sec as i64) +
+                    Duration::nanoseconds(self.t.tv_nsec as i64 - other.t.tv_nsec as i64)
+            } else {
+                Duration::seconds(self.t.tv_sec as i64 - 1 - other.t.tv_sec as i64) +
+                    Duration::nanoseconds(self.t.tv_nsec as i64 + NSEC_PER_SEC -
+                                          other.t.tv_nsec as i64)
+            }
+        }
+    }
+}
index 0e706c3cc6a57881794fd782fbfbaf75463a096a..72fc2f8700d6fc03d36ce8741be914fa13645bd0 100644 (file)
@@ -50,6 +50,7 @@ macro_rules! helper_init { (static $name:ident: Helper<$m:ty>) => (
 pub mod sync;
 pub mod stack_overflow;
 pub mod tcp;
+pub mod time;
 pub mod thread;
 pub mod thread_local;
 pub mod timer;
diff --git a/src/libstd/sys/windows/time.rs b/src/libstd/sys/windows/time.rs
new file mode 100644 (file)
index 0000000..20ceff0
--- /dev/null
@@ -0,0 +1,50 @@
+// Copyright 2015 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+use libc;
+use ops::Sub;
+use time::Duration;
+use sync::{Once, ONCE_INIT};
+
+pub struct SteadyTime {
+    t: libc::LARGE_INTEGER,
+}
+
+impl SteadyTime {
+    pub fn now() -> SteadyTime {
+        let mut t = SteadyTime { t: 0 };
+        unsafe { libc::QueryPerformanceCounter(&mut t.t); }
+        t
+    }
+
+    pub fn ns(&self) -> u64 {
+        self.t as u64 * 1_000_000_000 / frequency() as u64
+    }
+}
+
+fn frequency() -> libc::LARGE_INTEGER {
+    static mut FREQUENCY: libc::LARGE_INTEGER = 0;
+    static ONCE: Once = ONCE_INIT;
+
+    unsafe {
+        ONCE.call_once(|| {
+            libc::QueryPerformanceFrequency(&mut FREQUENCY);
+        });
+        FREQUENCY
+    }
+}
+
+impl<'a> Sub for &'a SteadyTime {
+    type Output = Duration;
+
+    fn sub(self, other: &SteadyTime) -> Duration {
+        let diff = self.t as i64 - other.t as i64;
+        Duration::microseconds(diff * 1_000_000 / frequency() as i64)
+    }
+}
index d6c94f27a8baf9462dea4a451861ad329956d37b..f62571942a78df49cb3440306aeb5746d21d8df3 100644 (file)
@@ -10,7 +10,7 @@
 
 //! Temporal quantification.
 
-use libc;
+use sys::time::SteadyTime;
 
 pub use self::duration::Duration;
 
 /// in nanoseconds since an unspecified epoch.
 // NB: this is intentionally not public, this is not ready to stabilize its api.
 fn precise_time_ns() -> u64 {
-    return os_precise_time_ns();
-
-    #[cfg(windows)]
-    fn os_precise_time_ns() -> u64 {
-        let mut ticks_per_s = 0;
-        assert_eq!(unsafe {
-            libc::QueryPerformanceFrequency(&mut ticks_per_s)
-        }, 1);
-        let ticks_per_s = if ticks_per_s == 0 {1} else {ticks_per_s};
-        let mut ticks = 0;
-        assert_eq!(unsafe {
-            libc::QueryPerformanceCounter(&mut ticks)
-        }, 1);
-
-        return (ticks as u64 * 1000000000) / (ticks_per_s as u64);
-    }
-
-    #[cfg(any(target_os = "macos", target_os = "ios"))]
-    fn os_precise_time_ns() -> u64 {
-        use sync;
-
-        static mut TIMEBASE: libc::mach_timebase_info = libc::mach_timebase_info { numer: 0,
-                                                                                   denom: 0 };
-        static ONCE: sync::Once = sync::ONCE_INIT;
-        unsafe {
-            ONCE.call_once(|| {
-                imp::mach_timebase_info(&mut TIMEBASE);
-            });
-            let time = imp::mach_absolute_time();
-            time * TIMEBASE.numer as u64 / TIMEBASE.denom as u64
-        }
-    }
-
-    #[cfg(not(any(windows, target_os = "macos", target_os = "ios")))]
-    fn os_precise_time_ns() -> u64 {
-        let mut ts = libc::timespec { tv_sec: 0, tv_nsec: 0 };
-        unsafe {
-            imp::clock_gettime(libc::CLOCK_MONOTONIC, &mut ts);
-        }
-        return (ts.tv_sec as u64) * 1000000000 + (ts.tv_nsec as u64)
-    }
-}
-
-#[cfg(all(unix, not(target_os = "macos"), not(target_os = "ios")))]
-mod imp {
-    use libc::{c_int, timespec};
-
-    // Apparently android provides this in some other library?
-    #[cfg(not(target_os = "android"))]
-    #[link(name = "rt")]
-    extern {}
-
-    extern {
-        pub fn clock_gettime(clk_id: c_int, tp: *mut timespec) -> c_int;
-    }
-
-}
-#[cfg(any(target_os = "macos", target_os = "ios"))]
-mod imp {
-    use libc::{c_int, mach_timebase_info};
-
-    extern {
-        pub fn mach_absolute_time() -> u64;
-        pub fn mach_timebase_info(info: *mut mach_timebase_info) -> c_int;
-    }
+    SteadyTime::now().ns()
 }