1 // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 * Concurrency-enabled mechanisms for sharing mutable and/or immutable state
17 * In this example, a large vector of floats is shared between several tasks.
18 * With simple pipes, without Arc, a copy would have to be made for each task.
23 * let numbers=vec::from_fn(100, |ind| (ind as float)*rand::random());
24 * let shared_numbers=arc::Arc::new(numbers);
27 * let (port, chan) = stream();
28 * chan.send(shared_numbers.clone());
31 * let shared_numbers=port.recv();
32 * let local_numbers=shared_numbers.get();
34 * // Work with the local numbers
40 #[allow(missing_doc)];
44 use sync::{Mutex, RWLock};
47 use std::unstable::sync::UnsafeAtomicRcBox;
51 /// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
// NOTE(review): gappy extraction — the `is_mutex: bool` field (read by
// Condvar::wait_on below) is missing between these lines; confirm upstream.
52 pub struct Condvar<'self> {
// Shared poison flag; set when a task fails while holding the lock.
54 priv failed: &'self mut bool,
// The underlying sync-level condvar this wrapper delegates to.
55 priv cond: &'self sync::Condvar<'self>
58 impl<'self> Condvar<'self> {
59 /// Atomically exit the associated Arc and block until a signal is sent.
// Convenience form: condvar id 0 is the default queue.
61 pub fn wait(&self) { self.wait_on(0) }
64 * Atomically exit the associated Arc and block on a specified condvar
65 * until a signal is sent on that same condvar (as sync::cond.wait_on).
67 * wait() is equivalent to wait_on(0).
70 pub fn wait_on(&self, condvar_id: uint) {
// Refuse to wait on an Arc that is already poisoned.
71 assert!(!*self.failed);
72 self.cond.wait_on(condvar_id);
73 // This is why we need to wrap sync::condvar.
// After waking, another task may have failed while we slept — re-check.
74 check_poison(self.is_mutex, *self.failed);
77 /// Wake up a blocked task. Returns false if there was no blocked task.
79 pub fn signal(&self) -> bool { self.signal_on(0) }
82 * Wake up a blocked task on a specified condvar (as
83 * sync::cond.signal_on). Returns false if there was no blocked task.
86 pub fn signal_on(&self, condvar_id: uint) -> bool {
87 assert!(!*self.failed);
88 self.cond.signal_on(condvar_id)
91 /// Wake up all blocked tasks. Returns the number of tasks woken.
93 pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
96 * Wake up all blocked tasks on a specified condvar (as
97 * sync::cond.broadcast_on). Returns the number of tasks woken.
100 pub fn broadcast_on(&self, condvar_id: uint) -> uint {
101 assert!(!*self.failed);
102 self.cond.broadcast_on(condvar_id)
106 /****************************************************************************
108 ****************************************************************************/
110 /// An atomically reference counted wrapper for shared immutable state.
// Reference counting and atomicity live in UnsafeAtomicRcBox; the Freeze+Send
// bounds on the impls below are what make sharing across tasks safe.
111 pub struct Arc<T> { priv x: UnsafeAtomicRcBox<T> }
115 * Access the underlying data in an atomically reference counted
118 impl<T:Freeze+Send> Arc<T> {
119 /// Create an atomically reference counted wrapper.
120 pub fn new(data: T) -> Arc<T> {
121 Arc { x: UnsafeAtomicRcBox::new(data) }
// Borrow the shared data for the lifetime of this handle; the T:Freeze bound
// on the impl is what justifies handing out a plain immutable borrow.
124 pub fn get<'a>(&'a self) -> &'a T {
125 unsafe { &*self.x.get_immut() }
129 * Retrieve the data back out of the Arc. This function blocks until the
130 * reference given to it is the last existing one, and then unwrap the data
131 * instead of destroying it.
133 * If multiple tasks call unwrap, all but the first will fail. Do not call
134 * unwrap from a task that holds another reference to the same Arc; it is
135 * guaranteed to deadlock.
137 pub fn unwrap(self) -> T {
// Move the inner box out of self by destructuring.
// NOTE(review): the following 'x.unwrap()' line is missing from this view.
138 let Arc { x: x } = self;
143 impl<T:Freeze + Send> Clone for Arc<T> {
145 * Duplicate an atomically reference counted wrapper.
147 * The resulting two `arc` objects will point to the same underlying data
148 * object. However, one of the `arc` objects can be sent to another task,
149 * allowing them to share the underlying data.
// Delegates to the shared box's clone; per the doc above, the data itself is
// not copied — both handles point at the same underlying object.
151 fn clone(&self) -> Arc<T> {
152 Arc { x: self.x.clone() }
156 /****************************************************************************
157 * Mutex protected Arc (unsafe)
158 ****************************************************************************/
// Shared state for MutexArc: the blocking mutex, the poison flag written by
// PoisonOnFail, and the user data the lock protects.
161 struct MutexArcInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
162 /// An Arc with mutable data protected by a blocking mutex.
// Only the inner box is shared: lock, poison flag and data travel together.
163 struct MutexArc<T> { priv x: UnsafeAtomicRcBox<MutexArcInner<T>> }
166 impl<T:Send> Clone for MutexArc<T> {
167 /// Duplicate a mutex-protected Arc. See arc::clone for more details.
168 fn clone(&self) -> MutexArc<T> {
169 // NB: Cloning the underlying mutex is not necessary. Its reference
170 // count would be exactly the same as the shared state's.
171 MutexArc { x: self.x.clone() }
175 impl<T:Send> MutexArc<T> {
176 /// Create a mutex-protected Arc with the supplied data.
177 pub fn new(user_data: T) -> MutexArc<T> {
// Default to a single condvar, matching the plain Mutex constructor.
178 MutexArc::new_with_condvars(user_data, 1)
182 * Create a mutex-protected Arc with the supplied data and a specified number
183 * of condvars (as sync::Mutex::new_with_condvars).
185 pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc<T> {
186 let data = MutexArcInner {
187 lock: Mutex::new_with_condvars(num_condvars),
// Poison flag starts clean; PoisonOnFail sets it if a task fails inside.
188 failed: false, data: user_data
190 MutexArc { x: UnsafeAtomicRcBox::new(data) }
194 * Access the underlying mutable data with mutual exclusion from other
195 * tasks. The argument closure will be run with the mutex locked; all
196 * other tasks wishing to access the data will block until the closure
199 * The reason this function is 'unsafe' is because it is possible to
200 * construct a circular reference among multiple Arcs by mutating the
201 * underlying data. This creates potential for deadlock, but worse, this
202 * will guarantee a memory leak of all involved Arcs. Using mutex Arcs
203 * inside of other Arcs is safe in absence of circular references.
205 * If you wish to nest mutex_arcs, one strategy for ensuring safety at
206 * runtime is to add a "nesting level counter" inside the stored data, and
207 * when traversing the arcs, assert that they monotonically decrease.
211 * Failing while inside the Arc will unlock the Arc while unwinding, so
212 * that other tasks won't block forever. It will also poison the Arc:
213 * any tasks that subsequently try to access it (including those already
214 * blocked on the mutex) will also fail immediately.
217 pub unsafe fn access<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
// Raw pointer to the shared inner state inside the atomic box.
218 let state = self.x.get();
219 // Borrowck would complain about this if the function were
220 // not already unsafe. See borrow_rwlock, far below.
221 do (&(*state).lock).lock {
// Fail fast if a previous accessor failed while holding the lock.
222 check_poison(true, (*state).failed);
// Drop-guard: marks the Arc poisoned if blk fails while we hold the lock.
223 let _z = PoisonOnFail(&mut (*state).failed);
224 blk(&mut (*state).data)
228 /// As access(), but with a condvar, as sync::mutex.lock_cond().
230 pub unsafe fn access_cond<'x, 'c, U>(&self,
231 blk: &fn(x: &'x mut T,
232 c: &'c Condvar) -> U)
234 let state = self.x.get();
235 do (&(*state).lock).lock_cond |cond| {
236 check_poison(true, (*state).failed);
237 let _z = PoisonOnFail(&mut (*state).failed);
238 blk(&mut (*state).data,
// Wrap the sync-level condvar so waits re-check the poison flag (is_mutex
// selects the MutexArc error message in check_poison).
239 &Condvar {is_mutex: true,
240 failed: &mut (*state).failed,
246 * Retrieves the data, blocking until all other references are dropped,
247 * exactly as arc::unwrap.
249 * Will additionally fail if another task has failed while accessing the arc.
251 pub fn unwrap(self) -> T {
252 let MutexArc { x: x } = self;
253 let inner = x.unwrap();
254 let MutexArcInner { failed: failed, data: data, _ } = inner;
// NOTE(review): the 'if failed {' guard around this fail! (and the return of
// 'data' on the clean path) is missing from this extracted view.
256 fail!(~"Can't unwrap poisoned MutexArc - another task failed inside!");
262 // Common code for {mutex.access,rwlock.write}{,_cond}.
265 fn check_poison(is_mutex: bool, failed: bool) {
268 fail!("Poisoned MutexArc - another task failed inside!");
270 fail!("Poisoned rw_arc - another task failed inside!");
// Drop-guard created inside a locked region (see the `let _z = PoisonOnFail`
// call sites above). On drop it presumably sets the shared 'failed' flag when
// the task is unwinding — the Drop body is not visible here; confirm upstream.
// NOTE(review): gappy extraction — the struct's field list, the Drop method
// body, and the constructor body are all missing from this view.
276 struct PoisonOnFail {
280 impl Drop for PoisonOnFail {
283 /* assert!(!*self.failed);
284 -- might be false in case of cond.wait() */
292 fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
298 /****************************************************************************
299 * R/W lock protected Arc
300 ****************************************************************************/
// Shared state for RWArc: reader-writer lock, poison flag, and user data.
303 struct RWArcInner<T> { priv lock: RWLock, priv failed: bool, priv data: T }
305 * A dual-mode Arc protected by a reader-writer lock. The data can be accessed
306 * mutably or immutably, and immutably-accessing tasks may run concurrently.
308 * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested.
// NOTE(review): the 'pub struct RWArc<T> {' header line is missing from this
// extracted view; the field below belongs to that struct.
312 priv x: UnsafeAtomicRcBox<RWArcInner<T>>,
315 impl<T:Freeze + Send> Clone for RWArc<T> {
316 /// Duplicate a rwlock-protected Arc. See arc::clone for more details.
// Same pattern as MutexArc: cloning the shared box shares lock + flag + data.
317 fn clone(&self) -> RWArc<T> {
318 RWArc { x: self.x.clone() }
323 impl<T:Freeze + Send> RWArc<T> {
324 /// Create a reader/writer Arc with the supplied data.
325 pub fn new(user_data: T) -> RWArc<T> {
// Default to a single condvar, matching the plain RWLock constructor.
326 RWArc::new_with_condvars(user_data, 1)
330 * Create a reader/writer Arc with the supplied data and a specified number
331 * of condvars (as sync::RWLock::new_with_condvars).
333 pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWArc<T> {
334 let data = RWArcInner {
335 lock: RWLock::new_with_condvars(num_condvars),
// Poison flag starts clean; set by PoisonOnFail on failure inside a write.
336 failed: false, data: user_data
338 RWArc { x: UnsafeAtomicRcBox::new(data), }
342 * Access the underlying data mutably. Locks the rwlock in write mode;
343 * other readers and writers will block.
347 * Failing while inside the Arc will unlock the Arc while unwinding, so
348 * that other tasks won't block forever. As MutexArc.access, it will also
349 * poison the Arc, so subsequent readers and writers will both also fail.
352 pub fn write<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
354 let state = self.x.get();
// borrow_rwlock (below) works around borrowck aliasing of the lock field.
355 do (*borrow_rwlock(state)).write {
356 check_poison(false, (*state).failed);
357 let _z = PoisonOnFail(&mut (*state).failed);
358 blk(&mut (*state).data)
363 /// As write(), but with a condvar, as sync::rwlock.write_cond().
365 pub fn write_cond<'x, 'c, U>(&self,
366 blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
369 let state = self.x.get();
370 do (*borrow_rwlock(state)).write_cond |cond| {
371 check_poison(false, (*state).failed);
372 let _z = PoisonOnFail(&mut (*state).failed);
373 blk(&mut (*state).data,
// is_mutex=false selects the rw_arc error message in check_poison.
374 &Condvar {is_mutex: false,
375 failed: &mut (*state).failed,
382 * Access the underlying data immutably. May run concurrently with other
387 * Failing will unlock the Arc while unwinding. However, unlike all other
388 * access modes, this will not poison the Arc.
390 pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
392 let state = self.x.get();
// Note: no PoisonOnFail here — read-mode failures do not poison (see doc).
393 do (*state).lock.read {
394 check_poison(false, (*state).failed);
401 * As write(), but with the ability to atomically 'downgrade' the lock.
402 * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used
403 * to obtain the &mut T, and can be transformed into a RWReadMode token by
404 * calling downgrade(), after which a &T can be obtained instead.
409 * do arc.write_downgrade |mut write_token| {
410 * do write_token.write_cond |state, condvar| {
411 * ... exclusive access with mutable state ...
413 * let read_token = arc.downgrade(write_token);
414 * do read_token.read |state| {
415 * ... shared access with immutable state ...
420 pub fn write_downgrade<U>(&self, blk: &fn(v: RWWriteMode<T>) -> U) -> U {
422 let state = self.x.get();
423 do (*borrow_rwlock(state)).write_downgrade |write_mode| {
424 check_poison(false, (*state).failed);
// Package the sync token, data pointer and poison guard into RWWriteMode.
// NOTE(review): the surrounding struct-literal lines are missing from view.
426 data: &mut (*state).data,
428 poison: PoisonOnFail(&mut (*state).failed)
434 /// To be called inside of the write_downgrade block.
435 pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>)
436 -> RWReadMode<'a, T> {
438 // The rwlock should assert that the token belongs to us for us.
439 let state = self.x.get();
// NOTE(review): the destructuring of 'token' into 't'/'data' is missing from
// this extracted view; 't' and 'data' below come from that lost code.
446 let new_token = (*state).lock.downgrade(t);
447 // Whatever region the input reference had, it will be safe to use
448 // the same region for the output reference. (The only 'unsafe' part
449 // of this cast is removing the mutability.)
450 let new_data = cast::transmute_immut(data);
451 // Downgrade ensured the token belonged to us. Just a sanity check.
452 assert!(borrow::ref_eq(&(*state).data, new_data));
462 * Retrieves the data, blocking until all other references are dropped,
463 * exactly as arc::unwrap.
465 * Will additionally fail if another task has failed while accessing the arc
468 pub fn unwrap(self) -> T {
469 let RWArc { x: x, _ } = self;
470 let inner = x.unwrap();
471 let RWArcInner { failed: failed, data: data, _ } = inner;
// NOTE(review): as in MutexArc::unwrap, the 'if failed {' guard and the
// clean-path return of 'data' are missing from this extracted view.
473 fail!(~"Can't unwrap poisoned RWArc - another task failed inside!")
479 // Borrowck rightly complains about immutably aliasing the rwlock in order to
480 // lock it. This wraps the unsafety, with the justification that the 'lock'
481 // field is never overwritten; only 'failed' and 'data'.
483 fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {
484 unsafe { cast::transmute(&(*state).lock) }
487 /// The "write permission" token used for RWArc.write_downgrade().
488 pub struct RWWriteMode<'self, T> {
// NOTE(review): a 'data' field (the &mut T that write() hands to its closure,
// initialized in RWArc::write_downgrade above) is missing from this view.
490 token: sync::RWLockWriteMode<'self>,
// Keeps the poison guard alive for the whole write-mode region.
491 poison: PoisonOnFail,
494 /// The "read permission" token used for RWArc.write_downgrade().
495 pub struct RWReadMode<'self, T> {
// NOTE(review): the shared-data field is missing from this extracted view;
// only the sync-level read token is visible here.
497 token: sync::RWLockReadMode<'self>,
500 impl<'self, T:Freeze + Send> RWWriteMode<'self, T> {
501 /// Access the pre-downgrade RWArc in write mode.
// NOTE(review): the body of write() is missing from this extracted view.
502 pub fn write<U>(&mut self, blk: &fn(x: &mut T) -> U) -> U {
516 /// Access the pre-downgrade RWArc in write mode with a condvar.
517 pub fn write_cond<'x, 'c, U>(&mut self,
518 blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
// Re-wraps the sync condvar in an arc::Condvar carrying the poison flag.
526 do token.write_cond |cond| {
530 failed: &mut *poison.failed,
541 impl<'self, T:Freeze + Send> RWReadMode<'self, T> {
542 /// Access the post-downgrade rwlock in read mode.
543 pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
// Runs blk with shared (immutable) access while the read token is held.
// NOTE(review): the destructuring that binds 'token'/'data' is missing here.
549 do token.read { blk(data) }
555 /****************************************************************************
557 ****************************************************************************/
// Demonstrates sharing an Arc between tasks over a pipe instead of copying
// the vector (mirrors the module doc example). NOTE(review): several interior
// lines of this test are missing from this extracted view.
569 fn manually_share_arc() {
570 let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
571 let arc_v = Arc::new(v);
573 let (p, c) = comm::stream();
575 do task::spawn() || {
576 let arc_v : Arc<~[int]> = p.recv();
578 let v = (*arc_v.get()).clone();
582 c.send(arc_v.clone());
// The parent still reads the original data through its own handle.
584 assert_eq!(arc_v.get()[2], 3);
585 assert_eq!(arc_v.get()[4], 5);
// Parent/child handshake over a MutexArc condvar: child waits for the parent
// to enter the lock, then both rendezvous via access_cond. Closure bodies are
// partially missing from this extracted view.
591 fn test_mutex_arc_condvar() {
593 let arc = ~MutexArc::new(false);
594 let arc2 = ~arc.clone();
595 let (p,c) = comm::oneshot();
// Cells let the oneshot endpoints be captured by the spawned closure.
596 let (c,p) = (Cell::new(c), Cell::new(p));
598 // wait until parent gets in
599 comm::recv_one(p.take());
600 do arc2.access_cond |state, cond| {
605 do arc.access_cond |state, cond| {
606 comm::send_one(c.take(), ());
// Poison-propagation tests: a task fails while holding the MutexArc, and a
// later access (or unwrap) by another task must fail too; #[should_fail]
// asserts that second failure. NOTE(review): the failing closure bodies are
// partially missing from this extracted view.
614 #[test] #[should_fail] #[ignore(cfg(windows))]
615 fn test_arc_condvar_poison() {
617 let arc = ~MutexArc::new(1);
618 let arc2 = ~arc.clone();
619 let (p, c) = comm::stream();
// Unlinked spawn so the child's failure doesn't kill the parent directly.
621 do task::spawn_unlinked || {
623 do arc2.access_cond |one, cond| {
625 // Parent should fail when it wakes up.
630 do arc.access_cond |one, cond| {
638 #[test] #[should_fail] #[ignore(cfg(windows))]
639 fn test_mutex_arc_poison() {
641 let arc = ~MutexArc::new(1);
642 let arc2 = ~arc.clone();
644 do arc2.access |one| {
648 do arc.access |one| {
653 #[test] #[should_fail] #[ignore(cfg(windows))]
654 pub fn test_mutex_arc_unwrap_poison() {
655 let arc = MutexArc::new(1);
656 let arc2 = ~(&arc).clone();
657 let (p, c) = comm::stream();
660 do arc2.access |one| {
// unwrap on a poisoned MutexArc must fail (see MutexArc::unwrap doc).
667 let one = arc.unwrap();
// RWArc poisoning: failing inside write (or a write_downgrade write) must
// poison the Arc for subsequent readers/writers. Failing closure bodies are
// partially missing from this extracted view.
670 #[test] #[should_fail] #[ignore(cfg(windows))]
671 fn test_rw_arc_poison_wr() {
672 let arc = ~RWArc::new(1);
673 let arc2 = (*arc).clone();
675 do arc2.write |one| {
683 #[test] #[should_fail] #[ignore(cfg(windows))]
684 fn test_rw_arc_poison_ww() {
685 let arc = ~RWArc::new(1);
686 let arc2 = (*arc).clone();
688 do arc2.write |one| {
696 #[test] #[should_fail] #[ignore(cfg(windows))]
697 fn test_rw_arc_poison_dw() {
698 let arc = ~RWArc::new(1);
699 let arc2 = (*arc).clone();
701 do arc2.write_downgrade |mut write_mode| {
702 do write_mode.write |one| {
// Read-mode failures must NOT poison the Arc (read() documents this); these
// tests fail inside a read and then access again successfully. Closure bodies
// are partially missing from this extracted view.
711 #[test] #[ignore(cfg(windows))]
712 fn test_rw_arc_no_poison_rr() {
713 let arc = ~RWArc::new(1);
714 let arc2 = (*arc).clone();
724 #[test] #[ignore(cfg(windows))]
725 fn test_rw_arc_no_poison_rw() {
726 let arc = ~RWArc::new(1);
727 let arc2 = (*arc).clone();
737 #[test] #[ignore(cfg(windows))]
738 fn test_rw_arc_no_poison_dr() {
739 let arc = ~RWArc::new(1);
740 let arc2 = (*arc).clone();
742 do arc2.write_downgrade |write_mode| {
743 let read_mode = arc2.downgrade(write_mode);
744 do (&read_mode).read |one| {
// NOTE(review): the enclosing #[test] fn header (presumably test_rw_arc) is
// missing from this extracted view. A writer task mutates the shared counter
// while reader tasks race it; the parent finally asserts it reached 10.
755 let arc = ~RWArc::new(0);
756 let arc2 = (*arc).clone();
757 let (p,c) = comm::stream();
760 do arc2.write |num| {
771 // Readers try to catch the writer in the act
772 let mut children = ~[];
774 let arc3 = (*arc).clone();
775 let mut builder = task::task();
// Collect a future per reader so the parent can join them below.
776 builder.future_result(|r| children.push(r));
784 // Wait for children to pass their asserts
785 for r in children.iter() {
789 // Wait for writer to finish
792 assert_eq!(*num, 10);
// Orchestrates a downgrader, a competing writer, and reader tasks to check
// write_cond + downgrade ordering; the numbered comments below spell out the
// protocol. NOTE(review): spawn wrappers and some closing braces are missing
// from this extracted view.
796 fn test_rw_downgrade() {
797 // (1) A downgrader gets in write mode and does cond.wait.
798 // (2) A writer gets in write mode, sets state to 42, and does signal.
799 // (3) Downgrader wakes, sets state to 31337.
800 // (4) tells writer and all other readers to contend as it downgrades.
801 // (5) Writer attempts to set state back to 42, while downgraded task
802 // and all reader tasks assert that it's 31337.
803 let arc = ~RWArc::new(0);
806 let mut reader_convos = ~[];
808 let ((rp1,rc1),(rp2,rc2)) = (comm::stream(),comm::stream());
809 reader_convos.push((rc1, rp2));
810 let arcn = (*arc).clone();
812 rp1.recv(); // wait for downgrader to give go-ahead
813 do arcn.read |state| {
814 assert_eq!(*state, 31337);
821 let arc2 = (*arc).clone();
822 let ((wp1,wc1),(wp2,wc2)) = (comm::stream(),comm::stream());
825 do arc2.write_cond |state, cond| {
826 assert_eq!(*state, 0);
831 do arc2.write |state| {
832 // This shouldn't happen until after the downgrade read
833 // section, and all other readers, finish.
834 assert_eq!(*state, 31337);
841 do arc.write_downgrade |mut write_mode| {
842 do write_mode.write_cond |state, cond| {
843 wc1.send(()); // send to another writer who will wake us up
847 assert_eq!(*state, 42);
849 // FIXME: #7372: hits type inference bug with iterators
850 // send to other readers
851 for i in range(0u, reader_convos.len()) {
852 match reader_convos[i] {
853 (ref rc, _) => rc.send(()),
857 let read_mode = arc.downgrade(write_mode);
858 do (&read_mode).read |state| {
859 // FIXME: #7372: hits type inference bug with iterators
860 // complete handshake with other readers
861 for i in range(0u, reader_convos.len()) {
862 match reader_convos[i] {
863 (_, ref rp) => rp.recv(),
866 wc1.send(()); // tell writer to try again
867 assert_eq!(*state, 31337);
871 wp2.recv(); // complete handshake with writer
// One attempt at reproducing the reader-cloud handoff race described in the
// comments below. NOTE(review): spawn wrappers and several interior lines are
// missing from this extracted view.
874 fn test_rw_write_cond_downgrade_read_race_helper() {
875 // Tests that when a downgrader hands off the "reader cloud" lock
876 // because of a contending reader, a writer can't race to get it
877 // instead, which would result in readers_and_writers. This tests
878 // the sync module rather than this one, but it's here because an
879 // rwarc gives us extra shared state to help check for the race.
880 // If you want to see this test fail, go to sync.rs and replace the
881 // line in RWLock::write_cond() that looks like:
882 // "blk(&Condvar { order: opt_lock, ..*cond })"
883 // with just "blk(cond)".
884 let x = ~RWArc::new(true);
885 let (wp, wc) = comm::stream();
888 let xw = (*x).clone();
890 do xw.write_cond |state, c| {
891 wc.send(()); // tell downgrader it's ok to go
893 // The core of the test is here: the condvar reacquire path
894 // must involve order_lock, so that it cannot race with a reader
895 // trying to receive the "reader cloud lock hand-off".
900 wp.recv(); // wait for writer to get in
902 do x.write_downgrade |mut write_mode| {
903 do write_mode.write_cond |state, c| {
905 // make writer contend in the cond-reacquire path
908 // make a reader task to trigger the "reader cloud lock" handoff
909 let xr = (*x).clone();
910 let (rp, rc) = comm::stream();
913 do xr.read |_state| { }
915 rp.recv(); // wait for reader task to exist
917 let read_mode = x.downgrade(write_mode);
918 do read_mode.read |state| {
919 // if writer mistakenly got in, make sure it mutates state
920 // before we assert on it
921 do 5.times { task::yield(); }
922 // make sure writer didn't get in.
// Repeats the race helper several times to raise the odds of catching the
// handoff race. NOTE(review): the #[test] attribute line preceding this fn
// appears to be missing from this extracted view.
928 fn test_rw_write_cond_downgrade_read_race() {
929 // Ideally the above test case would have yield statements in it that
930 // helped to expose the race nearly 100% of the time... but adding
931 // yields in the intuitively-right locations made it even less likely,
932 // and I wasn't sure why :( . This is a mediocre "next best" option.
933 do 8.times { test_rw_write_cond_downgrade_read_race_helper() }