]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/condvar.rs
Add test for MIR range matching.
[rust.git] / src / libstd / sync / condvar.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12
13 use sync::atomic::{AtomicUsize, Ordering};
14 use sync::{mutex, MutexGuard, PoisonError};
15 use sys_common::condvar as sys;
16 use sys_common::mutex as sys_mutex;
17 use sys_common::poison::{self, LockResult};
18 use time::{Instant, Duration};
19
20 /// A type indicating whether a timed wait on a condition variable returned
21 /// due to a time out or not.
22 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
23 #[stable(feature = "wait_timeout", since = "1.5.0")]
24 pub struct WaitTimeoutResult(bool);
25
26 impl WaitTimeoutResult {
27     /// Returns whether the wait was known to have timed out.
28     #[stable(feature = "wait_timeout", since = "1.5.0")]
29     pub fn timed_out(&self) -> bool {
30         self.0
31     }
32 }
33
34 /// A Condition Variable
35 ///
36 /// Condition variables represent the ability to block a thread such that it
37 /// consumes no CPU time while waiting for an event to occur. Condition
38 /// variables are typically associated with a boolean predicate (a condition)
39 /// and a mutex. The predicate is always verified inside of the mutex before
40 /// determining that thread must block.
41 ///
42 /// Functions in this module will block the current **thread** of execution and
43 /// are bindings to system-provided condition variables where possible. Note
44 /// that this module places one additional restriction over the system condition
45 /// variables: each condvar can be used with precisely one mutex at runtime. Any
46 /// attempt to use multiple mutexes on the same condition variable will result
47 /// in a runtime panic. If this is not desired, then the unsafe primitives in
48 /// `sys` do not have this restriction but may result in undefined behavior.
49 ///
50 /// # Examples
51 ///
52 /// ```
53 /// use std::sync::{Arc, Mutex, Condvar};
54 /// use std::thread;
55 ///
56 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
57 /// let pair2 = pair.clone();
58 ///
59 /// // Inside of our lock, spawn a new thread, and then wait for it to start
60 /// thread::spawn(move|| {
61 ///     let &(ref lock, ref cvar) = &*pair2;
62 ///     let mut started = lock.lock().unwrap();
63 ///     *started = true;
64 ///     cvar.notify_one();
65 /// });
66 ///
67 /// // wait for the thread to start up
68 /// let &(ref lock, ref cvar) = &*pair;
69 /// let mut started = lock.lock().unwrap();
70 /// while !*started {
71 ///     started = cvar.wait(started).unwrap();
72 /// }
73 /// ```
74 #[stable(feature = "rust1", since = "1.0.0")]
75 pub struct Condvar { inner: Box<StaticCondvar> }
76
77 /// Statically allocated condition variables.
78 ///
79 /// This structure is identical to `Condvar` except that it is suitable for use
80 /// in static initializers for other structures.
81 ///
82 /// # Examples
83 ///
84 /// ```
85 /// #![feature(static_condvar)]
86 ///
87 /// use std::sync::{StaticCondvar, CONDVAR_INIT};
88 ///
89 /// static CVAR: StaticCondvar = CONDVAR_INIT;
90 /// ```
91 #[unstable(feature = "static_condvar",
92            reason = "may be merged with Condvar in the future",
93            issue = "27717")]
94 pub struct StaticCondvar {
95     inner: sys::Condvar,
96     mutex: AtomicUsize,
97 }
98
99 /// Constant initializer for a statically allocated condition variable.
100 #[unstable(feature = "static_condvar",
101            reason = "may be merged with Condvar in the future",
102            issue = "27717")]
103 pub const CONDVAR_INIT: StaticCondvar = StaticCondvar::new();
104
105 impl Condvar {
106     /// Creates a new condition variable which is ready to be waited on and
107     /// notified.
108     #[stable(feature = "rust1", since = "1.0.0")]
109     pub fn new() -> Condvar {
110         Condvar {
111             inner: box StaticCondvar {
112                 inner: sys::Condvar::new(),
113                 mutex: AtomicUsize::new(0),
114             }
115         }
116     }
117
118     /// Blocks the current thread until this condition variable receives a
119     /// notification.
120     ///
121     /// This function will atomically unlock the mutex specified (represented by
122     /// `mutex_guard`) and block the current thread. This means that any calls
123     /// to `notify_*()` which happen logically after the mutex is unlocked are
124     /// candidates to wake this thread up. When this function call returns, the
125     /// lock specified will have been re-acquired.
126     ///
127     /// Note that this function is susceptible to spurious wakeups. Condition
128     /// variables normally have a boolean predicate associated with them, and
129     /// the predicate must always be checked each time this function returns to
130     /// protect against spurious wakeups.
131     ///
132     /// # Failure
133     ///
134     /// This function will return an error if the mutex being waited on is
135     /// poisoned when this thread re-acquires the lock. For more information,
136     /// see information about poisoning on the Mutex type.
137     ///
138     /// # Panics
139     ///
140     /// This function will `panic!()` if it is used with more than one mutex
141     /// over time. Each condition variable is dynamically bound to exactly one
142     /// mutex to ensure defined behavior across platforms. If this functionality
143     /// is not desired, then unsafe primitives in `sys` are provided.
144     #[stable(feature = "rust1", since = "1.0.0")]
145     pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>)
146                        -> LockResult<MutexGuard<'a, T>> {
147         unsafe {
148             let me: &'static Condvar = &*(self as *const _);
149             me.inner.wait(guard)
150         }
151     }
152
153     /// Waits on this condition variable for a notification, timing out after a
154     /// specified duration.
155     ///
156     /// The semantics of this function are equivalent to `wait()`
157     /// except that the thread will be blocked for roughly no longer
158     /// than `ms` milliseconds. This method should not be used for
159     /// precise timing due to anomalies such as preemption or platform
160     /// differences that may not cause the maximum amount of time
161     /// waited to be precisely `ms`.
162     ///
163     /// The returned boolean is `false` only if the timeout is known
164     /// to have elapsed.
165     ///
166     /// Like `wait`, the lock specified will be re-acquired when this function
167     /// returns, regardless of whether the timeout elapsed or not.
168     #[stable(feature = "rust1", since = "1.0.0")]
169     #[rustc_deprecated(since = "1.6.0", reason = "replaced by `std::sync::Condvar::wait_timeout`")]
170     #[allow(deprecated)]
171     pub fn wait_timeout_ms<'a, T>(&self, guard: MutexGuard<'a, T>, ms: u32)
172                                   -> LockResult<(MutexGuard<'a, T>, bool)> {
173         unsafe {
174             let me: &'static Condvar = &*(self as *const _);
175             me.inner.wait_timeout_ms(guard, ms)
176         }
177     }
178
179     /// Waits on this condition variable for a notification, timing out after a
180     /// specified duration.
181     ///
182     /// The semantics of this function are equivalent to `wait()` except that
183     /// the thread will be blocked for roughly no longer than `dur`. This
184     /// method should not be used for precise timing due to anomalies such as
185     /// preemption or platform differences that may not cause the maximum
186     /// amount of time waited to be precisely `dur`.
187     ///
188     /// The returned `WaitTimeoutResult` value indicates if the timeout is
189     /// known to have elapsed.
190     ///
191     /// Like `wait`, the lock specified will be re-acquired when this function
192     /// returns, regardless of whether the timeout elapsed or not.
193     #[stable(feature = "wait_timeout", since = "1.5.0")]
194     pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>,
195                                dur: Duration)
196                                -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> {
197         unsafe {
198             let me: &'static Condvar = &*(self as *const _);
199             me.inner.wait_timeout(guard, dur)
200         }
201     }
202
203     /// Waits on this condition variable for a notification, timing out after a
204     /// specified duration.
205     ///
206     /// The semantics of this function are equivalent to `wait_timeout` except
207     /// that the implementation will repeatedly wait while the duration has not
208     /// passed and the provided function returns `false`.
209     #[unstable(feature = "wait_timeout_with",
210                reason = "unsure if this API is broadly needed or what form it should take",
211                issue = "27748")]
212     pub fn wait_timeout_with<'a, T, F>(&self,
213                                        guard: MutexGuard<'a, T>,
214                                        dur: Duration,
215                                        f: F)
216                                        -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)>
217             where F: FnMut(LockResult<&mut T>) -> bool {
218         unsafe {
219             let me: &'static Condvar = &*(self as *const _);
220             me.inner.wait_timeout_with(guard, dur, f)
221         }
222     }
223
224     /// Wakes up one blocked thread on this condvar.
225     ///
226     /// If there is a blocked thread on this condition variable, then it will
227     /// be woken up from its call to `wait` or `wait_timeout`. Calls to
228     /// `notify_one` are not buffered in any way.
229     ///
230     /// To wake up all threads, see `notify_all()`.
231     #[stable(feature = "rust1", since = "1.0.0")]
232     pub fn notify_one(&self) { unsafe { self.inner.inner.notify_one() } }
233
234     /// Wakes up all blocked threads on this condvar.
235     ///
236     /// This method will ensure that any current waiters on the condition
237     /// variable are awoken. Calls to `notify_all()` are not buffered in any
238     /// way.
239     ///
240     /// To wake up only one thread, see `notify_one()`.
241     #[stable(feature = "rust1", since = "1.0.0")]
242     pub fn notify_all(&self) { unsafe { self.inner.inner.notify_all() } }
243 }
244
245 #[stable(feature = "rust1", since = "1.0.0")]
246 impl Drop for Condvar {
247     fn drop(&mut self) {
248         unsafe { self.inner.inner.destroy() }
249     }
250 }
251
252 impl StaticCondvar {
253     /// Creates a new condition variable
254     #[unstable(feature = "static_condvar",
255                reason = "may be merged with Condvar in the future",
256                issue = "27717")]
257     pub const fn new() -> StaticCondvar {
258         StaticCondvar {
259             inner: sys::Condvar::new(),
260             mutex: AtomicUsize::new(0),
261         }
262     }
263
264     /// Blocks the current thread until this condition variable receives a
265     /// notification.
266     ///
267     /// See `Condvar::wait`.
268     #[unstable(feature = "static_condvar",
269                reason = "may be merged with Condvar in the future",
270                issue = "27717")]
271     pub fn wait<'a, T>(&'static self, guard: MutexGuard<'a, T>)
272                        -> LockResult<MutexGuard<'a, T>> {
273         let poisoned = unsafe {
274             let lock = mutex::guard_lock(&guard);
275             self.verify(lock);
276             self.inner.wait(lock);
277             mutex::guard_poison(&guard).get()
278         };
279         if poisoned {
280             Err(PoisonError::new(guard))
281         } else {
282             Ok(guard)
283         }
284     }
285
286     /// Waits on this condition variable for a notification, timing out after a
287     /// specified duration.
288     ///
289     /// See `Condvar::wait_timeout`.
290     #[unstable(feature = "static_condvar",
291                reason = "may be merged with Condvar in the future",
292                issue = "27717")]
293     #[rustc_deprecated(since = "1.6.0",
294                        reason = "replaced by `std::sync::StaticCondvar::wait_timeout`")]
295     pub fn wait_timeout_ms<'a, T>(&'static self, guard: MutexGuard<'a, T>, ms: u32)
296                                   -> LockResult<(MutexGuard<'a, T>, bool)> {
297         match self.wait_timeout(guard, Duration::from_millis(ms as u64)) {
298             Ok((guard, timed_out)) => Ok((guard, !timed_out.timed_out())),
299             Err(poison) => {
300                 let (guard, timed_out) = poison.into_inner();
301                 Err(PoisonError::new((guard, !timed_out.timed_out())))
302             }
303         }
304     }
305
306     /// Waits on this condition variable for a notification, timing out after a
307     /// specified duration.
308     ///
309     /// See `Condvar::wait_timeout`.
310     #[unstable(feature = "static_condvar",
311                reason = "may be merged with Condvar in the future",
312                issue = "27717")]
313     pub fn wait_timeout<'a, T>(&'static self,
314                                guard: MutexGuard<'a, T>,
315                                timeout: Duration)
316                                -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> {
317         let (poisoned, result) = unsafe {
318             let lock = mutex::guard_lock(&guard);
319             self.verify(lock);
320             let success = self.inner.wait_timeout(lock, timeout);
321             (mutex::guard_poison(&guard).get(), WaitTimeoutResult(!success))
322         };
323         if poisoned {
324             Err(PoisonError::new((guard, result)))
325         } else {
326             Ok((guard, result))
327         }
328     }
329
330     /// Waits on this condition variable for a notification, timing out after a
331     /// specified duration.
332     ///
333     /// The implementation will repeatedly wait while the duration has not
334     /// passed and the function returns `false`.
335     ///
336     /// See `Condvar::wait_timeout_with`.
337     #[unstable(feature = "static_condvar",
338                reason = "may be merged with Condvar in the future",
339                issue = "27717")]
340     pub fn wait_timeout_with<'a, T, F>(&'static self,
341                                        guard: MutexGuard<'a, T>,
342                                        dur: Duration,
343                                        mut f: F)
344                                        -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)>
345             where F: FnMut(LockResult<&mut T>) -> bool {
346         // This could be made more efficient by pushing the implementation into
347         // sys::condvar
348         let start = Instant::now();
349         let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
350         while !f(guard_result
351                     .as_mut()
352                     .map(|g| &mut **g)
353                     .map_err(|e| PoisonError::new(&mut **e.get_mut()))) {
354             let consumed = start.elapsed();
355             let guard = guard_result.unwrap_or_else(|e| e.into_inner());
356             let (new_guard_result, timed_out) = if consumed > dur {
357                 (Ok(guard), WaitTimeoutResult(true))
358             } else {
359                 match self.wait_timeout(guard, dur - consumed) {
360                     Ok((new_guard, timed_out)) => (Ok(new_guard), timed_out),
361                     Err(err) => {
362                         let (new_guard, no_timeout) = err.into_inner();
363                         (Err(PoisonError::new(new_guard)), no_timeout)
364                     }
365                 }
366             };
367             guard_result = new_guard_result;
368             if timed_out.timed_out() {
369                 let result = f(guard_result
370                                     .as_mut()
371                                     .map(|g| &mut **g)
372                                     .map_err(|e| PoisonError::new(&mut **e.get_mut())));
373                 let result = WaitTimeoutResult(!result);
374                 return poison::map_result(guard_result, |g| (g, result));
375             }
376         }
377
378         poison::map_result(guard_result, |g| (g, WaitTimeoutResult(false)))
379     }
380
381     /// Wakes up one blocked thread on this condvar.
382     ///
383     /// See `Condvar::notify_one`.
384     #[unstable(feature = "static_condvar",
385                reason = "may be merged with Condvar in the future",
386                issue = "27717")]
387     pub fn notify_one(&'static self) { unsafe { self.inner.notify_one() } }
388
389     /// Wakes up all blocked threads on this condvar.
390     ///
391     /// See `Condvar::notify_all`.
392     #[unstable(feature = "static_condvar",
393                reason = "may be merged with Condvar in the future",
394                issue = "27717")]
395     pub fn notify_all(&'static self) { unsafe { self.inner.notify_all() } }
396
397     /// Deallocates all resources associated with this static condvar.
398     ///
399     /// This method is unsafe to call as there is no guarantee that there are no
400     /// active users of the condvar, and this also doesn't prevent any future
401     /// users of the condvar. This method is required to be called to not leak
402     /// memory on all platforms.
403     #[unstable(feature = "static_condvar",
404                reason = "may be merged with Condvar in the future",
405                issue = "27717")]
406     pub unsafe fn destroy(&'static self) {
407         self.inner.destroy()
408     }
409
410     fn verify(&self, mutex: &sys_mutex::Mutex) {
411         let addr = mutex as *const _ as usize;
412         match self.mutex.compare_and_swap(0, addr, Ordering::SeqCst) {
413             // If we got out 0, then we have successfully bound the mutex to
414             // this cvar.
415             0 => {}
416
417             // If we get out a value that's the same as `addr`, then someone
418             // already beat us to the punch.
419             n if n == addr => {}
420
421             // Anything else and we're using more than one mutex on this cvar,
422             // which is currently disallowed.
423             _ => panic!("attempted to use a condition variable with two \
424                          mutexes"),
425         }
426     }
427 }
428
429 #[cfg(test)]
430 mod tests {
431     use prelude::v1::*;
432
433     use super::StaticCondvar;
434     use sync::mpsc::channel;
435     use sync::{StaticMutex, Condvar, Mutex, Arc};
436     use sync::atomic::{AtomicUsize, Ordering};
437     use thread;
438     use time::Duration;
439     use u32;
440
441     #[test]
442     fn smoke() {
443         let c = Condvar::new();
444         c.notify_one();
445         c.notify_all();
446     }
447
448     #[test]
449     fn static_smoke() {
450         static C: StaticCondvar = StaticCondvar::new();
451         C.notify_one();
452         C.notify_all();
453         unsafe { C.destroy(); }
454     }
455
456     #[test]
457     fn notify_one() {
458         static C: StaticCondvar = StaticCondvar::new();
459         static M: StaticMutex = StaticMutex::new();
460
461         let g = M.lock().unwrap();
462         let _t = thread::spawn(move|| {
463             let _g = M.lock().unwrap();
464             C.notify_one();
465         });
466         let g = C.wait(g).unwrap();
467         drop(g);
468         unsafe { C.destroy(); M.destroy(); }
469     }
470
471     #[test]
472     fn notify_all() {
473         const N: usize = 10;
474
475         let data = Arc::new((Mutex::new(0), Condvar::new()));
476         let (tx, rx) = channel();
477         for _ in 0..N {
478             let data = data.clone();
479             let tx = tx.clone();
480             thread::spawn(move|| {
481                 let &(ref lock, ref cond) = &*data;
482                 let mut cnt = lock.lock().unwrap();
483                 *cnt += 1;
484                 if *cnt == N {
485                     tx.send(()).unwrap();
486                 }
487                 while *cnt != 0 {
488                     cnt = cond.wait(cnt).unwrap();
489                 }
490                 tx.send(()).unwrap();
491             });
492         }
493         drop(tx);
494
495         let &(ref lock, ref cond) = &*data;
496         rx.recv().unwrap();
497         let mut cnt = lock.lock().unwrap();
498         *cnt = 0;
499         cond.notify_all();
500         drop(cnt);
501
502         for _ in 0..N {
503             rx.recv().unwrap();
504         }
505     }
506
507     #[test]
508     fn wait_timeout_ms() {
509         static C: StaticCondvar = StaticCondvar::new();
510         static M: StaticMutex = StaticMutex::new();
511
512         let g = M.lock().unwrap();
513         let (g, _no_timeout) = C.wait_timeout_ms(g, 1).unwrap();
514         // spurious wakeups mean this isn't necessarily true
515         // assert!(!no_timeout);
516         let _t = thread::spawn(move || {
517             let _g = M.lock().unwrap();
518             C.notify_one();
519         });
520         let (g, no_timeout) = C.wait_timeout_ms(g, u32::MAX).unwrap();
521         assert!(no_timeout);
522         drop(g);
523         unsafe { C.destroy(); M.destroy(); }
524     }
525
526     #[test]
527     fn wait_timeout_with() {
528         static C: StaticCondvar = StaticCondvar::new();
529         static M: StaticMutex = StaticMutex::new();
530         static S: AtomicUsize = AtomicUsize::new(0);
531
532         let g = M.lock().unwrap();
533         let (g, timed_out) = C.wait_timeout_with(g, Duration::new(0, 1000), |_| {
534             false
535         }).unwrap();
536         assert!(timed_out.timed_out());
537
538         let (tx, rx) = channel();
539         let _t = thread::spawn(move || {
540             rx.recv().unwrap();
541             let g = M.lock().unwrap();
542             S.store(1, Ordering::SeqCst);
543             C.notify_one();
544             drop(g);
545
546             rx.recv().unwrap();
547             let g = M.lock().unwrap();
548             S.store(2, Ordering::SeqCst);
549             C.notify_one();
550             drop(g);
551
552             rx.recv().unwrap();
553             let _g = M.lock().unwrap();
554             S.store(3, Ordering::SeqCst);
555             C.notify_one();
556         });
557
558         let mut state = 0;
559         let day = 24 * 60 * 60;
560         let (_g, timed_out) = C.wait_timeout_with(g, Duration::new(day, 0), |_| {
561             assert_eq!(state, S.load(Ordering::SeqCst));
562             tx.send(()).unwrap();
563             state += 1;
564             match state {
565                 1|2 => false,
566                 _ => true,
567             }
568         }).unwrap();
569         assert!(!timed_out.timed_out());
570     }
571
572     #[test]
573     #[should_panic]
574     fn two_mutexes() {
575         static M1: StaticMutex = StaticMutex::new();
576         static M2: StaticMutex = StaticMutex::new();
577         static C: StaticCondvar = StaticCondvar::new();
578
579         let mut g = M1.lock().unwrap();
580         let _t = thread::spawn(move|| {
581             let _g = M1.lock().unwrap();
582             C.notify_one();
583         });
584         g = C.wait(g).unwrap();
585         drop(g);
586
587         let _ = C.wait(M2.lock().unwrap()).unwrap();
588     }
589 }