]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/condvar.rs
Stabilize basic timeout functionality
[rust.git] / src / libstd / sync / condvar.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12
13 use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
14 use sync::poison::{self, LockResult};
15 use sys::time::SteadyTime;
16 use sys_common::condvar as sys;
17 use sys_common::mutex as sys_mutex;
18 use time::Duration;
19 use sync::{mutex, MutexGuard, PoisonError};
20
21 /// A Condition Variable
22 ///
23 /// Condition variables represent the ability to block a thread such that it
24 /// consumes no CPU time while waiting for an event to occur. Condition
25 /// variables are typically associated with a boolean predicate (a condition)
26 /// and a mutex. The predicate is always verified inside of the mutex before
27 /// determining that thread must block.
28 ///
29 /// Functions in this module will block the current **thread** of execution and
30 /// are bindings to system-provided condition variables where possible. Note
31 /// that this module places one additional restriction over the system condition
32 /// variables: each condvar can be used with precisely one mutex at runtime. Any
33 /// attempt to use multiple mutexes on the same condition variable will result
34 /// in a runtime panic. If this is not desired, then the unsafe primitives in
35 /// `sys` do not have this restriction but may result in undefined behavior.
36 ///
37 /// # Examples
38 ///
39 /// ```
40 /// use std::sync::{Arc, Mutex, Condvar};
41 /// use std::thread;
42 ///
43 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
44 /// let pair2 = pair.clone();
45 ///
46 /// // Inside of our lock, spawn a new thread, and then wait for it to start
47 /// thread::spawn(move|| {
48 ///     let &(ref lock, ref cvar) = &*pair2;
49 ///     let mut started = lock.lock().unwrap();
50 ///     *started = true;
51 ///     cvar.notify_one();
52 /// });
53 ///
54 /// // wait for the thread to start up
55 /// let &(ref lock, ref cvar) = &*pair;
56 /// let mut started = lock.lock().unwrap();
57 /// while !*started {
58 ///     started = cvar.wait(started).unwrap();
59 /// }
60 /// ```
61 #[stable(feature = "rust1", since = "1.0.0")]
62 pub struct Condvar { inner: Box<StaticCondvar> }
63
64 /// Statically allocated condition variables.
65 ///
66 /// This structure is identical to `Condvar` except that it is suitable for use
67 /// in static initializers for other structures.
68 ///
69 /// # Examples
70 ///
71 /// ```
72 /// # #![feature(std_misc)]
73 /// use std::sync::{StaticCondvar, CONDVAR_INIT};
74 ///
75 /// static CVAR: StaticCondvar = CONDVAR_INIT;
76 /// ```
77 #[unstable(feature = "std_misc",
78            reason = "may be merged with Condvar in the future")]
79 pub struct StaticCondvar {
80     inner: sys::Condvar,
81     mutex: AtomicUsize,
82 }
83
84 /// Constant initializer for a statically allocated condition variable.
85 #[unstable(feature = "std_misc",
86            reason = "may be merged with Condvar in the future")]
87 pub const CONDVAR_INIT: StaticCondvar = StaticCondvar {
88     inner: sys::CONDVAR_INIT,
89     mutex: ATOMIC_USIZE_INIT,
90 };
91
92 impl Condvar {
93     /// Creates a new condition variable which is ready to be waited on and
94     /// notified.
95     #[stable(feature = "rust1", since = "1.0.0")]
96     pub fn new() -> Condvar {
97         Condvar {
98             inner: box StaticCondvar {
99                 inner: unsafe { sys::Condvar::new() },
100                 mutex: AtomicUsize::new(0),
101             }
102         }
103     }
104
105     /// Block the current thread until this condition variable receives a
106     /// notification.
107     ///
108     /// This function will atomically unlock the mutex specified (represented by
109     /// `mutex_guard`) and block the current thread. This means that any calls
110     /// to `notify_*()` which happen logically after the mutex is unlocked are
111     /// candidates to wake this thread up. When this function call returns, the
112     /// lock specified will have been re-acquired.
113     ///
114     /// Note that this function is susceptible to spurious wakeups. Condition
115     /// variables normally have a boolean predicate associated with them, and
116     /// the predicate must always be checked each time this function returns to
117     /// protect against spurious wakeups.
118     ///
119     /// # Failure
120     ///
121     /// This function will return an error if the mutex being waited on is
122     /// poisoned when this thread re-acquires the lock. For more information,
123     /// see information about poisoning on the Mutex type.
124     ///
125     /// # Panics
126     ///
127     /// This function will `panic!()` if it is used with more than one mutex
128     /// over time. Each condition variable is dynamically bound to exactly one
129     /// mutex to ensure defined behavior across platforms. If this functionality
130     /// is not desired, then unsafe primitives in `sys` are provided.
131     #[stable(feature = "rust1", since = "1.0.0")]
132     pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>)
133                        -> LockResult<MutexGuard<'a, T>> {
134         unsafe {
135             let me: &'static Condvar = &*(self as *const _);
136             me.inner.wait(guard)
137         }
138     }
139
140     /// Wait on this condition variable for a notification, timing out after a
141     /// specified duration.
142     ///
143     /// The semantics of this function are equivalent to `wait()`
144     /// except that the thread will be blocked for roughly no longer
145     /// than `ms` milliseconds. This method should not be used for
146     /// precise timing due to anomalies such as preemption or platform
147     /// differences that may not cause the maximum amount of time
148     /// waited to be precisely `ms`.
149     ///
150     /// The returned boolean is `false` only if the timeout is known
151     /// to have elapsed.
152     ///
153     /// Like `wait`, the lock specified will be re-acquired when this function
154     /// returns, regardless of whether the timeout elapsed or not.
155     #[stable(feature = "rust1", since = "1.0.0")]
156     pub fn wait_timeout_ms<'a, T>(&self, guard: MutexGuard<'a, T>, ms: u32)
157                                   -> LockResult<(MutexGuard<'a, T>, bool)> {
158         unsafe {
159             let me: &'static Condvar = &*(self as *const _);
160             me.inner.wait_timeout_ms(guard, ms)
161         }
162     }
163
164     /// Deprecated: use `wait_timeout_ms` instead.
165     #[unstable(feature = "std_misc")]
166     #[deprecated(since = "1.0.0", reason = "use wait_timeout_ms instead")]
167     pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>, dur: Duration)
168                                -> LockResult<(MutexGuard<'a, T>, bool)> {
169         self.wait_timeout_ms(guard, dur.num_milliseconds() as u32)
170     }
171
172     /// Wait on this condition variable for a notification, timing out after a
173     /// specified duration.
174     ///
175     /// The semantics of this function are equivalent to `wait_timeout` except
176     /// that the implementation will repeatedly wait while the duration has not
177     /// passed and the provided function returns `false`.
178     #[unstable(feature = "wait_timeout_with",
179                reason = "unsure if this API is broadly needed or what form it should take")]
180     pub fn wait_timeout_with<'a, T, F>(&self,
181                                        guard: MutexGuard<'a, T>,
182                                        dur: Duration,
183                                        f: F)
184                                        -> LockResult<(MutexGuard<'a, T>, bool)>
185             where F: FnMut(LockResult<&mut T>) -> bool {
186         unsafe {
187             let me: &'static Condvar = &*(self as *const _);
188             me.inner.wait_timeout_with(guard, dur, f)
189         }
190     }
191
192     /// Wake up one blocked thread on this condvar.
193     ///
194     /// If there is a blocked thread on this condition variable, then it will
195     /// be woken up from its call to `wait` or `wait_timeout`. Calls to
196     /// `notify_one` are not buffered in any way.
197     ///
198     /// To wake up all threads, see `notify_all()`.
199     #[stable(feature = "rust1", since = "1.0.0")]
200     pub fn notify_one(&self) { unsafe { self.inner.inner.notify_one() } }
201
202     /// Wake up all blocked threads on this condvar.
203     ///
204     /// This method will ensure that any current waiters on the condition
205     /// variable are awoken. Calls to `notify_all()` are not buffered in any
206     /// way.
207     ///
208     /// To wake up only one thread, see `notify_one()`.
209     #[stable(feature = "rust1", since = "1.0.0")]
210     pub fn notify_all(&self) { unsafe { self.inner.inner.notify_all() } }
211 }
212
213 #[stable(feature = "rust1", since = "1.0.0")]
214 impl Drop for Condvar {
215     fn drop(&mut self) {
216         unsafe { self.inner.inner.destroy() }
217     }
218 }
219
220 impl StaticCondvar {
221     /// Block the current thread until this condition variable receives a
222     /// notification.
223     ///
224     /// See `Condvar::wait`.
225     #[unstable(feature = "std_misc",
226                reason = "may be merged with Condvar in the future")]
227     pub fn wait<'a, T>(&'static self, guard: MutexGuard<'a, T>)
228                        -> LockResult<MutexGuard<'a, T>> {
229         let poisoned = unsafe {
230             let lock = mutex::guard_lock(&guard);
231             self.verify(lock);
232             self.inner.wait(lock);
233             mutex::guard_poison(&guard).get()
234         };
235         if poisoned {
236             Err(PoisonError::new(guard))
237         } else {
238             Ok(guard)
239         }
240     }
241
242     /// Wait on this condition variable for a notification, timing out after a
243     /// specified duration.
244     ///
245     /// See `Condvar::wait_timeout`.
246     #[unstable(feature = "std_misc",
247                reason = "may be merged with Condvar in the future")]
248     pub fn wait_timeout_ms<'a, T>(&'static self, guard: MutexGuard<'a, T>, ms: u32)
249                                   -> LockResult<(MutexGuard<'a, T>, bool)> {
250         let (poisoned, success) = unsafe {
251             let lock = mutex::guard_lock(&guard);
252             self.verify(lock);
253             let success = self.inner.wait_timeout(lock, Duration::milliseconds(ms as i64));
254             (mutex::guard_poison(&guard).get(), success)
255         };
256         if poisoned {
257             Err(PoisonError::new((guard, success)))
258         } else {
259             Ok((guard, success))
260         }
261     }
262
263     /// Wait on this condition variable for a notification, timing out after a
264     /// specified duration.
265     ///
266     /// The implementation will repeatedly wait while the duration has not
267     /// passed and the function returns `false`.
268     ///
269     /// See `Condvar::wait_timeout_with`.
270     #[unstable(feature = "std_misc",
271                reason = "may be merged with Condvar in the future")]
272     pub fn wait_timeout_with<'a, T, F>(&'static self,
273                                        guard: MutexGuard<'a, T>,
274                                        dur: Duration,
275                                        mut f: F)
276                                        -> LockResult<(MutexGuard<'a, T>, bool)>
277             where F: FnMut(LockResult<&mut T>) -> bool {
278         // This could be made more efficient by pushing the implementation into sys::condvar
279         let start = SteadyTime::now();
280         let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
281         while !f(guard_result
282                     .as_mut()
283                     .map(|g| &mut **g)
284                     .map_err(|e| PoisonError::new(&mut **e.get_mut()))) {
285             let now = SteadyTime::now();
286             let consumed = &now - &start;
287             let guard = guard_result.unwrap_or_else(|e| e.into_inner());
288             let res = self.wait_timeout_ms(guard, (dur - consumed).num_milliseconds() as u32);
289             let (new_guard_result, no_timeout) = match res {
290                 Ok((new_guard, no_timeout)) => (Ok(new_guard), no_timeout),
291                 Err(err) => {
292                     let (new_guard, no_timeout) = err.into_inner();
293                     (Err(PoisonError::new(new_guard)), no_timeout)
294                 }
295             };
296             guard_result = new_guard_result;
297             if !no_timeout {
298                 let result = f(guard_result
299                                     .as_mut()
300                                     .map(|g| &mut **g)
301                                     .map_err(|e| PoisonError::new(&mut **e.get_mut())));
302                 return poison::map_result(guard_result, |g| (g, result));
303             }
304         }
305
306         poison::map_result(guard_result, |g| (g, true))
307     }
308
309     /// Wake up one blocked thread on this condvar.
310     ///
311     /// See `Condvar::notify_one`.
312     #[unstable(feature = "std_misc",
313                reason = "may be merged with Condvar in the future")]
314     pub fn notify_one(&'static self) { unsafe { self.inner.notify_one() } }
315
316     /// Wake up all blocked threads on this condvar.
317     ///
318     /// See `Condvar::notify_all`.
319     #[unstable(feature = "std_misc",
320                reason = "may be merged with Condvar in the future")]
321     pub fn notify_all(&'static self) { unsafe { self.inner.notify_all() } }
322
323     /// Deallocate all resources associated with this static condvar.
324     ///
325     /// This method is unsafe to call as there is no guarantee that there are no
326     /// active users of the condvar, and this also doesn't prevent any future
327     /// users of the condvar. This method is required to be called to not leak
328     /// memory on all platforms.
329     #[unstable(feature = "std_misc",
330                reason = "may be merged with Condvar in the future")]
331     pub unsafe fn destroy(&'static self) {
332         self.inner.destroy()
333     }
334
335     fn verify(&self, mutex: &sys_mutex::Mutex) {
336         let addr = mutex as *const _ as usize;
337         match self.mutex.compare_and_swap(0, addr, Ordering::SeqCst) {
338             // If we got out 0, then we have successfully bound the mutex to
339             // this cvar.
340             0 => {}
341
342             // If we get out a value that's the same as `addr`, then someone
343             // already beat us to the punch.
344             n if n == addr => {}
345
346             // Anything else and we're using more than one mutex on this cvar,
347             // which is currently disallowed.
348             _ => panic!("attempted to use a condition variable with two \
349                          mutexes"),
350         }
351     }
352 }
353
354 #[cfg(test)]
355 mod tests {
356     use prelude::v1::*;
357
358     use super::{StaticCondvar, CONDVAR_INIT};
359     use sync::mpsc::channel;
360     use sync::{StaticMutex, MUTEX_INIT, Condvar, Mutex, Arc};
361     use sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
362     use thread;
363     use time::Duration;
364     use u32;
365
366     #[test]
367     fn smoke() {
368         let c = Condvar::new();
369         c.notify_one();
370         c.notify_all();
371     }
372
373     #[test]
374     fn static_smoke() {
375         static C: StaticCondvar = CONDVAR_INIT;
376         C.notify_one();
377         C.notify_all();
378         unsafe { C.destroy(); }
379     }
380
381     #[test]
382     fn notify_one() {
383         static C: StaticCondvar = CONDVAR_INIT;
384         static M: StaticMutex = MUTEX_INIT;
385
386         let g = M.lock().unwrap();
387         let _t = thread::spawn(move|| {
388             let _g = M.lock().unwrap();
389             C.notify_one();
390         });
391         let g = C.wait(g).unwrap();
392         drop(g);
393         unsafe { C.destroy(); M.destroy(); }
394     }
395
396     #[test]
397     fn notify_all() {
398         const N: usize = 10;
399
400         let data = Arc::new((Mutex::new(0), Condvar::new()));
401         let (tx, rx) = channel();
402         for _ in 0..N {
403             let data = data.clone();
404             let tx = tx.clone();
405             thread::spawn(move|| {
406                 let &(ref lock, ref cond) = &*data;
407                 let mut cnt = lock.lock().unwrap();
408                 *cnt += 1;
409                 if *cnt == N {
410                     tx.send(()).unwrap();
411                 }
412                 while *cnt != 0 {
413                     cnt = cond.wait(cnt).unwrap();
414                 }
415                 tx.send(()).unwrap();
416             });
417         }
418         drop(tx);
419
420         let &(ref lock, ref cond) = &*data;
421         rx.recv().unwrap();
422         let mut cnt = lock.lock().unwrap();
423         *cnt = 0;
424         cond.notify_all();
425         drop(cnt);
426
427         for _ in 0..N {
428             rx.recv().unwrap();
429         }
430     }
431
432     #[test]
433     fn wait_timeout_ms() {
434         static C: StaticCondvar = CONDVAR_INIT;
435         static M: StaticMutex = MUTEX_INIT;
436
437         let g = M.lock().unwrap();
438         let (g, _no_timeout) = C.wait_timeout_ms(g, 1).unwrap();
439         // spurious wakeups mean this isn't necessarily true
440         // assert!(!no_timeout);
441         let _t = thread::spawn(move || {
442             let _g = M.lock().unwrap();
443             C.notify_one();
444         });
445         let (g, no_timeout) = C.wait_timeout_ms(g, u32::MAX).unwrap();
446         assert!(no_timeout);
447         drop(g);
448         unsafe { C.destroy(); M.destroy(); }
449     }
450
451     #[test]
452     fn wait_timeout_with() {
453         static C: StaticCondvar = CONDVAR_INIT;
454         static M: StaticMutex = MUTEX_INIT;
455         static S: AtomicUsize = ATOMIC_USIZE_INIT;
456
457         let g = M.lock().unwrap();
458         let (g, success) = C.wait_timeout_with(g, Duration::nanoseconds(1000), |_| false).unwrap();
459         assert!(!success);
460
461         let (tx, rx) = channel();
462         let _t = thread::spawn(move || {
463             rx.recv().unwrap();
464             let g = M.lock().unwrap();
465             S.store(1, Ordering::SeqCst);
466             C.notify_one();
467             drop(g);
468
469             rx.recv().unwrap();
470             let g = M.lock().unwrap();
471             S.store(2, Ordering::SeqCst);
472             C.notify_one();
473             drop(g);
474
475             rx.recv().unwrap();
476             let _g = M.lock().unwrap();
477             S.store(3, Ordering::SeqCst);
478             C.notify_one();
479         });
480
481         let mut state = 0;
482         let (_g, success) = C.wait_timeout_with(g, Duration::days(1), |_| {
483             assert_eq!(state, S.load(Ordering::SeqCst));
484             tx.send(()).unwrap();
485             state += 1;
486             match state {
487                 1|2 => false,
488                 _ => true,
489             }
490         }).unwrap();
491         assert!(success);
492     }
493
494     #[test]
495     #[should_panic]
496     fn two_mutexes() {
497         static M1: StaticMutex = MUTEX_INIT;
498         static M2: StaticMutex = MUTEX_INIT;
499         static C: StaticCondvar = CONDVAR_INIT;
500
501         let mut g = M1.lock().unwrap();
502         let _t = thread::spawn(move|| {
503             let _g = M1.lock().unwrap();
504             C.notify_one();
505         });
506         g = C.wait(g).unwrap();
507         drop(g);
508
509         let _ = C.wait(M2.lock().unwrap()).unwrap();
510     }
511 }