1 // Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Wrappers for safe, shared, mutable memory between tasks
13 //! The wrappers in this module build on the primitives from `sync::raw` to
14 //! provide safe interfaces around using the primitive locks. These primitives
15 //! implement a technique called "poisoning": when a task fails while
16 //! holding a lock, all future attempts to use the lock will fail.
18 //! For example, if two tasks are contending on a mutex and one of them fails
19 //! after grabbing the lock, the second task will immediately fail because the
20 //! lock is now poisoned.
25 use rustrt::local::Local;
26 use rustrt::task::Task;
30 /****************************************************************************
32 ****************************************************************************/
// RAII helper that marks a lock as poisoned if it is dropped while the
// owning task is failing. Fields are elided in this listing (source
// lines 35-38) — presumably a flag reference plus a `failed` bool; confirm
// against the full source.
34 struct PoisonOnFail<'a> {
// Returns true when the current task is unwinding from a failure. This is
// what lets Drop distinguish a normal unlock from a failure-time unlock.
39 fn failing() -> bool {
40 Local::borrow(None::<Task>).unwinder.unwinding()
43 impl<'a> PoisonOnFail<'a> {
// Fails immediately — naming the lock type in the message — if `flag`
// records that another task already failed while holding the lock.
44 fn check(flag: bool, name: &str) {
46 fail!("Poisoned {} - another task failed inside!", name);
// Checks for prior poisoning, then wraps `flag` so that Drop can set it.
// NOTE(review): constructor body partially elided (source lines 52-58).
50 fn new<'a>(flag: &'a mut bool, name: &str) -> PoisonOnFail<'a> {
51 PoisonOnFail::check(*flag, name);
// On drop, poison the lock if this task is currently unwinding from a
// failure and `self.failed` has not already been set.
// NOTE(review): drop body partially elided (source lines 63-66).
60 impl<'a> Drop for PoisonOnFail<'a> {
62 if !self.failed && failing() {
68 /****************************************************************************
70 ****************************************************************************/
// Variants wrap the raw guard of whichever lock type created the Condvar.
// NOTE(review): the enum declaration line itself is elided in this listing
// (source line ~72).
73 InnerMutex(raw::MutexGuard<'a>),
74 InnerRWLock(raw::RWLockWriteGuard<'a>),
// Borrow the raw condvar owned by whichever guard variant is active.
78 fn cond<'a>(&'a self) -> &'a raw::Condvar<'b> {
80 InnerMutex(ref m) => &m.cond,
81 InnerRWLock(ref m) => &m.cond,
86 /// A condition variable, a mechanism for unlock-and-descheduling and
87 /// signaling, for use with the lock types.
88 pub struct Condvar<'a> {
90 // n.b. Inner must be after PoisonOnFail because we must set the poison flag
91 // *inside* the mutex, and struct fields are destroyed top-to-bottom
92 // (destroy the lock guard last).
// Poison-on-drop helper; declared before the inner guard so it is dropped
// first, per the field-ordering note above.
93 poison: PoisonOnFail<'a>,
97 impl<'a> Condvar<'a> {
98 /// Atomically exit the associated lock and block until a signal is sent.
100 /// wait() is equivalent to wait_on(0).
104 /// A task which is killed while waiting on a condition variable will wake
105 /// up, fail, and unlock the associated lock as it unwinds.
107 pub fn wait(&self) { self.wait_on(0) }
109 /// Atomically exit the associated lock and block on a specified condvar
110 /// until a signal is sent on that same condvar.
112 /// The associated lock must have been initialised with an appropriate
113 /// number of condvars. The condvar_id must be between 0 and num_condvars-1
114 /// or else this call will fail.
116 pub fn wait_on(&self, condvar_id: uint) {
// Precondition: the lock must not already be poisoned.
117 assert!(!*self.poison.flag);
118 self.inner.cond().wait_on(condvar_id);
119 // This is why we need to wrap sync::condvar.
// Re-check after waking: the flag may have been set while we slept,
// e.g. by a signalling task that failed inside the lock.
120 PoisonOnFail::check(*self.poison.flag, self.name);
123 /// Wake up a blocked task. Returns false if there was no blocked task.
125 pub fn signal(&self) -> bool { self.signal_on(0) }
127 /// Wake up a blocked task on a specified condvar (as
128 /// sync::cond.signal_on). Returns false if there was no blocked task.
130 pub fn signal_on(&self, condvar_id: uint) -> bool {
131 assert!(!*self.poison.flag);
132 self.inner.cond().signal_on(condvar_id)
135 /// Wake up all blocked tasks. Returns the number of tasks woken.
137 pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
139 /// Wake up all blocked tasks on a specified condvar (as
140 /// sync::cond.broadcast_on). Returns the number of tasks woken.
142 pub fn broadcast_on(&self, condvar_id: uint) -> uint {
143 assert!(!*self.poison.flag);
144 self.inner.cond().broadcast_on(condvar_id)
148 /****************************************************************************
150 ****************************************************************************/
152 /// A wrapper type which provides synchronized access to the underlying data, of
153 /// type `T`. A mutex always provides exclusive access, and concurrent requests
154 /// will block while the mutex is already locked.
159 /// use sync::{Mutex, Arc};
161 /// let mutex = Arc::new(Mutex::new(1));
162 /// let mutex2 = mutex.clone();
165 /// let mut val = mutex2.lock();
167 /// val.cond.signal();
170 /// let mut value = mutex.lock();
171 /// while *value != 2 {
172 /// value.cond.wait();
// NOTE(review): struct fields partially elided in this listing; `failed`
// is the poison flag read/written under the lock (see Mutex::lock).
175 pub struct Mutex<T> {
177 failed: Unsafe<bool>,
181 /// A guard which is created by locking a mutex. Through this guard the
182 /// underlying data can be accessed.
183 pub struct MutexGuard<'a, T> {
184 // FIXME #12808: strange name to try to avoid interfering with
185 // field accesses of the contained type via Deref
187 /// Inner condition variable connected to the locked mutex that this guard
188 /// was created from. This can be used for atomic-unlock-and-deschedule.
189 pub cond: Condvar<'a>,
192 impl<T: Send> Mutex<T> {
193 /// Creates a new mutex to protect the user-supplied data.
194 pub fn new(user_data: T) -> Mutex<T> {
// Delegates to the general constructor with a single condvar.
195 Mutex::new_with_condvars(user_data, 1)
198 /// Create a new mutex, with a specified number of associated condvars.
200 /// This will allow calling wait_on/signal_on/broadcast_on with condvar IDs
201 /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be
202 /// allowed but any operations on the condvar will fail.)
203 pub fn new_with_condvars(user_data: T, num_condvars: uint) -> Mutex<T> {
// `failed` starts false: a fresh mutex is never poisoned.
205 lock: raw::Mutex::new_with_condvars(num_condvars),
206 failed: Unsafe::new(false),
207 data: Unsafe::new(user_data),
211 /// Access the underlying mutable data with mutual exclusion from other
212 /// tasks. The returned value is an RAII guard which will unlock the mutex
213 /// when dropped. All concurrent tasks attempting to lock the mutex will
214 /// block while the returned value is still alive.
218 /// Failing while inside the Mutex will unlock the Mutex while unwinding, so
219 /// that other tasks won't block forever. It will also poison the Mutex:
220 /// any tasks that subsequently try to access it (including those already
221 /// blocked on the mutex) will also fail immediately.
223 pub fn lock<'a>(&'a self) -> MutexGuard<'a, T> {
// Acquire the raw lock first; only then is it sound to touch the
// Unsafe<...> cells below.
224 let guard = self.lock.lock();
226 // These two accesses are safe because we're guaranteed at this point
227 // that we have exclusive access to this mutex. We are indeed able to
228 // promote ourselves from &Mutex to `&mut T`
229 let poison = unsafe { &mut *self.failed.get() };
230 let data = unsafe { &mut *self.data.get() };
// PoisonOnFail::new fails here if a previous holder poisoned the mutex.
// NOTE(review): guard construction partially elided (source lines 231-235).
236 poison: PoisonOnFail::new(poison, "Mutex"),
237 inner: InnerMutex(guard),
// `*guard` yields shared access to the protected data while the lock is held.
243 impl<'a, T: Send> Deref<T> for MutexGuard<'a, T> {
244 fn deref<'a>(&'a self) -> &'a T { &*self._data }
// `*guard = ...` yields mutable access to the protected data while the
// lock is held.
246 impl<'a, T: Send> DerefMut<T> for MutexGuard<'a, T> {
247 fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self._data }
250 /****************************************************************************
251 * R/W lock protected lock
252 ****************************************************************************/
254 /// A dual-mode reader-writer lock. The data can be accessed mutably or
255 /// immutably, and immutably-accessing tasks may run concurrently.
260 /// use sync::{RWLock, Arc};
262 /// let lock1 = Arc::new(RWLock::new(1));
263 /// let lock2 = lock1.clone();
266 /// let mut val = lock2.write();
268 /// let val = val.downgrade();
269 /// println!("{}", *val);
272 /// let val = lock1.read();
273 /// println!("{}", *val);
// NOTE(review): struct fields partially elided in this listing; `failed`
// is the poison flag, mirroring Mutex.
275 pub struct RWLock<T> {
277 failed: Unsafe<bool>,
281 /// A guard which is created by locking an rwlock in write mode. Through this
282 /// guard the underlying data can be accessed.
283 pub struct RWLockWriteGuard<'a, T> {
284 // FIXME #12808: strange name to try to avoid interfering with
285 // field accesses of the contained type via Deref
287 /// Inner condition variable that can be used to sleep on the write mode of
289 pub cond: Condvar<'a>,
292 /// A guard which is created by locking an rwlock in read mode. Through this
293 /// guard the underlying data can be accessed.
// Note: unlike the write guard, there is no `cond` field — condvars are
// only usable from write mode (see Inner's variants).
294 pub struct RWLockReadGuard<'a, T> {
295 // FIXME #12808: strange names to try to avoid interfering with
296 // field accesses of the contained type via Deref
298 _guard: raw::RWLockReadGuard<'a>,
301 impl<T: Send + Share> RWLock<T> {
302 /// Create a reader/writer lock with the supplied data.
303 pub fn new(user_data: T) -> RWLock<T> {
// Delegates to the general constructor with a single condvar.
304 RWLock::new_with_condvars(user_data, 1)
307 /// Create a reader/writer lock with the supplied data and a specified number
308 /// of condvars (as sync::RWLock::new_with_condvars).
309 pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWLock<T> {
// `failed` starts false: a fresh rwlock is never poisoned.
311 lock: raw::RWLock::new_with_condvars(num_condvars),
312 failed: Unsafe::new(false),
313 data: Unsafe::new(user_data),
317 /// Access the underlying data mutably. Locks the rwlock in write mode;
318 /// other readers and writers will block.
322 /// Failing while inside the lock will unlock the lock while unwinding, so
323 /// that other tasks won't block forever. As Mutex.lock, it will also poison
324 /// the lock, so subsequent readers and writers will both also fail.
326 pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a, T> {
// Acquire exclusive (write) access before touching the Unsafe cells.
327 let guard = self.lock.write();
329 // These two accesses are safe because we're guaranteed at this point
330 // that we have exclusive access to this rwlock. We are indeed able to
331 // promote ourselves from &RWLock to `&mut T`
332 let poison = unsafe { &mut *self.failed.get() };
333 let data = unsafe { &mut *self.data.get() };
// PoisonOnFail::new fails here if a previous writer poisoned the lock.
// NOTE(review): guard construction partially elided (source lines 334-338).
339 poison: PoisonOnFail::new(poison, "RWLock"),
340 inner: InnerRWLock(guard),
345 /// Access the underlying data immutably. May run concurrently with other
350 /// Failing will unlock the lock while unwinding. However, unlike all other
351 /// access modes, this will not poison the lock.
352 pub fn read<'a>(&'a self) -> RWLockReadGuard<'a, T> {
353 let guard = self.lock.read();
// Readers only check for poisoning; they never set the flag (no
// PoisonOnFail wrapper is constructed here).
354 PoisonOnFail::check(unsafe { *self.failed.get() }, "RWLock");
357 _data: unsafe { &*self.data.get() },
362 impl<'a, T: Send + Share> RWLockWriteGuard<'a, T> {
363 /// Consumes this write lock token, returning a new read lock token.
365 /// This will allow pending readers to come into the lock.
366 pub fn downgrade(self) -> RWLockReadGuard<'a, T> {
// Destructure self so the raw write guard can be consumed by value.
367 let RWLockWriteGuard { _data, cond } = self;
368 // convert the data to read-only explicitly
// A write guard always holds the InnerRWLock variant; InnerMutex here
// would be a construction bug, hence unreachable!().
370 let guard = match cond.inner {
371 InnerMutex(..) => unreachable!(),
372 InnerRWLock(guard) => guard.downgrade()
374 RWLockReadGuard { _guard: guard, _data: data }
// Read guard: shared access only.
378 impl<'a, T: Send + Share> Deref<T> for RWLockReadGuard<'a, T> {
379 fn deref<'a>(&'a self) -> &'a T { self._data }
// Write guard: shared access via Deref...
381 impl<'a, T: Send + Share> Deref<T> for RWLockWriteGuard<'a, T> {
382 fn deref<'a>(&'a self) -> &'a T { &*self._data }
// ...and exclusive access via DerefMut (write mode holds the lock
// exclusively, so handing out `&mut T` is sound).
384 impl<'a, T: Send + Share> DerefMut<T> for RWLockWriteGuard<'a, T> {
385 fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self._data }
388 /****************************************************************************
390 ****************************************************************************/
392 /// A barrier enables multiple tasks to synchronize the beginning
393 /// of some computation.
396 /// use sync::{Arc, Barrier};
398 /// let barrier = Arc::new(Barrier::new(10));
399 /// for _ in range(0, 10) {
400 /// let c = barrier.clone();
401 /// // The same messages will be printed together.
402 /// // You will NOT see any interleaving.
404 /// println!("before wait");
406 /// println!("after wait");
// Barrier state lives behind this crate's own (condvar-equipped) Mutex.
411 lock: Mutex<BarrierState>,
415 // The inner state of a double barrier
// NOTE(review): fields elided in this listing; wait() below reads
// `count`, `generation_id`, and compares against `num_tasks`.
416 struct BarrierState {
422 /// Create a new barrier that can block a given number of tasks.
423 pub fn new(num_tasks: uint) -> Barrier {
425 lock: Mutex::new(BarrierState {
429 num_tasks: num_tasks,
433 /// Block the current task until a certain number of tasks is waiting.
// Capture the current generation so this waiter can tell when the
// barrier has been released (generation_id is bumped on release).
435 let mut lock = self.lock.lock();
436 let local_gen = lock.generation_id;
// Last arriver (count reaches num_tasks) skips the wait branch and
// releases everyone below.
438 if lock.count < self.num_tasks {
439 // We need a while loop to guard against spurious wakeups.
440 // http://en.wikipedia.org/wiki/Spurious_wakeup
441 while local_gen == lock.generation_id &&
442 lock.count < self.num_tasks {
// Release path: start the next generation and wake all waiters.
// NOTE(review): count reset elided in this listing (source lines 443-446).
447 lock.generation_id += 1;
448 lock.cond.broadcast();
453 /****************************************************************************
455 ****************************************************************************/
460 use std::comm::Empty;
462 use std::task::try_future;
465 use super::{Mutex, Barrier, RWLock};
// Parent/child handshake over an Arc<Mutex<bool>> plus its condvar.
// NOTE(review): bodies heavily elided in this listing (e.g. source
// lines 472-479, 481-487).
468 fn test_mutex_arc_condvar() {
469 let arc = Arc::new(Mutex::new(false));
470 let arc2 = arc.clone();
471 let (tx, rx) = channel();
473 // wait until parent gets in
475 let mut lock = arc2.lock();
480 let lock = arc.lock();
// Poison propagation through a condvar wait: child fails while holding
// the lock, parent should fail when it wakes (hence #[should_fail]).
488 #[test] #[should_fail]
489 fn test_arc_condvar_poison() {
490 let arc = Arc::new(Mutex::new(1));
491 let arc2 = arc.clone();
492 let (tx, rx) = channel();
496 let lock = arc2.lock();
498 // Parent should fail when it wakes up.
502 let lock = arc.lock();
// Child's failed assert (1 != 2) poisons the mutex; parent's later
// lock() should then fail (#[should_fail]).
509 #[test] #[should_fail]
510 fn test_mutex_arc_poison() {
511 let arc = Arc::new(Mutex::new(1));
512 let arc2 = arc.clone();
513 let _ = task::try(proc() {
514 let lock = arc2.lock();
515 assert_eq!(*lock, 2);
517 let lock = arc.lock();
518 assert_eq!(*lock, 1);
// A Mutex protecting an Arc<Mutex<int>>: locking the outer then the
// inner still reaches the data.
522 fn test_mutex_arc_nested() {
523 // Tests nested mutexes and access
524 // to underlying data.
525 let arc = Arc::new(Mutex::new(1));
526 let arc2 = Arc::new(Mutex::new(arc));
528 let lock = arc2.lock();
529 let lock2 = lock.deref().lock();
530 assert_eq!(*lock2, 1);
// A destructor running during unwind can still lock the mutex and
// mutate the data; parent observes the mutation afterwards.
// NOTE(review): Unwinder struct declaration and several statements are
// elided in this listing.
535 fn test_mutex_arc_access_in_unwind() {
536 let arc = Arc::new(Mutex::new(1i));
537 let arc2 = arc.clone();
538 let _ = task::try::<()>(proc() {
542 impl Drop for Unwinder {
544 let mut lock = self.i.lock();
548 let _u = Unwinder { i: arc2 };
551 let lock = arc.lock();
552 assert_eq!(*lock, 2);
// A failed writer poisons the rwlock for subsequent readers...
555 #[test] #[should_fail]
556 fn test_rw_arc_poison_wr() {
557 let arc = Arc::new(RWLock::new(1));
558 let arc2 = arc.clone();
559 let _ = task::try(proc() {
560 let lock = arc2.write();
561 assert_eq!(*lock, 2);
563 let lock = arc.read();
564 assert_eq!(*lock, 1);
// ...and for subsequent writers as well.
566 #[test] #[should_fail]
567 fn test_rw_arc_poison_ww() {
568 let arc = Arc::new(RWLock::new(1));
569 let arc2 = arc.clone();
570 let _ = task::try(proc() {
571 let lock = arc2.write();
572 assert_eq!(*lock, 2);
574 let lock = arc.write();
575 assert_eq!(*lock, 1);
// A failed *reader* must NOT poison the lock — later readers succeed...
578 fn test_rw_arc_no_poison_rr() {
579 let arc = Arc::new(RWLock::new(1));
580 let arc2 = arc.clone();
581 let _ = task::try(proc() {
582 let lock = arc2.read();
583 assert_eq!(*lock, 2);
585 let lock = arc.read();
586 assert_eq!(*lock, 1);
// ...and later writers succeed too.
589 fn test_rw_arc_no_poison_rw() {
590 let arc = Arc::new(RWLock::new(1));
591 let arc2 = arc.clone();
592 let _ = task::try(proc() {
593 let lock = arc2.read();
594 assert_eq!(*lock, 2);
596 let lock = arc.write();
597 assert_eq!(*lock, 1);
// Failing after a write().downgrade() counts as a reader failure:
// no poisoning.
600 fn test_rw_arc_no_poison_dr() {
601 let arc = Arc::new(RWLock::new(1));
602 let arc2 = arc.clone();
603 let _ = task::try(proc() {
604 let lock = arc2.write().downgrade();
605 assert_eq!(*lock, 2);
607 let lock = arc.write();
608 assert_eq!(*lock, 1);
// Writer increments the counter 10 times while 5 reader tasks try to
// observe a torn/partial state; all readers must see a consistent value.
// NOTE(review): the enclosing fn header and several statements are elided
// in this listing (source lines 609-612, 620-627, 634-637, 641-644).
613 let arc = Arc::new(RWLock::new(0));
614 let arc2 = arc.clone();
615 let (tx, rx) = channel();
618 let mut lock = arc2.write();
619 for _ in range(0, 10) {
628 // Readers try to catch the writer in the act
629 let mut children = Vec::new();
630 for _ in range(0, 5) {
631 let arc3 = arc.clone();
632 children.push(try_future(proc() {
633 let lock = arc3.read();
638 // Wait for children to pass their asserts
639 for r in children.mut_iter() {
640 assert!(r.get_ref().is_ok());
643 // Wait for writer to finish
645 let lock = arc.read();
646 assert_eq!(*lock, 10);
// RWLock analogue of the mutex unwind test: a destructor takes the write
// lock during unwinding and mutates the data.
// NOTE(review): Unwinder struct declaration and several statements are
// elided in this listing.
650 fn test_rw_arc_access_in_unwind() {
651 let arc = Arc::new(RWLock::new(1i));
652 let arc2 = arc.clone();
653 let _ = task::try::<()>(proc() {
657 impl Drop for Unwinder {
659 let mut lock = self.i.write();
663 let _u = Unwinder { i: arc2 };
666 let lock = arc.read();
667 assert_eq!(*lock, 2);
// Multi-task choreography exercising write -> downgrade -> read ordering;
// the numbered steps below are the intended schedule.
// NOTE(review): many statements elided in this listing (e.g. source
// lines 690-694, 702-706, 711-716, 719-721, 726-727, 731-732, 735-736).
671 fn test_rw_downgrade() {
672 // (1) A downgrader gets in write mode and does cond.wait.
673 // (2) A writer gets in write mode, sets state to 42, and does signal.
674 // (3) Downgrader wakes, sets state to 31337.
675 // (4) tells writer and all other readers to contend as it downgrades.
676 // (5) Writer attempts to set state back to 42, while downgraded task
677 // and all reader tasks assert that it's 31337.
678 let arc = Arc::new(RWLock::new(0));
// Spawn 10 reader tasks, each with a two-way channel to the downgrader.
681 let mut reader_convos = Vec::new();
682 for _ in range(0, 10) {
683 let ((tx1, rx1), (tx2, rx2)) = (channel(), channel());
684 reader_convos.push((tx1, rx2));
685 let arcn = arc.clone();
687 rx1.recv(); // wait for downgrader to give go-ahead
688 let lock = arcn.read();
689 assert_eq!(*lock, 31337);
// The contending writer task (step 2 / step 5).
695 let arc2 = arc.clone();
696 let ((tx1, rx1), (tx2, rx2)) = (channel(), channel());
700 let mut lock = arc2.write();
701 assert_eq!(*lock, 0);
707 let mut lock = arc2.write();
708 // This shouldn't happen until after the downgrade read
709 // section, and all other readers, finish.
710 assert_eq!(*lock, 31337);
// The downgrader (steps 1, 3, 4) runs on the main task.
717 let mut lock = arc.write();
718 tx1.send(()); // send to another writer who will wake us up
722 assert_eq!(*lock, 42);
724 // send to other readers
725 for &(ref mut rc, _) in reader_convos.mut_iter() {
728 let lock = lock.downgrade();
729 // complete handshake with other readers
730 for &(_, ref mut rp) in reader_convos.mut_iter() {
733 tx1.send(()); // tell writer to try again
734 assert_eq!(*lock, 31337);
737 rx2.recv(); // complete handshake with writer
// Race-regression test for the raw rwlock's condvar-reacquire vs.
// downgrade hand-off; see the inline commentary for the scenario.
// NOTE(review): several statements elided in this listing (e.g. source
// lines 749-752, 759-761, 765, 767, 769, 771-774, 776, 782-784).
741 fn test_rw_write_cond_downgrade_read_race_helper() {
742 // Tests that when a downgrader hands off the "reader cloud" lock
743 // because of a contending reader, a writer can't race to get it
744 // instead, which would result in readers_and_writers. This tests
745 // the raw module rather than this one, but it's here because an
746 // rwarc gives us extra shared state to help check for the race.
747 let x = Arc::new(RWLock::new(true));
748 let (tx, rx) = channel();
753 let mut lock = xw.write();
754 tx.send(()); // tell downgrader it's ok to go
756 // The core of the test is here: the condvar reacquire path
757 // must involve order_lock, so that it cannot race with a reader
758 // trying to receive the "reader cloud lock hand-off".
762 rx.recv(); // wait for writer to get in
764 let lock = x.write();
766 // make writer contend in the cond-reacquire path
768 // make a reader task to trigger the "reader cloud lock" handoff
770 let (tx, rx) = channel();
775 rx.recv(); // wait for reader task to exist
777 let lock = lock.downgrade();
778 // if writer mistakenly got in, make sure it mutates state
779 // before we assert on it
780 for _ in range(0, 5) { task::deschedule(); }
781 // make sure writer didn't get in.
// The race is probabilistic, so run the helper several times.
785 fn test_rw_write_cond_downgrade_read_race() {
786 // Ideally the above test case would have deschedule statements in it
787 // that helped to expose the race nearly 100% of the time... but adding
788 // deschedules in the intuitively-right locations made it even less
789 // likely, and I wasn't sure why :( . This is a mediocre "next best"
791 for _ in range(0, 8) {
792 test_rw_write_cond_downgrade_read_race_helper();
796 /************************************************************************
798 ************************************************************************/
// Barrier test: 9 spawned tasks plus the main task meet at a 10-task
// barrier; nothing may arrive on the channel until all have waited.
// NOTE(review): the fn header and several statements are elided in this
// listing (source lines 799-800, 803, 806-812, 816-820, 823-825).
801 let barrier = Arc::new(Barrier::new(10));
802 let (tx, rx) = channel();
804 for _ in range(0, 9) {
805 let c = barrier.clone();
813 // At this point, all spawned tasks should be blocked,
814 // so we shouldn't get anything from the port
815 assert!(match rx.try_recv() {
821 // Now, the barrier is cleared and we should get data.
822 for _ in range(0, 9) {