]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/condvar.rs
0ff3a6907026ed34cb5a699dbc986ccc46187f9f
[rust.git] / src / libstd / sync / condvar.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 use prelude::v1::*;
12
13 use sync::atomic::{AtomicUsize, Ordering};
14 use sync::{mutex, MutexGuard, PoisonError};
15 use sys_common::condvar as sys;
16 use sys_common::mutex as sys_mutex;
17 use sys_common::poison::{self, LockResult};
18 use time::{Instant, Duration};
19
20 /// A type indicating whether a timed wait on a condition variable returned
21 /// due to a time out or not.
22 #[derive(Debug, PartialEq, Eq, Copy, Clone)]
23 #[stable(feature = "wait_timeout", since = "1.5.0")]
24 pub struct WaitTimeoutResult(bool);
25
26 impl WaitTimeoutResult {
27     /// Returns whether the wait was known to have timed out.
28     #[stable(feature = "wait_timeout", since = "1.5.0")]
29     pub fn timed_out(&self) -> bool {
30         self.0
31     }
32 }
33
34 /// A Condition Variable
35 ///
36 /// Condition variables represent the ability to block a thread such that it
37 /// consumes no CPU time while waiting for an event to occur. Condition
38 /// variables are typically associated with a boolean predicate (a condition)
39 /// and a mutex. The predicate is always verified inside of the mutex before
40 /// determining that thread must block.
41 ///
42 /// Functions in this module will block the current **thread** of execution and
43 /// are bindings to system-provided condition variables where possible. Note
44 /// that this module places one additional restriction over the system condition
45 /// variables: each condvar can be used with precisely one mutex at runtime. Any
46 /// attempt to use multiple mutexes on the same condition variable will result
47 /// in a runtime panic. If this is not desired, then the unsafe primitives in
48 /// `sys` do not have this restriction but may result in undefined behavior.
49 ///
50 /// # Examples
51 ///
52 /// ```
53 /// use std::sync::{Arc, Mutex, Condvar};
54 /// use std::thread;
55 ///
56 /// let pair = Arc::new((Mutex::new(false), Condvar::new()));
57 /// let pair2 = pair.clone();
58 ///
59 /// // Inside of our lock, spawn a new thread, and then wait for it to start
60 /// thread::spawn(move|| {
61 ///     let &(ref lock, ref cvar) = &*pair2;
62 ///     let mut started = lock.lock().unwrap();
63 ///     *started = true;
64 ///     cvar.notify_one();
65 /// });
66 ///
67 /// // wait for the thread to start up
68 /// let &(ref lock, ref cvar) = &*pair;
69 /// let mut started = lock.lock().unwrap();
70 /// while !*started {
71 ///     started = cvar.wait(started).unwrap();
72 /// }
73 /// ```
74 #[stable(feature = "rust1", since = "1.0.0")]
75 pub struct Condvar { inner: Box<StaticCondvar> }
76
77 /// Statically allocated condition variables.
78 ///
79 /// This structure is identical to `Condvar` except that it is suitable for use
80 /// in static initializers for other structures.
81 ///
82 /// # Examples
83 ///
84 /// ```
85 /// #![feature(static_condvar)]
86 ///
87 /// use std::sync::{StaticCondvar, CONDVAR_INIT};
88 ///
89 /// static CVAR: StaticCondvar = CONDVAR_INIT;
90 /// ```
91 #[unstable(feature = "static_condvar",
92            reason = "may be merged with Condvar in the future",
93            issue = "27717")]
94 pub struct StaticCondvar {
95     inner: sys::Condvar,
96     mutex: AtomicUsize,
97 }
98
99 /// Constant initializer for a statically allocated condition variable.
100 #[unstable(feature = "static_condvar",
101            reason = "may be merged with Condvar in the future",
102            issue = "27717")]
103 pub const CONDVAR_INIT: StaticCondvar = StaticCondvar::new();
104
105 impl Condvar {
106     /// Creates a new condition variable which is ready to be waited on and
107     /// notified.
108     #[stable(feature = "rust1", since = "1.0.0")]
109     pub fn new() -> Condvar {
110         Condvar {
111             inner: box StaticCondvar {
112                 inner: sys::Condvar::new(),
113                 mutex: AtomicUsize::new(0),
114             }
115         }
116     }
117
118     /// Blocks the current thread until this condition variable receives a
119     /// notification.
120     ///
121     /// This function will atomically unlock the mutex specified (represented by
122     /// `mutex_guard`) and block the current thread. This means that any calls
123     /// to `notify_*()` which happen logically after the mutex is unlocked are
124     /// candidates to wake this thread up. When this function call returns, the
125     /// lock specified will have been re-acquired.
126     ///
127     /// Note that this function is susceptible to spurious wakeups. Condition
128     /// variables normally have a boolean predicate associated with them, and
129     /// the predicate must always be checked each time this function returns to
130     /// protect against spurious wakeups.
131     ///
132     /// # Errors
133     ///
134     /// This function will return an error if the mutex being waited on is
135     /// poisoned when this thread re-acquires the lock. For more information,
136     /// see information about poisoning on the Mutex type.
137     ///
138     /// # Panics
139     ///
140     /// This function will `panic!()` if it is used with more than one mutex
141     /// over time. Each condition variable is dynamically bound to exactly one
142     /// mutex to ensure defined behavior across platforms. If this functionality
143     /// is not desired, then unsafe primitives in `sys` are provided.
144     #[stable(feature = "rust1", since = "1.0.0")]
145     pub fn wait<'a, T>(&self, guard: MutexGuard<'a, T>)
146                        -> LockResult<MutexGuard<'a, T>> {
147         unsafe {
148             let me: &'static Condvar = &*(self as *const _);
149             me.inner.wait(guard)
150         }
151     }
152
153     /// Waits on this condition variable for a notification, timing out after a
154     /// specified duration.
155     ///
156     /// The semantics of this function are equivalent to `wait()`
157     /// except that the thread will be blocked for roughly no longer
158     /// than `ms` milliseconds. This method should not be used for
159     /// precise timing due to anomalies such as preemption or platform
160     /// differences that may not cause the maximum amount of time
161     /// waited to be precisely `ms`.
162     ///
163     /// The returned boolean is `false` only if the timeout is known
164     /// to have elapsed.
165     ///
166     /// Like `wait`, the lock specified will be re-acquired when this function
167     /// returns, regardless of whether the timeout elapsed or not.
168     #[stable(feature = "rust1", since = "1.0.0")]
169     #[rustc_deprecated(since = "1.6.0", reason = "replaced by `std::sync::Condvar::wait_timeout`")]
170     #[allow(deprecated)]
171     pub fn wait_timeout_ms<'a, T>(&self, guard: MutexGuard<'a, T>, ms: u32)
172                                   -> LockResult<(MutexGuard<'a, T>, bool)> {
173         unsafe {
174             let me: &'static Condvar = &*(self as *const _);
175             me.inner.wait_timeout_ms(guard, ms)
176         }
177     }
178
179     /// Waits on this condition variable for a notification, timing out after a
180     /// specified duration.
181     ///
182     /// The semantics of this function are equivalent to `wait()` except that
183     /// the thread will be blocked for roughly no longer than `dur`. This
184     /// method should not be used for precise timing due to anomalies such as
185     /// preemption or platform differences that may not cause the maximum
186     /// amount of time waited to be precisely `dur`.
187     ///
188     /// The returned `WaitTimeoutResult` value indicates if the timeout is
189     /// known to have elapsed.
190     ///
191     /// Like `wait`, the lock specified will be re-acquired when this function
192     /// returns, regardless of whether the timeout elapsed or not.
193     #[stable(feature = "wait_timeout", since = "1.5.0")]
194     pub fn wait_timeout<'a, T>(&self, guard: MutexGuard<'a, T>,
195                                dur: Duration)
196                                -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> {
197         unsafe {
198             let me: &'static Condvar = &*(self as *const _);
199             me.inner.wait_timeout(guard, dur)
200         }
201     }
202
203     /// Waits on this condition variable for a notification, timing out after a
204     /// specified duration.
205     ///
206     /// The semantics of this function are equivalent to `wait_timeout` except
207     /// that the implementation will repeatedly wait while the duration has not
208     /// passed and the provided function returns `false`.
209     #[unstable(feature = "wait_timeout_with",
210                reason = "unsure if this API is broadly needed or what form it should take",
211                issue = "27748")]
212     #[rustc_deprecated(since = "1.8.0",
213                        reason = "wonky signature and questionable \
214                                  implementation didn't justify existence")]
215     pub fn wait_timeout_with<'a, T, F>(&self,
216                                        guard: MutexGuard<'a, T>,
217                                        dur: Duration,
218                                        f: F)
219                                        -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)>
220             where F: FnMut(LockResult<&mut T>) -> bool {
221         unsafe {
222             let me: &'static Condvar = &*(self as *const _);
223             me.inner.wait_timeout_with(guard, dur, f)
224         }
225     }
226
227     /// Wakes up one blocked thread on this condvar.
228     ///
229     /// If there is a blocked thread on this condition variable, then it will
230     /// be woken up from its call to `wait` or `wait_timeout`. Calls to
231     /// `notify_one` are not buffered in any way.
232     ///
233     /// To wake up all threads, see `notify_all()`.
234     #[stable(feature = "rust1", since = "1.0.0")]
235     pub fn notify_one(&self) { unsafe { self.inner.inner.notify_one() } }
236
237     /// Wakes up all blocked threads on this condvar.
238     ///
239     /// This method will ensure that any current waiters on the condition
240     /// variable are awoken. Calls to `notify_all()` are not buffered in any
241     /// way.
242     ///
243     /// To wake up only one thread, see `notify_one()`.
244     #[stable(feature = "rust1", since = "1.0.0")]
245     pub fn notify_all(&self) { unsafe { self.inner.inner.notify_all() } }
246 }
247
248 #[stable(feature = "rust1", since = "1.0.0")]
249 impl Drop for Condvar {
250     fn drop(&mut self) {
251         unsafe { self.inner.inner.destroy() }
252     }
253 }
254
255 impl StaticCondvar {
256     /// Creates a new condition variable
257     #[unstable(feature = "static_condvar",
258                reason = "may be merged with Condvar in the future",
259                issue = "27717")]
260     pub const fn new() -> StaticCondvar {
261         StaticCondvar {
262             inner: sys::Condvar::new(),
263             mutex: AtomicUsize::new(0),
264         }
265     }
266
267     /// Blocks the current thread until this condition variable receives a
268     /// notification.
269     ///
270     /// See `Condvar::wait`.
271     #[unstable(feature = "static_condvar",
272                reason = "may be merged with Condvar in the future",
273                issue = "27717")]
274     pub fn wait<'a, T>(&'static self, guard: MutexGuard<'a, T>)
275                        -> LockResult<MutexGuard<'a, T>> {
276         let poisoned = unsafe {
277             let lock = mutex::guard_lock(&guard);
278             self.verify(lock);
279             self.inner.wait(lock);
280             mutex::guard_poison(&guard).get()
281         };
282         if poisoned {
283             Err(PoisonError::new(guard))
284         } else {
285             Ok(guard)
286         }
287     }
288
289     /// Waits on this condition variable for a notification, timing out after a
290     /// specified duration.
291     ///
292     /// See `Condvar::wait_timeout`.
293     #[unstable(feature = "static_condvar",
294                reason = "may be merged with Condvar in the future",
295                issue = "27717")]
296     #[rustc_deprecated(since = "1.6.0",
297                        reason = "replaced by `std::sync::StaticCondvar::wait_timeout`")]
298     pub fn wait_timeout_ms<'a, T>(&'static self, guard: MutexGuard<'a, T>, ms: u32)
299                                   -> LockResult<(MutexGuard<'a, T>, bool)> {
300         match self.wait_timeout(guard, Duration::from_millis(ms as u64)) {
301             Ok((guard, timed_out)) => Ok((guard, !timed_out.timed_out())),
302             Err(poison) => {
303                 let (guard, timed_out) = poison.into_inner();
304                 Err(PoisonError::new((guard, !timed_out.timed_out())))
305             }
306         }
307     }
308
309     /// Waits on this condition variable for a notification, timing out after a
310     /// specified duration.
311     ///
312     /// See `Condvar::wait_timeout`.
313     #[unstable(feature = "static_condvar",
314                reason = "may be merged with Condvar in the future",
315                issue = "27717")]
316     pub fn wait_timeout<'a, T>(&'static self,
317                                guard: MutexGuard<'a, T>,
318                                timeout: Duration)
319                                -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)> {
320         let (poisoned, result) = unsafe {
321             let lock = mutex::guard_lock(&guard);
322             self.verify(lock);
323             let success = self.inner.wait_timeout(lock, timeout);
324             (mutex::guard_poison(&guard).get(), WaitTimeoutResult(!success))
325         };
326         if poisoned {
327             Err(PoisonError::new((guard, result)))
328         } else {
329             Ok((guard, result))
330         }
331     }
332
333     /// Waits on this condition variable for a notification, timing out after a
334     /// specified duration.
335     ///
336     /// The implementation will repeatedly wait while the duration has not
337     /// passed and the function returns `false`.
338     ///
339     /// See `Condvar::wait_timeout_with`.
340     #[unstable(feature = "static_condvar",
341                reason = "may be merged with Condvar in the future",
342                issue = "27717")]
343     pub fn wait_timeout_with<'a, T, F>(&'static self,
344                                        guard: MutexGuard<'a, T>,
345                                        dur: Duration,
346                                        mut f: F)
347                                        -> LockResult<(MutexGuard<'a, T>, WaitTimeoutResult)>
348             where F: FnMut(LockResult<&mut T>) -> bool {
349         // This could be made more efficient by pushing the implementation into
350         // sys::condvar
351         let start = Instant::now();
352         let mut guard_result: LockResult<MutexGuard<'a, T>> = Ok(guard);
353         while !f(guard_result
354                     .as_mut()
355                     .map(|g| &mut **g)
356                     .map_err(|e| PoisonError::new(&mut **e.get_mut()))) {
357             let consumed = start.elapsed();
358             let guard = guard_result.unwrap_or_else(|e| e.into_inner());
359             let (new_guard_result, timed_out) = if consumed > dur {
360                 (Ok(guard), WaitTimeoutResult(true))
361             } else {
362                 match self.wait_timeout(guard, dur - consumed) {
363                     Ok((new_guard, timed_out)) => (Ok(new_guard), timed_out),
364                     Err(err) => {
365                         let (new_guard, no_timeout) = err.into_inner();
366                         (Err(PoisonError::new(new_guard)), no_timeout)
367                     }
368                 }
369             };
370             guard_result = new_guard_result;
371             if timed_out.timed_out() {
372                 let result = f(guard_result
373                                     .as_mut()
374                                     .map(|g| &mut **g)
375                                     .map_err(|e| PoisonError::new(&mut **e.get_mut())));
376                 let result = WaitTimeoutResult(!result);
377                 return poison::map_result(guard_result, |g| (g, result));
378             }
379         }
380
381         poison::map_result(guard_result, |g| (g, WaitTimeoutResult(false)))
382     }
383
384     /// Wakes up one blocked thread on this condvar.
385     ///
386     /// See `Condvar::notify_one`.
387     #[unstable(feature = "static_condvar",
388                reason = "may be merged with Condvar in the future",
389                issue = "27717")]
390     pub fn notify_one(&'static self) { unsafe { self.inner.notify_one() } }
391
392     /// Wakes up all blocked threads on this condvar.
393     ///
394     /// See `Condvar::notify_all`.
395     #[unstable(feature = "static_condvar",
396                reason = "may be merged with Condvar in the future",
397                issue = "27717")]
398     pub fn notify_all(&'static self) { unsafe { self.inner.notify_all() } }
399
400     /// Deallocates all resources associated with this static condvar.
401     ///
402     /// This method is unsafe to call as there is no guarantee that there are no
403     /// active users of the condvar, and this also doesn't prevent any future
404     /// users of the condvar. This method is required to be called to not leak
405     /// memory on all platforms.
406     #[unstable(feature = "static_condvar",
407                reason = "may be merged with Condvar in the future",
408                issue = "27717")]
409     pub unsafe fn destroy(&'static self) {
410         self.inner.destroy()
411     }
412
413     fn verify(&self, mutex: &sys_mutex::Mutex) {
414         let addr = mutex as *const _ as usize;
415         match self.mutex.compare_and_swap(0, addr, Ordering::SeqCst) {
416             // If we got out 0, then we have successfully bound the mutex to
417             // this cvar.
418             0 => {}
419
420             // If we get out a value that's the same as `addr`, then someone
421             // already beat us to the punch.
422             n if n == addr => {}
423
424             // Anything else and we're using more than one mutex on this cvar,
425             // which is currently disallowed.
426             _ => panic!("attempted to use a condition variable with two \
427                          mutexes"),
428         }
429     }
430 }
431
432 #[cfg(test)]
433 mod tests {
434     use prelude::v1::*;
435
436     use super::StaticCondvar;
437     use sync::mpsc::channel;
438     use sync::{StaticMutex, Condvar, Mutex, Arc};
439     use sync::atomic::{AtomicUsize, Ordering};
440     use thread;
441     use time::Duration;
442     use u32;
443
444     #[test]
445     fn smoke() {
446         let c = Condvar::new();
447         c.notify_one();
448         c.notify_all();
449     }
450
451     #[test]
452     fn static_smoke() {
453         static C: StaticCondvar = StaticCondvar::new();
454         C.notify_one();
455         C.notify_all();
456         unsafe { C.destroy(); }
457     }
458
459     #[test]
460     fn notify_one() {
461         static C: StaticCondvar = StaticCondvar::new();
462         static M: StaticMutex = StaticMutex::new();
463
464         let g = M.lock().unwrap();
465         let _t = thread::spawn(move|| {
466             let _g = M.lock().unwrap();
467             C.notify_one();
468         });
469         let g = C.wait(g).unwrap();
470         drop(g);
471         unsafe { C.destroy(); M.destroy(); }
472     }
473
474     #[test]
475     fn notify_all() {
476         const N: usize = 10;
477
478         let data = Arc::new((Mutex::new(0), Condvar::new()));
479         let (tx, rx) = channel();
480         for _ in 0..N {
481             let data = data.clone();
482             let tx = tx.clone();
483             thread::spawn(move|| {
484                 let &(ref lock, ref cond) = &*data;
485                 let mut cnt = lock.lock().unwrap();
486                 *cnt += 1;
487                 if *cnt == N {
488                     tx.send(()).unwrap();
489                 }
490                 while *cnt != 0 {
491                     cnt = cond.wait(cnt).unwrap();
492                 }
493                 tx.send(()).unwrap();
494             });
495         }
496         drop(tx);
497
498         let &(ref lock, ref cond) = &*data;
499         rx.recv().unwrap();
500         let mut cnt = lock.lock().unwrap();
501         *cnt = 0;
502         cond.notify_all();
503         drop(cnt);
504
505         for _ in 0..N {
506             rx.recv().unwrap();
507         }
508     }
509
510     #[test]
511     fn wait_timeout_ms() {
512         static C: StaticCondvar = StaticCondvar::new();
513         static M: StaticMutex = StaticMutex::new();
514
515         let g = M.lock().unwrap();
516         let (g, _no_timeout) = C.wait_timeout(g, Duration::from_millis(1)).unwrap();
517         // spurious wakeups mean this isn't necessarily true
518         // assert!(!no_timeout);
519         let _t = thread::spawn(move || {
520             let _g = M.lock().unwrap();
521             C.notify_one();
522         });
523         let (g, timeout_res) = C.wait_timeout(g, Duration::from_millis(u32::MAX as u64)).unwrap();
524         assert!(!timeout_res.timed_out());
525         drop(g);
526         unsafe { C.destroy(); M.destroy(); }
527     }
528
529     #[test]
530     fn wait_timeout_with() {
531         static C: StaticCondvar = StaticCondvar::new();
532         static M: StaticMutex = StaticMutex::new();
533         static S: AtomicUsize = AtomicUsize::new(0);
534
535         let g = M.lock().unwrap();
536         let (g, timed_out) = C.wait_timeout_with(g, Duration::new(0, 1000), |_| {
537             false
538         }).unwrap();
539         assert!(timed_out.timed_out());
540
541         let (tx, rx) = channel();
542         let _t = thread::spawn(move || {
543             rx.recv().unwrap();
544             let g = M.lock().unwrap();
545             S.store(1, Ordering::SeqCst);
546             C.notify_one();
547             drop(g);
548
549             rx.recv().unwrap();
550             let g = M.lock().unwrap();
551             S.store(2, Ordering::SeqCst);
552             C.notify_one();
553             drop(g);
554
555             rx.recv().unwrap();
556             let _g = M.lock().unwrap();
557             S.store(3, Ordering::SeqCst);
558             C.notify_one();
559         });
560
561         let mut state = 0;
562         let day = 24 * 60 * 60;
563         let (_g, timed_out) = C.wait_timeout_with(g, Duration::new(day, 0), |_| {
564             assert_eq!(state, S.load(Ordering::SeqCst));
565             tx.send(()).unwrap();
566             state += 1;
567             match state {
568                 1|2 => false,
569                 _ => true,
570             }
571         }).unwrap();
572         assert!(!timed_out.timed_out());
573     }
574
575     #[test]
576     #[should_panic]
577     fn two_mutexes() {
578         static M1: StaticMutex = StaticMutex::new();
579         static M2: StaticMutex = StaticMutex::new();
580         static C: StaticCondvar = StaticCondvar::new();
581
582         let mut g = M1.lock().unwrap();
583         let _t = thread::spawn(move|| {
584             let _g = M1.lock().unwrap();
585             C.notify_one();
586         });
587         g = C.wait(g).unwrap();
588         drop(g);
589
590         let _ = C.wait(M2.lock().unwrap()).unwrap();
591     }
592 }