1 // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
12 * Concurrency-enabled mechanisms for sharing mutable and/or immutable state
17 * In this example, a large vector of floats is shared between several tasks.
18 * With simple pipes, without Arc, a copy would have to be made for each task.
23 * let numbers=vec::from_fn(100, |ind| (ind as float)*rand::random());
24 * let shared_numbers=arc::Arc::new(numbers);
27 * let (port, chan) = stream();
28 * chan.send(shared_numbers.clone());
31 * let shared_numbers=port.recv();
32 * let local_numbers=shared_numbers.get();
34 * // Work with the local numbers
40 #[allow(missing_doc)];
44 use sync::{Mutex, RWLock};
47 use std::unstable::sync::UnsafeArc;
// NOTE(review): this excerpt is fragmentary — interior lines and the closing
// brace of the struct are elided; comments describe only what is visible.
51 /// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
52 pub struct Condvar<'self> {
// Aliases the poison ("failed") flag in the owning arc's inner state; the
// impl below asserts it is false before every wait/signal/broadcast.
54 priv failed: &'self mut bool,
// The wrapped sync::Condvar that all operations delegate to.
55 priv cond: &'self sync::Condvar<'self>
// NOTE(review): an `is_mutex` field is read elsewhere (`self.is_mutex`, and
// the struct literals `Condvar {is_mutex: ...}`) but its declaration is not
// visible in this excerpt — presumably elided from the original.
// Wrapper methods around the underlying sync::Condvar. The added value over
// using sync::Condvar directly is poison handling: each operation asserts the
// arc is not already poisoned, and wait_on re-checks poison after waking.
// NOTE(review): method closing braces are elided in this excerpt.
58 impl<'self> Condvar<'self> {
59 /// Atomically exit the associated Arc and block until a signal is sent.
61 pub fn wait(&self) { self.wait_on(0) }
64 * Atomically exit the associated Arc and block on a specified condvar
65 * until a signal is sent on that same condvar (as sync::cond.wait_on).
67 * wait() is equivalent to wait_on(0).
70 pub fn wait_on(&self, condvar_id: uint) {
// Refuse to block on an arc that is already poisoned.
71 assert!(!*self.failed);
72 self.cond.wait_on(condvar_id);
// Re-check poison after waking: another task may have failed inside the
// arc while this task was descheduled.
73 // This is why we need to wrap sync::condvar.
74 check_poison(self.is_mutex, *self.failed);
77 /// Wake up a blocked task. Returns false if there was no blocked task.
79 pub fn signal(&self) -> bool { self.signal_on(0) }
82 * Wake up a blocked task on a specified condvar (as
83 * sync::cond.signal_on). Returns false if there was no blocked task.
86 pub fn signal_on(&self, condvar_id: uint) -> bool {
// Signalling a poisoned arc is a bug in the caller; fail fast.
87 assert!(!*self.failed);
88 self.cond.signal_on(condvar_id)
91 /// Wake up all blocked tasks. Returns the number of tasks woken.
93 pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
96 * Wake up all blocked tasks on a specified condvar (as
97 * sync::cond.broadcast_on). Returns the number of tasks woken.
100 pub fn broadcast_on(&self, condvar_id: uint) -> uint {
101 assert!(!*self.failed);
102 self.cond.broadcast_on(condvar_id)
106 /****************************************************************************
108 ****************************************************************************/
110 /// An atomically reference counted wrapper for shared immutable state.
// Thin newtype over UnsafeArc. Immutability is a property of the API, not the
// field: the impl below (bounded T: Freeze + Send) only exposes shared access.
111 pub struct Arc<T> { priv x: UnsafeArc<T> }
115 * Access the underlying data in an atomically reference counted
// NOTE(review): excerpt is fragmentary — several doc-comment and closing-brace
// lines are elided below.
118 impl<T:Freeze+Send> Arc<T> {
119 /// Create an atomically reference counted wrapper.
121 pub fn new(data: T) -> Arc<T> {
122 Arc { x: UnsafeArc::new(data) }
// Borrow the shared data; the returned reference is tied to `self`'s lifetime.
126 pub fn get<'a>(&'a self) -> &'a T {
// Safe in practice because Arc never hands out mutable access to `x`'s
// contents (T: Freeze), so an immutable view cannot be invalidated.
127 unsafe { &*self.x.get_immut() }
131 * Retrieve the data back out of the Arc. This function blocks until the
132 * reference given to it is the last existing one, and then unwrap the data
133 * instead of destroying it.
135 * If multiple tasks call unwrap, all but the first will fail. Do not call
136 * unwrap from a task that holds another reference to the same Arc; it is
137 * guaranteed to deadlock.
139 pub fn unwrap(self) -> T {
// Destructure to move the UnsafeArc out of self; the call that consumes
// `x` (presumably x.unwrap()) is elided from this excerpt.
140 let Arc { x: x } = self;
145 impl<T:Freeze + Send> Clone for Arc<T> {
147 * Duplicate an atomically reference counted wrapper.
149 * The resulting two `arc` objects will point to the same underlying data
150 * object. However, one of the `arc` objects can be sent to another task,
151 * allowing them to share the underlying data.
// Delegates to UnsafeArc's clone, i.e. a reference-count bump — no deep copy.
154 fn clone(&self) -> Arc<T> {
155 Arc { x: self.x.clone() }
159 /****************************************************************************
160 * Mutex protected Arc (unsafe)
161 ****************************************************************************/
// Shared state behind a MutexArc: the mutex, the poison flag set when a task
// fails inside the lock, and the user data — bundled so a single reference
// count covers all three.
164 struct MutexArcInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
166 /// An Arc with mutable data protected by a blocking mutex.
168 pub struct MutexArc<T> { priv x: UnsafeArc<MutexArcInner<T>> }
171 impl<T:Send> Clone for MutexArc<T> {
172 /// Duplicate a mutex-protected Arc. See arc::clone for more details.
174 fn clone(&self) -> MutexArc<T> {
175 // NB: Cloning the underlying mutex is not necessary. Its reference
176 // count would be exactly the same as the shared state's.
177 MutexArc { x: self.x.clone() }
// Core MutexArc API: construction, unsafe locked access (plain and with a
// condvar), and blocking unwrap. NOTE(review): excerpt is fragmentary —
// closing braces and some interior lines are elided.
181 impl<T:Send> MutexArc<T> {
182 /// Create a mutex-protected Arc with the supplied data.
183 pub fn new(user_data: T) -> MutexArc<T> {
// One condvar by default.
184 MutexArc::new_with_condvars(user_data, 1)
188 * Create a mutex-protected Arc with the supplied data and a specified number
189 * of condvars (as sync::Mutex::new_with_condvars).
191 pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc<T> {
192 let data = MutexArcInner {
193 lock: Mutex::new_with_condvars(num_condvars),
// Poison flag starts clear; it is set by PoisonOnFail on task failure.
194 failed: false, data: user_data
196 MutexArc { x: UnsafeArc::new(data) }
200 * Access the underlying mutable data with mutual exclusion from other
201 * tasks. The argument closure will be run with the mutex locked; all
202 * other tasks wishing to access the data will block until the closure
205 * The reason this function is 'unsafe' is because it is possible to
206 * construct a circular reference among multiple Arcs by mutating the
207 * underlying data. This creates potential for deadlock, but worse, this
208 * will guarantee a memory leak of all involved Arcs. Using MutexArcs
209 * inside of other Arcs is safe in absence of circular references.
211 * If you wish to nest MutexArcs, one strategy for ensuring safety at
212 * runtime is to add a "nesting level counter" inside the stored data, and
213 * when traversing the arcs, assert that they monotonically decrease.
217 * Failing while inside the Arc will unlock the Arc while unwinding, so
218 * that other tasks won't block forever. It will also poison the Arc:
219 * any tasks that subsequently try to access it (including those already
220 * blocked on the mutex) will also fail immediately.
223 pub unsafe fn unsafe_access<U>(&self, blk: |x: &mut T| -> U) -> U {
224 let state = self.x.get();
225 // Borrowck would complain about this if the function were
226 // not already unsafe. See borrow_rwlock, far below.
227 (&(*state).lock).lock(|| {
// Fail immediately if a previous holder died inside the lock.
228 check_poison(true, (*state).failed);
// Guard that sets `failed` if `blk` unwinds (see PoisonOnFail's Drop).
229 let _z = PoisonOnFail(&mut (*state).failed);
230 blk(&mut (*state).data)
234 /// As unsafe_access(), but with a condvar, as sync::mutex.lock_cond().
236 pub unsafe fn unsafe_access_cond<U>(&self,
237 blk: |x: &mut T, c: &Condvar| -> U)
239 let state = self.x.get();
240 (&(*state).lock).lock_cond(|cond| {
241 check_poison(true, (*state).failed);
242 let _z = PoisonOnFail(&mut (*state).failed);
// Hand the closure both the data and a poison-aware Condvar wrapper.
243 blk(&mut (*state).data,
244 &Condvar {is_mutex: true,
245 failed: &mut (*state).failed,
251 * Retrieves the data, blocking until all other references are dropped,
252 * exactly as arc::unwrap.
254 * Will additionally fail if another task has failed while accessing the arc.
256 pub fn unwrap(self) -> T {
257 let MutexArc { x: x } = self;
258 let inner = x.unwrap();
259 let MutexArcInner { failed: failed, data: data, .. } = inner;
// NOTE(review): the `if failed {` guard line is elided from this excerpt;
// the fail! below presumably only runs when the arc was poisoned.
261 fail!("Can't unwrap poisoned MutexArc - another task failed inside!");
// Safe access wrappers, available only when T: Freeze (which rules out nested
// MutexArcs and thus the deadlock/leak hazard that makes unsafe_access unsafe).
267 impl<T:Freeze + Send> MutexArc<T> {
272 * The difference between access and unsafe_access is that the former
273 * forbids mutexes to be nested. While unsafe_access can be used on
274 * MutexArcs without freezable interiors, this safe version of access
275 * requires the Freeze bound, which prohibits access on MutexArcs which
276 * might contain nested MutexArcs inside.
278 * The purpose of this is to offer a safe implementation of MutexArc to be
279 * used instead of RWArc in cases where no readers are needed and slightly
280 * better performance is required.
282 * Both methods have the same failure behaviour as unsafe_access and
283 * unsafe_access_cond.
286 pub fn access<U>(&self, blk: |x: &mut T| -> U) -> U {
// Sound because the Freeze bound on this impl excludes the circular-
// reference scenario documented on unsafe_access.
287 unsafe { self.unsafe_access(blk) }
290 /// As unsafe_access_cond but safe and Freeze.
292 pub fn access_cond<U>(&self,
293 blk: |x: &mut T, c: &Condvar| -> U)
295 unsafe { self.unsafe_access_cond(blk) }
299 // Common code for {mutex.access,rwlock.write}{,_cond}.
// Fails the current task if `failed` is set, with a message naming the arc
// kind (`is_mutex` selects MutexArc vs rw_arc wording).
// NOTE(review): the `if failed` / `if is_mutex` branch lines are elided from
// this excerpt; only the two fail! arms are visible.
302 fn check_poison(is_mutex: bool, failed: bool) {
305 fail!("Poisoned MutexArc - another task failed inside!");
307 fail!("Poisoned rw_arc - another task failed inside!");
// RAII-style guard: while alive it holds a pointer to the arc's poison flag;
// its Drop impl (body elided here) presumably sets the flag when the holding
// task is unwinding, so other tasks see the arc as poisoned.
// NOTE(review): struct fields and the drop body are elided from this excerpt.
313 struct PoisonOnFail {
317 impl Drop for PoisonOnFail {
320 /* assert!(!*self.failed);
321 -- might be false in case of cond.wait() */
// Constructor function (pre-1.0 convention: same name as the struct).
329 fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
335 /****************************************************************************
336 * R/W lock protected Arc
337 ****************************************************************************/
// Shared state behind an RWArc: reader-writer lock, poison flag, and the user
// data under one reference count (mirrors MutexArcInner above).
340 struct RWArcInner<T> { priv lock: RWLock, priv failed: bool, priv data: T }
342 * A dual-mode Arc protected by a reader-writer lock. The data can be accessed
343 * mutably or immutably, and immutably-accessing tasks may run concurrently.
345 * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested.
348 pub struct RWArc<T> {
349 priv x: UnsafeArc<RWArcInner<T>>,
352 impl<T:Freeze + Send> Clone for RWArc<T> {
353 /// Duplicate a rwlock-protected Arc. See arc::clone for more details.
// Reference-count bump via UnsafeArc::clone — no copy of the data or lock.
355 fn clone(&self) -> RWArc<T> {
356 RWArc { x: self.x.clone() }
// RWArc API: construction, write/write_cond (exclusive, poison-checked),
// read (shared, non-poisoning), write_downgrade/downgrade (atomic write→read
// transition), and blocking unwrap. NOTE(review): excerpt is fragmentary —
// closing braces and several interior lines are elided.
361 impl<T:Freeze + Send> RWArc<T> {
362 /// Create a reader/writer Arc with the supplied data.
363 pub fn new(user_data: T) -> RWArc<T> {
364 RWArc::new_with_condvars(user_data, 1)
368 * Create a reader/writer Arc with the supplied data and a specified number
369 * of condvars (as sync::RWLock::new_with_condvars).
371 pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWArc<T> {
372 let data = RWArcInner {
373 lock: RWLock::new_with_condvars(num_condvars),
374 failed: false, data: user_data
376 RWArc { x: UnsafeArc::new(data), }
380 * Access the underlying data mutably. Locks the rwlock in write mode;
381 * other readers and writers will block.
385 * Failing while inside the Arc will unlock the Arc while unwinding, so
386 * that other tasks won't block forever. As MutexArc.access, it will also
387 * poison the Arc, so subsequent readers and writers will both also fail.
390 pub fn write<U>(&self, blk: |x: &mut T| -> U) -> U {
392 let state = self.x.get();
// borrow_rwlock wraps the aliasing unsafety; see its comment below.
393 (*borrow_rwlock(state)).write(|| {
394 check_poison(false, (*state).failed);
// Arm the poison guard for the duration of the closure.
395 let _z = PoisonOnFail(&mut (*state).failed);
396 blk(&mut (*state).data)
401 /// As write(), but with a condvar, as sync::rwlock.write_cond().
403 pub fn write_cond<U>(&self,
404 blk: |x: &mut T, c: &Condvar| -> U)
407 let state = self.x.get();
408 (*borrow_rwlock(state)).write_cond(|cond| {
409 check_poison(false, (*state).failed);
410 let _z = PoisonOnFail(&mut (*state).failed);
// is_mutex: false selects the rw_arc poison message in check_poison.
411 blk(&mut (*state).data,
412 &Condvar {is_mutex: false,
413 failed: &mut (*state).failed,
420 * Access the underlying data immutably. May run concurrently with other
425 * Failing will unlock the Arc while unwinding. However, unlike all other
426 * access modes, this will not poison the Arc.
428 pub fn read<U>(&self, blk: |x: &T| -> U) -> U {
430 let state = self.x.get();
// Read mode needs no borrow_rwlock dance and arms no PoisonOnFail guard:
// readers cannot invalidate the data, so a failing reader does not poison.
431 (*state).lock.read(|| {
432 check_poison(false, (*state).failed);
439 * As write(), but with the ability to atomically 'downgrade' the lock.
440 * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used
441 * to obtain the &mut T, and can be transformed into a RWReadMode token by
442 * calling downgrade(), after which a &T can be obtained instead.
447 * do arc.write_downgrade |mut write_token| {
448 * do write_token.write_cond |state, condvar| {
449 * ... exclusive access with mutable state ...
451 * let read_token = arc.downgrade(write_token);
452 * do read_token.read |state| {
453 * ... shared access with immutable state ...
458 pub fn write_downgrade<U>(&self, blk: |v: RWWriteMode<T>| -> U) -> U {
460 let state = self.x.get();
461 (*borrow_rwlock(state)).write_downgrade(|write_mode| {
462 check_poison(false, (*state).failed);
// Build the RWWriteMode token handed to the caller's closure
// (the surrounding struct-literal lines are elided in this excerpt).
464 data: &mut (*state).data,
466 poison: PoisonOnFail(&mut (*state).failed)
472 /// To be called inside of the write_downgrade block.
473 pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>)
474 -> RWReadMode<'a, T> {
476 // The rwlock should assert that the token belongs to us for us.
477 let state = self.x.get();
484 let new_token = (*state).lock.downgrade(t);
485 // Whatever region the input reference had, it will be safe to use
486 // the same region for the output reference. (The only 'unsafe' part
487 // of this cast is removing the mutability.)
489 // Downgrade ensured the token belonged to us. Just a sanity check.
490 assert!(borrow::ref_eq(&(*state).data, new_data));
500 * Retrieves the data, blocking until all other references are dropped,
501 * exactly as arc::unwrap.
503 * Will additionally fail if another task has failed while accessing the arc
506 pub fn unwrap(self) -> T {
507 let RWArc { x: x, .. } = self;
508 let inner = x.unwrap();
509 let RWArcInner { failed: failed, data: data, .. } = inner;
// NOTE(review): the `if failed {` guard line is elided from this excerpt.
511 fail!("Can't unwrap poisoned RWArc - another task failed inside!")
517 // Borrowck rightly complains about immutably aliasing the rwlock in order to
518 // lock it. This wraps the unsafety, with the justification that the 'lock'
519 // field is never overwritten; only 'failed' and 'data'.
// Returns a raw const pointer to the lock field of the (mutably aliased)
// inner state, via transmute of the field reference.
521 fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {
522 unsafe { cast::transmute(&(*state).lock) }
525 /// The "write permission" token used for RWArc.write_downgrade().
526 pub struct RWWriteMode<'self, T> {
527 priv data: &'self mut T,
528 priv token: sync::RWLockWriteMode<'self>,
// Keeps the arc's poison guard alive for as long as write permission is held.
529 priv poison: PoisonOnFail,
532 /// The "read permission" token used for RWArc.write_downgrade().
533 pub struct RWReadMode<'self, T> {
// NOTE(review): RWReadMode's data field (original line 534) is elided from
// this excerpt; the read() method below refers to a `data` binding.
535 priv token: sync::RWLockReadMode<'self>,
// Methods on the pre-downgrade write token. NOTE(review): fragmentary —
// destructuring of self into `token`/`data`/`poison` bindings and closing
// braces are elided.
538 impl<'self, T:Freeze + Send> RWWriteMode<'self, T> {
539 /// Access the pre-downgrade RWArc in write mode.
540 pub fn write<U>(&mut self, blk: |x: &mut T| -> U) -> U {
547 token.write(|| blk(data))
552 /// Access the pre-downgrade RWArc in write mode with a condvar.
553 pub fn write_cond<U>(&mut self,
554 blk: |x: &mut T, c: &Condvar| -> U)
562 token.write_cond(|cond| {
// Build a poison-aware Condvar wrapper around the token's poison flag.
566 failed: &mut *poison.failed,
// Methods on the post-downgrade read token. NOTE(review): fragmentary —
// the bindings for `token`/`data` are elided from this excerpt.
577 impl<'self, T:Freeze + Send> RWReadMode<'self, T> {
578 /// Access the post-downgrade rwlock in read mode.
579 pub fn read<U>(&self, blk: |x: &T| -> U) -> U {
585 token.read(|| blk(data))
591 /****************************************************************************
593 ****************************************************************************/
// Test: an Arc of a vector is sent over a channel to a child task; both sides
// read it. NOTE(review): test excerpt is fragmentary (task-spawn lines elided).
605 fn manually_share_arc() {
606 let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
607 let arc_v = Arc::new(v);
609 let (p, c) = comm::stream();
612 let arc_v: Arc<~[int]> = p.recv();
614 let v = arc_v.get().clone();
618 c.send(arc_v.clone());
// Parent still sees the same data after sending a clone away.
620 assert_eq!(arc_v.get()[2], 3);
621 assert_eq!(arc_v.get()[4], 5);
623 info!("{:?}", arc_v);
// Test: parent and child coordinate through MutexArc::access_cond and a
// oneshot channel. NOTE(review): fragmentary — closure bodies elided.
627 fn test_mutex_arc_condvar() {
628 let arc = ~MutexArc::new(false);
629 let arc2 = ~arc.clone();
630 let (p,c) = comm::oneshot();
// Cells let the oneshot endpoints be captured by the spawned closure.
631 let (c,p) = (Cell::new(c), Cell::new(p));
633 // wait until parent gets in
635 arc2.access_cond(|state, cond| {
641 arc.access_cond(|state, cond| {
// Poison-propagation tests for MutexArc: a child fails while holding the
// lock, and the parent's subsequent access / unwrap must fail too
// (#[should_fail] marks the expected parent failure).
// NOTE(review): fragmentary — closure bodies and closing braces elided.
650 #[test] #[should_fail]
651 fn test_arc_condvar_poison() {
652 let arc = ~MutexArc::new(1);
653 let arc2 = ~arc.clone();
654 let (p, c) = comm::stream();
658 arc2.access_cond(|one, cond| {
660 // Parent should fail when it wakes up.
665 arc.access_cond(|one, cond| {
673 #[test] #[should_fail]
674 fn test_mutex_arc_poison() {
675 let arc = ~MutexArc::new(1);
676 let arc2 = ~arc.clone();
687 #[test] #[should_fail]
688 pub fn test_mutex_arc_unwrap_poison() {
689 let arc = MutexArc::new(1);
690 let arc2 = ~(&arc).clone();
691 let (p, c) = comm::stream();
699 let one = arc.unwrap();
// Exercises nesting a MutexArc inside another via unsafe_access + access.
704 fn test_unsafe_mutex_arc_nested() {
706 // Tests nested mutexes and access
707 // to underlying data.
708 let arc = ~MutexArc::new(1);
709 let arc2 = ~MutexArc::new(*arc);
711 (*arc2).unsafe_access(|mutex| {
712 (*mutex).access(|one| {
// Poison-propagation tests for RWArc. Naming convention: _wr = writer fails,
// reader checks; _ww = writer/writer; _dw = downgrader/writer; the no_poison
// variants verify that a failing READER does not poison the arc.
// NOTE(review): fragmentary — most closure bodies are elided.
720 #[test] #[should_fail]
721 fn test_rw_arc_poison_wr() {
722 let arc = RWArc::new(1);
723 let arc2 = arc.clone();
734 #[test] #[should_fail]
735 fn test_rw_arc_poison_ww() {
736 let arc = RWArc::new(1);
737 let arc2 = arc.clone();
747 #[test] #[should_fail]
748 fn test_rw_arc_poison_dw() {
749 let arc = RWArc::new(1);
750 let arc2 = arc.clone();
752 arc2.write_downgrade(|mut write_mode| {
753 write_mode.write(|one| {
763 fn test_rw_arc_no_poison_rr() {
764 let arc = RWArc::new(1);
765 let arc2 = arc.clone();
776 fn test_rw_arc_no_poison_rw() {
777 let arc = RWArc::new(1);
778 let arc2 = arc.clone();
789 fn test_rw_arc_no_poison_dr() {
790 let arc = RWArc::new(1);
791 let arc2 = arc.clone();
793 arc2.write_downgrade(|write_mode| {
794 let read_mode = arc2.downgrade(write_mode);
795 read_mode.read(|one| {
// NOTE(review): interior of a test whose fn header is elided from this
// excerpt (a writer increments shared state while reader tasks observe it;
// final state asserted to be 10).
806 let arc = RWArc::new(0);
807 let arc2 = arc.clone();
808 let (p, c) = comm::stream();
822 // Readers try to catch the writer in the act
823 let mut children = ~[];
825 let arc3 = arc.clone();
826 let mut builder = task::task();
// Collect future_result handles so the parent can join each reader below.
827 children.push(builder.future_result());
835 // Wait for children to pass their asserts
836 for r in children.iter() {
840 // Wait for writer to finish
843 assert_eq!(*num, 10);
// NOTE(review): fragmentary — task-spawn wrappers and closure closers elided;
// the numbered protocol comment below documents the intended interleaving.
847 fn test_rw_downgrade() {
848 // (1) A downgrader gets in write mode and does cond.wait.
849 // (2) A writer gets in write mode, sets state to 42, and does signal.
850 // (3) Downgrader wakes, sets state to 31337.
851 // (4) tells writer and all other readers to contend as it downgrades.
852 // (5) Writer attempts to set state back to 42, while downgraded task
853 // and all reader tasks assert that it's 31337.
854 let arc = RWArc::new(0);
857 let mut reader_convos = ~[];
859 let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream());
860 reader_convos.push((rc1, rp2));
861 let arcn = arc.clone();
863 rp1.recv(); // wait for downgrader to give go-ahead
865 assert_eq!(*state, 31337);
872 let arc2 = arc.clone();
873 let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream());
876 arc2.write_cond(|state, cond| {
877 assert_eq!(*state, 0);
883 // This shouldn't happen until after the downgrade read
884 // section, and all other readers, finish.
885 assert_eq!(*state, 31337);
892 arc.write_downgrade(|mut write_mode| {
893 write_mode.write_cond(|state, cond| {
894 wc1.send(()); // send to another writer who will wake us up
898 assert_eq!(*state, 42);
900 // send to other readers
901 for &(ref rc, _) in reader_convos.iter() {
// Atomic write→read transition under test.
905 let read_mode = arc.downgrade(write_mode);
906 read_mode.read(|state| {
907 // complete handshake with other readers
908 for &(_, ref rp) in reader_convos.iter() {
911 wc1.send(()); // tell writer to try again
912 assert_eq!(*state, 31337);
916 wp2.recv(); // complete handshake with writer
// NOTE(review): fragmentary — task-spawn wrappers and several lines elided;
// the leading comment block documents the race being probed.
919 fn test_rw_write_cond_downgrade_read_race_helper() {
920 // Tests that when a downgrader hands off the "reader cloud" lock
921 // because of a contending reader, a writer can't race to get it
922 // instead, which would result in readers_and_writers. This tests
923 // the sync module rather than this one, but it's here because an
924 // rwarc gives us extra shared state to help check for the race.
925 // If you want to see this test fail, go to sync.rs and replace the
926 // line in RWLock::write_cond() that looks like:
927 // "blk(&Condvar { order: opt_lock, ..*cond })"
928 // with just "blk(cond)".
929 let x = RWArc::new(true);
930 let (wp, wc) = comm::stream();
935 xw.write_cond(|state, c| {
936 wc.send(()); // tell downgrader it's ok to go
938 // The core of the test is here: the condvar reacquire path
939 // must involve order_lock, so that it cannot race with a reader
940 // trying to receive the "reader cloud lock hand-off".
945 wp.recv(); // wait for writer to get in
947 x.write_downgrade(|mut write_mode| {
948 write_mode.write_cond(|state, c| {
950 // make writer contend in the cond-reacquire path
953 // make a reader task to trigger the "reader cloud lock" handoff
955 let (rp, rc) = comm::stream();
958 xr.read(|_state| { })
960 rp.recv(); // wait for reader task to exist
962 let read_mode = x.downgrade(write_mode);
963 read_mode.read(|state| {
964 // if writer mistakenly got in, make sure it mutates state
965 // before we assert on it
966 5.times(|| task::deschedule());
967 // make sure writer didn't get in.
// Runs the race helper several times to raise the odds of hitting the
// interleaving under test.
973 fn test_rw_write_cond_downgrade_read_race() {
974 // Ideally the above test case would have deschedule statements in it that
975 // helped to expose the race nearly 100% of the time... but adding
976 // deschedules in the intuitively-right locations made it even less likely,
977 // and I wasn't sure why :( . This is a mediocre "next best" option.
978 8.times(|| test_rw_write_cond_downgrade_read_race_helper());