1 // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 * Concurrency-enabled mechanisms for sharing mutable and/or immutable state
17 * In this example, a large vector of floats is shared between several tasks.
18 * With simple pipes, without ARC, a copy would have to be made for each task.
23 * let numbers=vec::from_fn(100, |ind| (ind as float)*rand::random());
24 * let shared_numbers=arc::ARC(numbers);
27 * let (port, chan) = stream();
28 * chan.send(shared_numbers.clone());
31 * let shared_numbers=port.recv();
32 * let local_numbers=shared_numbers.get();
34 * // Work with the local numbers
40 #[allow(missing_doc)];
45 use sync::{Mutex, mutex_with_condvars, RWlock, rwlock_with_condvars};
48 use core::unstable::sync::UnsafeAtomicRcBox;
52 /// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
53 pub struct Condvar<'self> {
55 failed: &'self mut bool,
56 cond: &'self sync::Condvar<'self>
59 impl<'self> Condvar<'self> {
60 /// Atomically exit the associated ARC and block until a signal is sent.
62 pub fn wait(&self) { self.wait_on(0) }
65 * Atomically exit the associated ARC and block on a specified condvar
66 * until a signal is sent on that same condvar (as sync::cond.wait_on).
68 * wait() is equivalent to wait_on(0).
71 pub fn wait_on(&self, condvar_id: uint) {
72 assert!(!*self.failed);
73 self.cond.wait_on(condvar_id);
74 // This is why we need to wrap sync::condvar.
75 check_poison(self.is_mutex, *self.failed);
78 /// Wake up a blocked task. Returns false if there was no blocked task.
80 pub fn signal(&self) -> bool { self.signal_on(0) }
83 * Wake up a blocked task on a specified condvar (as
84 * sync::cond.signal_on). Returns false if there was no blocked task.
87 pub fn signal_on(&self, condvar_id: uint) -> bool {
88 assert!(!*self.failed);
89 self.cond.signal_on(condvar_id)
92 /// Wake up all blocked tasks. Returns the number of tasks woken.
94 pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
97 * Wake up all blocked tasks on a specified condvar (as
98 * sync::cond.broadcast_on). Returns the number of tasks woken.
101 pub fn broadcast_on(&self, condvar_id: uint) -> uint {
102 assert!(!*self.failed);
103 self.cond.broadcast_on(condvar_id)
107 /****************************************************************************
109 ****************************************************************************/
/// An atomically reference counted wrapper for shared immutable state.
// `x` is the shared, refcounted allocation. Safety of aliasing rests on the
// Const bound required by the impls below (the contents are never mutated).
pub struct ARC<T> { x: UnsafeAtomicRcBox<T> }
114 /// Create an atomically reference counted wrapper.
115 pub fn ARC<T:Const + Owned>(data: T) -> ARC<T> {
116 ARC { x: UnsafeAtomicRcBox::new(data) }
120 * Access the underlying data in an atomically reference counted
123 impl<T:Const+Owned> ARC<T> {
124 pub fn get<'a>(&'a self) -> &'a T {
125 unsafe { &*self.x.get_immut() }
130 * Duplicate an atomically reference counted wrapper.
132 * The resulting two `arc` objects will point to the same underlying data
133 * object. However, one of the `arc` objects can be sent to another task,
134 * allowing them to share the underlying data.
136 impl<T:Const + Owned> Clone for ARC<T> {
137 fn clone(&self) -> ARC<T> {
138 ARC { x: self.x.clone() }
142 /****************************************************************************
143 * Mutex protected ARC (unsafe)
144 ****************************************************************************/
// Shared internal state: the blocking mutex, a poison flag that is set when a
// task fails while holding the lock, and the protected user data.
struct MutexARCInner<T> { lock: Mutex, failed: bool, data: T }
/// An ARC with mutable data protected by a blocking mutex.
struct MutexARC<T> { x: UnsafeAtomicRcBox<MutexARCInner<T>> }
151 /// Create a mutex-protected ARC with the supplied data.
152 pub fn MutexARC<T:Owned>(user_data: T) -> MutexARC<T> {
153 mutex_arc_with_condvars(user_data, 1)
156 * Create a mutex-protected ARC with the supplied data and a specified number
157 * of condvars (as sync::mutex_with_condvars).
159 pub fn mutex_arc_with_condvars<T:Owned>(user_data: T,
160 num_condvars: uint) -> MutexARC<T> {
162 MutexARCInner { lock: mutex_with_condvars(num_condvars),
163 failed: false, data: user_data };
164 MutexARC { x: UnsafeAtomicRcBox::new(data) }
167 impl<T:Owned> Clone for MutexARC<T> {
168 /// Duplicate a mutex-protected ARC, as arc::clone.
169 fn clone(&self) -> MutexARC<T> {
170 // NB: Cloning the underlying mutex is not necessary. Its reference
171 // count would be exactly the same as the shared state's.
172 MutexARC { x: self.x.clone() }
176 impl<T:Owned> MutexARC<T> {
179 * Access the underlying mutable data with mutual exclusion from other
180 * tasks. The argument closure will be run with the mutex locked; all
181 * other tasks wishing to access the data will block until the closure
184 * The reason this function is 'unsafe' is because it is possible to
185 * construct a circular reference among multiple ARCs by mutating the
186 * underlying data. This creates potential for deadlock, but worse, this
187 * will guarantee a memory leak of all involved ARCs. Using mutex ARCs
188 * inside of other ARCs is safe in absence of circular references.
190 * If you wish to nest mutex_arcs, one strategy for ensuring safety at
191 * runtime is to add a "nesting level counter" inside the stored data, and
192 * when traversing the arcs, assert that they monotonically decrease.
196 * Failing while inside the ARC will unlock the ARC while unwinding, so
197 * that other tasks won't block forever. It will also poison the ARC:
198 * any tasks that subsequently try to access it (including those already
199 * blocked on the mutex) will also fail immediately.
202 pub unsafe fn access<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
203 let state = self.x.get();
204 // Borrowck would complain about this if the function were
205 // not already unsafe. See borrow_rwlock, far below.
206 do (&(*state).lock).lock {
207 check_poison(true, (*state).failed);
208 let _z = PoisonOnFail(&mut (*state).failed);
209 blk(&mut (*state).data)
213 /// As access(), but with a condvar, as sync::mutex.lock_cond().
215 pub unsafe fn access_cond<'x, 'c, U>(&self,
216 blk: &fn(x: &'x mut T,
217 c: &'c Condvar) -> U)
219 let state = self.x.get();
220 do (&(*state).lock).lock_cond |cond| {
221 check_poison(true, (*state).failed);
222 let _z = PoisonOnFail(&mut (*state).failed);
223 blk(&mut (*state).data,
224 &Condvar {is_mutex: true,
225 failed: &mut (*state).failed,
231 // Common code for {mutex.access,rwlock.write}{,_cond}.
234 fn check_poison(is_mutex: bool, failed: bool) {
237 fail!("Poisoned MutexARC - another task failed inside!");
239 fail!("Poisoned rw_arc - another task failed inside!");
// RAII-style guard created while a lock is held: its destructor records into
// the shared `failed` flag whether the task is unwinding, poisoning the ARC
// for all later accessors.
// NOTE(review): this excerpt is truncated — the struct's field(s), the Drop
// body, and the constructor body are not visible here; lines reproduced as-is.
struct PoisonOnFail {
impl Drop for PoisonOnFail {
        /* assert!(!*self.failed);
           -- might be false in case of cond.wait() */
fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
267 /****************************************************************************
268 * R/W lock protected ARC
269 ****************************************************************************/
// Shared internal state for RWARC: reader-writer lock, poison flag, user data.
struct RWARCInner<T> { lock: RWlock, failed: bool, data: T }
/**
 * A dual-mode ARC protected by a reader-writer lock. The data can be accessed
 * mutably or immutably, and immutably-accessing tasks may run concurrently.
 *
 * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested.
 */
// NOTE(review): the `pub struct RWARC<T> {` header line (and any attributes /
// additional fields, hinted at by the trailing comma) are missing from this
// excerpt; only the field line survives.
    x: UnsafeAtomicRcBox<RWARCInner<T>>,
284 /// Create a reader/writer ARC with the supplied data.
285 pub fn RWARC<T:Const + Owned>(user_data: T) -> RWARC<T> {
286 rw_arc_with_condvars(user_data, 1)
289 * Create a reader/writer ARC with the supplied data and a specified number
290 * of condvars (as sync::rwlock_with_condvars).
292 pub fn rw_arc_with_condvars<T:Const + Owned>(
294 num_condvars: uint) -> RWARC<T>
297 RWARCInner { lock: rwlock_with_condvars(num_condvars),
298 failed: false, data: user_data };
299 RWARC { x: UnsafeAtomicRcBox::new(data), }
302 impl<T:Const + Owned> RWARC<T> {
303 /// Duplicate a rwlock-protected ARC, as arc::clone.
304 pub fn clone(&self) -> RWARC<T> {
// NOTE(review): several non-comment lines of this impl are missing from the
// excerpt (closing braces, signature continuations, the tails of read(),
// write_downgrade() and downgrade()); code lines are reproduced as-is.
impl<T:Const + Owned> RWARC<T> {
    /**
     * Access the underlying data mutably. Locks the rwlock in write mode;
     * other readers and writers will block.
     *
     * # Failure
     *
     * Failing while inside the ARC will unlock the ARC while unwinding, so
     * that other tasks won't block forever. As MutexARC.access, it will also
     * poison the ARC, so subsequent readers and writers will both also fail.
     */
    pub fn write<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
        let state = self.x.get();
        do (*borrow_rwlock(state)).write {
            check_poison(false, (*state).failed);
            // Guard arms the poison flag if `blk` fails in write mode.
            let _z = PoisonOnFail(&mut (*state).failed);
            blk(&mut (*state).data)

    /// As write(), but with a condvar, as sync::rwlock.write_cond().
    pub fn write_cond<'x, 'c, U>(&self,
                                 blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
        let state = self.x.get();
        do (*borrow_rwlock(state)).write_cond |cond| {
            check_poison(false, (*state).failed);
            let _z = PoisonOnFail(&mut (*state).failed);
            blk(&mut (*state).data,
                &Condvar {is_mutex: false,
                          failed: &mut (*state).failed,

    /**
     * Access the underlying data immutably. May run concurrently with other
     * reading tasks.
     *
     * # Failure
     *
     * Failing will unlock the ARC while unwinding. However, unlike all other
     * access modes, this will not poison the ARC.
     */
    pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
        let state = self.x.get();
        do (*state).lock.read {
            check_poison(false, (*state).failed);

    /**
     * As write(), but with the ability to atomically 'downgrade' the lock.
     * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used
     * to obtain the &mut T, and can be transformed into a RWReadMode token by
     * calling downgrade(), after which a &T can be obtained instead.
     *
     * # Example
     *
     * ~~~ {.rust}
     * do arc.write_downgrade |mut write_token| {
     *     do write_token.write_cond |state, condvar| {
     *         ... exclusive access with mutable state ...
     *     }
     *     let read_token = arc.downgrade(write_token);
     *     do read_token.read |state| {
     *         ... shared access with immutable state ...
     *     }
     * }
     * ~~~
     */
    pub fn write_downgrade<U>(&self, blk: &fn(v: RWWriteMode<T>) -> U) -> U {
        let state = self.x.get();
        do (*borrow_rwlock(state)).write_downgrade |write_mode| {
            check_poison(false, (*state).failed);
            // Assemble the write token handed to the caller's closure
            // (the RWWriteMode constructor line is missing from the excerpt).
                data: &mut (*state).data,
                poison: PoisonOnFail(&mut (*state).failed)

    /// To be called inside of the write_downgrade block.
    pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>)
                         -> RWReadMode<'a, T> {
        // The rwlock should assert that the token belongs to us for us.
        let state = self.x.get();
        // NOTE(review): the destructuring of `token` into `t` and `data` is
        // missing from this excerpt.
        let new_token = (*state).lock.downgrade(t);
        // Whatever region the input reference had, it will be safe to use
        // the same region for the output reference. (The only 'unsafe' part
        // of this cast is removing the mutability.)
        let new_data = cast::transmute_immut(data);
        // Downgrade ensured the token belonged to us. Just a sanity check.
        assert!(borrow::ref_eq(&(*state).data, new_data));
434 // Borrowck rightly complains about immutably aliasing the rwlock in order to
435 // lock it. This wraps the unsafety, with the justification that the 'lock'
436 // field is never overwritten; only 'failed' and 'data'.
438 fn borrow_rwlock<T:Const + Owned>(state: *const RWARCInner<T>) -> *RWlock {
439 unsafe { cast::transmute(&const (*state).lock) }
442 /// The "write permission" token used for RWARC.write_downgrade().
443 pub struct RWWriteMode<'self, T> {
445 token: sync::RWlockWriteMode<'self>,
446 poison: PoisonOnFail,
449 /// The "read permission" token used for RWARC.write_downgrade().
450 pub struct RWReadMode<'self, T> {
452 token: sync::RWlockReadMode<'self>,
// NOTE(review): the body of write() and most of write_cond() (the pattern
// match on self, the condvar construction tail, and closing braces) are
// missing from this excerpt; lines are reproduced as-is.
impl<'self, T:Const + Owned> RWWriteMode<'self, T> {
    /// Access the pre-downgrade RWARC in write mode.
    pub fn write<U>(&mut self, blk: &fn(x: &mut T) -> U) -> U {

    /// Access the pre-downgrade RWARC in write mode with a condvar.
    pub fn write_cond<'x, 'c, U>(&mut self,
                                 blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
        do token.write_cond |cond| {
            // The poison flag is threaded into the Condvar so waiters see
            // failures that happen while they are descheduled.
                failed: &mut *poison.failed,
impl<'self, T:Const + Owned> RWReadMode<'self, T> {
    /// Access the post-downgrade rwlock in read mode.
    // NOTE(review): the destructuring of self that binds `token` and `data`
    // is missing from this excerpt; lines are reproduced as-is.
    pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
            do token.read { blk(data) }
510 /****************************************************************************
512 ****************************************************************************/
516 use core::prelude::*;
521 use core::cell::Cell;
// Sanity test: an ARC can be sent over a channel to a child task, cloned
// back, and read from both sides without copying the vector.
// NOTE(review): this excerpt is truncated (spawn-body lines, braces, and the
// initial ARC construction are missing); lines are reproduced as-is.
fn manually_share_arc() {
    let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let (p, c) = comm::stream();
    do task::spawn() || {
        let p = comm::PortSet::new();
        let arc_v : ARC<~[int]> = p.recv();
        let v = copy (*arc_v.get());
    c.send(arc_v.clone());
    assert_eq!(arc_v.get()[2], 3);
    assert_eq!(arc_v.get()[4], 5);
// Tests the condvar handshake: child waits on the MutexARC's condvar, parent
// takes the lock, signals via a oneshot, then wakes the child.
// NOTE(review): truncated excerpt — the spawn wrapper, cond.wait()/signal()
// calls, and closing braces are missing; lines reproduced as-is.
fn test_mutex_arc_condvar() {
    let arc = ~MutexARC(false);
    let arc2 = ~arc.clone();
    let (p,c) = comm::oneshot();
    // Cells let the oneshot endpoints be captured by the child closure.
    let (c,p) = (Cell::new(c), Cell::new(p));
        // wait until parent gets in
        comm::recv_one(p.take());
        do arc2.access_cond |state, cond| {
    do arc.access_cond |state, cond| {
        comm::send_one(c.take(), ());
// Poison propagation tests: a child fails while holding the MutexARC, and the
// parent's subsequent access (or condvar wakeup) must fail too.
// NOTE(review): truncated excerpt — assertion bodies, wait/signal calls and
// closing braces are missing; lines reproduced as-is.
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_arc_condvar_poison() {
    let arc = ~MutexARC(1);
    let arc2 = ~arc.clone();
    let (p, c) = comm::stream();
    do task::spawn_unlinked || {
        do arc2.access_cond |one, cond| {
    // Parent should fail when it wakes up.
    do arc.access_cond |one, cond| {

#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_mutex_arc_poison() {
    let arc = ~MutexARC(1);
    let arc2 = ~arc.clone();
    do arc2.access |one| {
    do arc.access |one| {
// RWARC poison matrix: writers poison (wr/ww/dw cases should_fail); readers
// and downgraded readers never poison (rr/rw/dr cases must pass).
// NOTE(review): truncated excerpt — each test's ARC construction, failing
// child bodies, and closing braces are missing; lines reproduced as-is.
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_rw_arc_poison_wr() {
    let arc2 = (*arc).clone();
    do arc2.write |one| {

#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_rw_arc_poison_ww() {
    let arc2 = (*arc).clone();
    do arc2.write |one| {

#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_rw_arc_poison_dw() {
    let arc2 = (*arc).clone();
    do arc2.write_downgrade |mut write_mode| {
        do write_mode.write |one| {

#[test] #[ignore(cfg(windows))]
fn test_rw_arc_no_poison_rr() {
    let arc2 = (*arc).clone();

#[test] #[ignore(cfg(windows))]
fn test_rw_arc_no_poison_rw() {
    let arc2 = (*arc).clone();

#[test] #[ignore(cfg(windows))]
fn test_rw_arc_no_poison_dr() {
    let arc2 = (*arc).clone();
    do arc2.write_downgrade |write_mode| {
        let read_mode = arc2.downgrade(write_mode);
        do (&read_mode).read |one| {
// NOTE(review): the enclosing `fn test_rw_arc()` header and many body lines
// (writer loop, reader spawn loop, final read) are missing from this excerpt;
// lines reproduced as-is. The visible shape: one writer task increments the
// shared number while reader tasks race it, then the parent checks the total.
    let arc2 = (*arc).clone();
    let (p,c) = comm::stream();
        do arc2.write |num| {
    // Readers try to catch the writer in the act
    let mut children = ~[];
        let arc3 = (*arc).clone();
        let mut builder = task::task();
        // Collect a future result per reader so we can join them below.
        builder.future_result(|r| children.push(r));
    // Wait for children to pass their asserts
    for children.iter().advance |r| {
    // Wait for writer to finish
    assert_eq!(*num, 10);
// NOTE(review): truncated excerpt — spawn wrappers, cond.wait()/signal()
// calls, reader-count loop bounds, and closing braces are missing; lines
// reproduced as-is.
fn test_rw_downgrade() {
    // (1) A downgrader gets in write mode and does cond.wait.
    // (2) A writer gets in write mode, sets state to 42, and does signal.
    // (3) Downgrader wakes, sets state to 31337.
    // (4) tells writer and all other readers to contend as it downgrades.
    // (5) Writer attempts to set state back to 42, while downgraded task
    //     and all reader tasks assert that it's 31337.

    // Reader tasks: each gets a go-ahead port and an ack channel.
    let mut reader_convos = ~[];
        let ((rp1,rc1),(rp2,rc2)) = (comm::stream(),comm::stream());
        reader_convos.push((rc1, rp2));
        let arcn = (*arc).clone();
            rp1.recv(); // wait for downgrader to give go-ahead
            do arcn.read |state| {
                assert_eq!(*state, 31337);

    // Writer task.
    let arc2 = (*arc).clone();
    let ((wp1,wc1),(wp2,wc2)) = (comm::stream(),comm::stream());
        do arc2.write_cond |state, cond| {
            assert_eq!(*state, 0);
        do arc2.write |state| {
            // This shouldn't happen until after the downgrade read
            // section, and all other readers, finish.
            assert_eq!(*state, 31337);

    // Downgrader (us).
    do arc.write_downgrade |mut write_mode| {
        do write_mode.write_cond |state, cond| {
            wc1.send(()); // send to another writer who will wake us up
            assert_eq!(*state, 42);
            // send to other readers
            for vec::each(reader_convos) |x| {
                (ref rc, _) => rc.send(()),
        let read_mode = arc.downgrade(write_mode);
        do (&read_mode).read |state| {
            // complete handshake with other readers
            for vec::each(reader_convos) |x| {
                (_, ref rp) => rp.recv(),
            wc1.send(()); // tell writer to try again
            assert_eq!(*state, 31337);
    wp2.recv(); // complete handshake with writer
// NOTE(review): truncated excerpt — spawn wrappers, the cond.wait()/signal()
// exchange, final assert, and closing braces are missing; lines reproduced
// as-is.
fn test_rw_write_cond_downgrade_read_race_helper() {
    // Tests that when a downgrader hands off the "reader cloud" lock
    // because of a contending reader, a writer can't race to get it
    // instead, which would result in readers_and_writers. This tests
    // the sync module rather than this one, but it's here because an
    // rwarc gives us extra shared state to help check for the race.
    // If you want to see this test fail, go to sync.rs and replace the
    // line in RWlock::write_cond() that looks like:
    //     "blk(&Condvar { order: opt_lock, ..*cond })"
    // with just "blk(cond)".
    let x = ~RWARC(true);
    let (wp, wc) = comm::stream();

    // writer task
    let xw = (*x).clone();
        do xw.write_cond |state, c| {
            wc.send(()); // tell downgrader it's ok to go
            // The core of the test is here: the condvar reacquire path
            // must involve order_lock, so that it cannot race with a reader
            // trying to receive the "reader cloud lock hand-off".

    wp.recv(); // wait for writer to get in

    do x.write_downgrade |mut write_mode| {
        do write_mode.write_cond |state, c| {
            // make writer contend in the cond-reacquire path
        // make a reader task to trigger the "reader cloud lock" handoff
        let xr = (*x).clone();
        let (rp, rc) = comm::stream();
            do xr.read |_state| { }
        rp.recv(); // wait for reader task to exist

        let read_mode = x.downgrade(write_mode);
        do read_mode.read |state| {
            // if writer mistakenly got in, make sure it mutates state
            // before we assert on it
            for 5.times { task::yield(); }
            // make sure writer didn't get in.
// Repeats the race helper several times to raise the odds of catching the
// (nondeterministic) downgrade/write race.
// NOTE(review): the #[test] attribute and closing brace are missing from
// this excerpt; lines reproduced as-is.
fn test_rw_write_cond_downgrade_read_race() {
    // Ideally the above test case would have yield statements in it that
    // helped to expose the race nearly 100% of the time... but adding
    // yields in the intuitively-right locations made it even less likely,
    // and I wasn't sure why :( . This is a mediocre "next best" option.
    for 8.times { test_rw_write_cond_downgrade_read_race_helper() }