]> git.lizzy.rs Git - rust.git/blob - src/libextra/arc.rs
3fbfae52c6300bf925d071a95c611447149476ec
[rust.git] / src / libextra / arc.rs
1 // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12  * Concurrency-enabled mechanisms for sharing mutable and/or immutable state
13  * between tasks.
14  *
15  * # Example
16  *
17  * In this example, a large vector of floats is shared between several tasks.
18  * With simple pipes, without Arc, a copy would have to be made for each task.
19  *
20  * ~~~ {.rust}
21  * extern mod std;
22  * use extra::arc;
23  * let numbers=vec::from_fn(100, |ind| (ind as float)*rand::random());
24  * let shared_numbers=arc::Arc::new(numbers);
25  *
26  *   do 10.times {
27  *       let (port, chan)  = stream();
28  *       chan.send(shared_numbers.clone());
29  *
30  *       do spawn {
31  *           let shared_numbers=port.recv();
32  *           let local_numbers=shared_numbers.get();
33  *
34  *           // Work with the local numbers
35  *       }
36  *   }
37  * ~~~
38  */
39
40 #[allow(missing_doc)];
41
42
43 use sync;
44 use sync::{Mutex, RWLock};
45
46 use std::cast;
47 use std::unstable::sync::UnsafeArc;
48 use std::task;
49 use std::borrow;
50
51 /// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
52 pub struct Condvar<'self> {
53     priv is_mutex: bool,
54     priv failed: &'self mut bool,
55     priv cond: &'self sync::Condvar<'self>
56 }
57
58 impl<'self> Condvar<'self> {
59     /// Atomically exit the associated Arc and block until a signal is sent.
60     #[inline]
61     pub fn wait(&self) { self.wait_on(0) }
62
63     /**
64      * Atomically exit the associated Arc and block on a specified condvar
65      * until a signal is sent on that same condvar (as sync::cond.wait_on).
66      *
67      * wait() is equivalent to wait_on(0).
68      */
69     #[inline]
70     pub fn wait_on(&self, condvar_id: uint) {
71         assert!(!*self.failed);
72         self.cond.wait_on(condvar_id);
73         // This is why we need to wrap sync::condvar.
74         check_poison(self.is_mutex, *self.failed);
75     }
76
77     /// Wake up a blocked task. Returns false if there was no blocked task.
78     #[inline]
79     pub fn signal(&self) -> bool { self.signal_on(0) }
80
81     /**
82      * Wake up a blocked task on a specified condvar (as
83      * sync::cond.signal_on). Returns false if there was no blocked task.
84      */
85     #[inline]
86     pub fn signal_on(&self, condvar_id: uint) -> bool {
87         assert!(!*self.failed);
88         self.cond.signal_on(condvar_id)
89     }
90
91     /// Wake up all blocked tasks. Returns the number of tasks woken.
92     #[inline]
93     pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
94
95     /**
96      * Wake up all blocked tasks on a specified condvar (as
97      * sync::cond.broadcast_on). Returns the number of tasks woken.
98      */
99     #[inline]
100     pub fn broadcast_on(&self, condvar_id: uint) -> uint {
101         assert!(!*self.failed);
102         self.cond.broadcast_on(condvar_id)
103     }
104 }
105
106 /****************************************************************************
107  * Immutable Arc
108  ****************************************************************************/
109
110 /// An atomically reference counted wrapper for shared immutable state.
111 pub struct Arc<T> { priv x: UnsafeArc<T> }
112
113
114 /**
115  * Access the underlying data in an atomically reference counted
116  * wrapper.
117  */
118 impl<T:Freeze+Send> Arc<T> {
119     /// Create an atomically reference counted wrapper.
120     pub fn new(data: T) -> Arc<T> {
121         Arc { x: UnsafeArc::new(data) }
122     }
123
124     pub fn get<'a>(&'a self) -> &'a T {
125         unsafe { &*self.x.get_immut() }
126     }
127
128     /**
129      * Retrieve the data back out of the Arc. This function blocks until the
130      * reference given to it is the last existing one, and then unwrap the data
131      * instead of destroying it.
132      *
133      * If multiple tasks call unwrap, all but the first will fail. Do not call
134      * unwrap from a task that holds another reference to the same Arc; it is
135      * guaranteed to deadlock.
136      */
137     pub fn unwrap(self) -> T {
138         let Arc { x: x } = self;
139         x.unwrap()
140     }
141 }
142
143 impl<T:Freeze + Send> Clone for Arc<T> {
144     /**
145     * Duplicate an atomically reference counted wrapper.
146     *
147     * The resulting two `arc` objects will point to the same underlying data
148     * object. However, one of the `arc` objects can be sent to another task,
149     * allowing them to share the underlying data.
150     */
151     fn clone(&self) -> Arc<T> {
152         Arc { x: self.x.clone() }
153     }
154 }
155
156 /****************************************************************************
157  * Mutex protected Arc (unsafe)
158  ****************************************************************************/
159
160 #[doc(hidden)]
161 struct MutexArcInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
162
163 /// An Arc with mutable data protected by a blocking mutex.
164 #[no_freeze]
165 struct MutexArc<T> { priv x: UnsafeArc<MutexArcInner<T>> }
166
167
168 impl<T:Send> Clone for MutexArc<T> {
169     /// Duplicate a mutex-protected Arc. See arc::clone for more details.
170     fn clone(&self) -> MutexArc<T> {
171         // NB: Cloning the underlying mutex is not necessary. Its reference
172         // count would be exactly the same as the shared state's.
173         MutexArc { x: self.x.clone() }
174     }
175 }
176
177 impl<T:Send> MutexArc<T> {
178     /// Create a mutex-protected Arc with the supplied data.
179     pub fn new(user_data: T) -> MutexArc<T> {
180         MutexArc::new_with_condvars(user_data, 1)
181     }
182
183     /**
184      * Create a mutex-protected Arc with the supplied data and a specified number
185      * of condvars (as sync::Mutex::new_with_condvars).
186      */
187     pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc<T> {
188         let data = MutexArcInner {
189             lock: Mutex::new_with_condvars(num_condvars),
190             failed: false, data: user_data
191         };
192         MutexArc { x: UnsafeArc::new(data) }
193     }
194
195     /**
196      * Access the underlying mutable data with mutual exclusion from other
197      * tasks. The argument closure will be run with the mutex locked; all
198      * other tasks wishing to access the data will block until the closure
199      * finishes running.
200      *
201      * The reason this function is 'unsafe' is because it is possible to
202      * construct a circular reference among multiple Arcs by mutating the
203      * underlying data. This creates potential for deadlock, but worse, this
204      * will guarantee a memory leak of all involved Arcs. Using MutexArcs
205      * inside of other Arcs is safe in absence of circular references.
206      *
207      * If you wish to nest MutexArcs, one strategy for ensuring safety at
208      * runtime is to add a "nesting level counter" inside the stored data, and
209      * when traversing the arcs, assert that they monotonically decrease.
210      *
211      * # Failure
212      *
213      * Failing while inside the Arc will unlock the Arc while unwinding, so
214      * that other tasks won't block forever. It will also poison the Arc:
215      * any tasks that subsequently try to access it (including those already
216      * blocked on the mutex) will also fail immediately.
217      */
218     #[inline]
219     pub unsafe fn unsafe_access<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
220         let state = self.x.get();
221         // Borrowck would complain about this if the function were
222         // not already unsafe. See borrow_rwlock, far below.
223         do (&(*state).lock).lock {
224             check_poison(true, (*state).failed);
225             let _z = PoisonOnFail(&mut (*state).failed);
226             blk(&mut (*state).data)
227         }
228     }
229
230     /// As unsafe_access(), but with a condvar, as sync::mutex.lock_cond().
231     #[inline]
232     pub unsafe fn unsafe_access_cond<'x, 'c, U>(&self,
233                                          blk: &fn(x: &'x mut T,
234                                                   c: &'c Condvar) -> U)
235                                          -> U {
236         let state = self.x.get();
237         do (&(*state).lock).lock_cond |cond| {
238             check_poison(true, (*state).failed);
239             let _z = PoisonOnFail(&mut (*state).failed);
240             blk(&mut (*state).data,
241                 &Condvar {is_mutex: true,
242                           failed: &mut (*state).failed,
243                           cond: cond })
244         }
245     }
246
247     /**
248      * Retrieves the data, blocking until all other references are dropped,
249      * exactly as arc::unwrap.
250      *
251      * Will additionally fail if another task has failed while accessing the arc.
252      */
253     pub fn unwrap(self) -> T {
254         let MutexArc { x: x } = self;
255         let inner = x.unwrap();
256         let MutexArcInner { failed: failed, data: data, _ } = inner;
257         if failed {
258             fail!(~"Can't unwrap poisoned MutexArc - another task failed inside!");
259         }
260         data
261     }
262 }
263
264 impl<T:Freeze + Send> MutexArc<T> {
265
266     /**
267      * As unsafe_access.
268      *
269      * The difference between access and unsafe_access is that the former
270      * forbids mutexes to be nested. While unsafe_access can be used on
271      * MutexArcs without freezable interiors, this safe version of access
272      * requires the Freeze bound, which prohibits access on MutexArcs which
273      * might contain nested MutexArcs inside.
274      *
275      * The purpose of this is to offer a safe implementation of MutexArc to be
276      * used instead of RWArc in cases where no readers are needed and sightly
277      * better performance is required.
278      *
279      * Both methods have the same failure behaviour as unsafe_access and
280      * unsafe_access_cond.
281      */
282     #[inline]
283     pub fn access<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
284         unsafe { self.unsafe_access(blk) }
285     }
286
287     /// As unsafe_access_cond but safe and Freeze.
288     #[inline]
289     pub fn access_cond<'x, 'c, U>(&self,
290                                   blk: &fn(x: &'x mut T,
291                                            c: &'c Condvar) -> U)
292                                   -> U {
293         unsafe { self.unsafe_access_cond(blk) }
294     }
295 }
296
297 // Common code for {mutex.access,rwlock.write}{,_cond}.
298 #[inline]
299 #[doc(hidden)]
300 fn check_poison(is_mutex: bool, failed: bool) {
301     if failed {
302         if is_mutex {
303             fail!("Poisoned MutexArc - another task failed inside!");
304         } else {
305             fail!("Poisoned rw_arc - another task failed inside!");
306         }
307     }
308 }
309
310 #[doc(hidden)]
311 struct PoisonOnFail {
312     failed: *mut bool,
313 }
314
315 impl Drop for PoisonOnFail {
316     fn drop(&self) {
317         unsafe {
318             /* assert!(!*self.failed);
319                -- might be false in case of cond.wait() */
320             if task::failing() {
321                 *self.failed = true;
322             }
323         }
324     }
325 }
326
327 fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
328     PoisonOnFail {
329         failed: failed
330     }
331 }
332
333 /****************************************************************************
334  * R/W lock protected Arc
335  ****************************************************************************/
336
337 #[doc(hidden)]
338 struct RWArcInner<T> { priv lock: RWLock, priv failed: bool, priv data: T }
339 /**
340  * A dual-mode Arc protected by a reader-writer lock. The data can be accessed
341  * mutably or immutably, and immutably-accessing tasks may run concurrently.
342  *
343  * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested.
344  */
345 #[no_freeze]
346 struct RWArc<T> {
347     priv x: UnsafeArc<RWArcInner<T>>,
348 }
349
350 impl<T:Freeze + Send> Clone for RWArc<T> {
351     /// Duplicate a rwlock-protected Arc. See arc::clone for more details.
352     fn clone(&self) -> RWArc<T> {
353         RWArc { x: self.x.clone() }
354     }
355
356 }
357
358 impl<T:Freeze + Send> RWArc<T> {
359     /// Create a reader/writer Arc with the supplied data.
360     pub fn new(user_data: T) -> RWArc<T> {
361         RWArc::new_with_condvars(user_data, 1)
362     }
363
364     /**
365      * Create a reader/writer Arc with the supplied data and a specified number
366      * of condvars (as sync::RWLock::new_with_condvars).
367      */
368     pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWArc<T> {
369         let data = RWArcInner {
370             lock: RWLock::new_with_condvars(num_condvars),
371             failed: false, data: user_data
372         };
373         RWArc { x: UnsafeArc::new(data), }
374     }
375
376     /**
377      * Access the underlying data mutably. Locks the rwlock in write mode;
378      * other readers and writers will block.
379      *
380      * # Failure
381      *
382      * Failing while inside the Arc will unlock the Arc while unwinding, so
383      * that other tasks won't block forever. As MutexArc.access, it will also
384      * poison the Arc, so subsequent readers and writers will both also fail.
385      */
386     #[inline]
387     pub fn write<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
388         unsafe {
389             let state = self.x.get();
390             do (*borrow_rwlock(state)).write {
391                 check_poison(false, (*state).failed);
392                 let _z = PoisonOnFail(&mut (*state).failed);
393                 blk(&mut (*state).data)
394             }
395         }
396     }
397
398     /// As write(), but with a condvar, as sync::rwlock.write_cond().
399     #[inline]
400     pub fn write_cond<'x, 'c, U>(&self,
401                                  blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
402                                  -> U {
403         unsafe {
404             let state = self.x.get();
405             do (*borrow_rwlock(state)).write_cond |cond| {
406                 check_poison(false, (*state).failed);
407                 let _z = PoisonOnFail(&mut (*state).failed);
408                 blk(&mut (*state).data,
409                     &Condvar {is_mutex: false,
410                               failed: &mut (*state).failed,
411                               cond: cond})
412             }
413         }
414     }
415
416     /**
417      * Access the underlying data immutably. May run concurrently with other
418      * reading tasks.
419      *
420      * # Failure
421      *
422      * Failing will unlock the Arc while unwinding. However, unlike all other
423      * access modes, this will not poison the Arc.
424      */
425     pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
426         unsafe {
427             let state = self.x.get();
428             do (*state).lock.read {
429                 check_poison(false, (*state).failed);
430                 blk(&(*state).data)
431             }
432         }
433     }
434
435     /**
436      * As write(), but with the ability to atomically 'downgrade' the lock.
437      * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used
438      * to obtain the &mut T, and can be transformed into a RWReadMode token by
439      * calling downgrade(), after which a &T can be obtained instead.
440      *
441      * # Example
442      *
443      * ~~~ {.rust}
444      * do arc.write_downgrade |mut write_token| {
445      *     do write_token.write_cond |state, condvar| {
446      *         ... exclusive access with mutable state ...
447      *     }
448      *     let read_token = arc.downgrade(write_token);
449      *     do read_token.read |state| {
450      *         ... shared access with immutable state ...
451      *     }
452      * }
453      * ~~~
454      */
455     pub fn write_downgrade<U>(&self, blk: &fn(v: RWWriteMode<T>) -> U) -> U {
456         unsafe {
457             let state = self.x.get();
458             do (*borrow_rwlock(state)).write_downgrade |write_mode| {
459                 check_poison(false, (*state).failed);
460                 blk(RWWriteMode {
461                     data: &mut (*state).data,
462                     token: write_mode,
463                     poison: PoisonOnFail(&mut (*state).failed)
464                 })
465             }
466         }
467     }
468
469     /// To be called inside of the write_downgrade block.
470     pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>)
471                          -> RWReadMode<'a, T> {
472         unsafe {
473             // The rwlock should assert that the token belongs to us for us.
474             let state = self.x.get();
475             let RWWriteMode {
476                 data: data,
477                 token: t,
478                 poison: _poison
479             } = token;
480             // Let readers in
481             let new_token = (*state).lock.downgrade(t);
482             // Whatever region the input reference had, it will be safe to use
483             // the same region for the output reference. (The only 'unsafe' part
484             // of this cast is removing the mutability.)
485             let new_data = cast::transmute_immut(data);
486             // Downgrade ensured the token belonged to us. Just a sanity check.
487             assert!(borrow::ref_eq(&(*state).data, new_data));
488             // Produce new token
489             RWReadMode {
490                 data: new_data,
491                 token: new_token,
492             }
493         }
494     }
495
496     /**
497      * Retrieves the data, blocking until all other references are dropped,
498      * exactly as arc::unwrap.
499      *
500      * Will additionally fail if another task has failed while accessing the arc
501      * in write mode.
502      */
503     pub fn unwrap(self) -> T {
504         let RWArc { x: x, _ } = self;
505         let inner = x.unwrap();
506         let RWArcInner { failed: failed, data: data, _ } = inner;
507         if failed {
508             fail!(~"Can't unwrap poisoned RWArc - another task failed inside!")
509         }
510         data
511     }
512 }
513
514 // Borrowck rightly complains about immutably aliasing the rwlock in order to
515 // lock it. This wraps the unsafety, with the justification that the 'lock'
516 // field is never overwritten; only 'failed' and 'data'.
517 #[doc(hidden)]
518 fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {
519     unsafe { cast::transmute(&(*state).lock) }
520 }
521
522 /// The "write permission" token used for RWArc.write_downgrade().
523 pub struct RWWriteMode<'self, T> {
524     data: &'self mut T,
525     token: sync::RWLockWriteMode<'self>,
526     poison: PoisonOnFail,
527 }
528
529 /// The "read permission" token used for RWArc.write_downgrade().
530 pub struct RWReadMode<'self, T> {
531     data: &'self T,
532     token: sync::RWLockReadMode<'self>,
533 }
534
535 impl<'self, T:Freeze + Send> RWWriteMode<'self, T> {
536     /// Access the pre-downgrade RWArc in write mode.
537     pub fn write<U>(&mut self, blk: &fn(x: &mut T) -> U) -> U {
538         match *self {
539             RWWriteMode {
540                 data: &ref mut data,
541                 token: ref token,
542                 poison: _
543             } => {
544                 do token.write {
545                     blk(data)
546                 }
547             }
548         }
549     }
550
551     /// Access the pre-downgrade RWArc in write mode with a condvar.
552     pub fn write_cond<'x, 'c, U>(&mut self,
553                                  blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
554                                  -> U {
555         match *self {
556             RWWriteMode {
557                 data: &ref mut data,
558                 token: ref token,
559                 poison: ref poison
560             } => {
561                 do token.write_cond |cond| {
562                     unsafe {
563                         let cvar = Condvar {
564                             is_mutex: false,
565                             failed: &mut *poison.failed,
566                             cond: cond
567                         };
568                         blk(data, &cvar)
569                     }
570                 }
571             }
572         }
573     }
574 }
575
576 impl<'self, T:Freeze + Send> RWReadMode<'self, T> {
577     /// Access the post-downgrade rwlock in read mode.
578     pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
579         match *self {
580             RWReadMode {
581                 data: data,
582                 token: ref token
583             } => {
584                 do token.read { blk(data) }
585             }
586         }
587     }
588 }
589
590 /****************************************************************************
591  * Tests
592  ****************************************************************************/
593
594 #[cfg(test)]
595 mod tests {
596
597     use arc::*;
598
599     use std::cell::Cell;
600     use std::comm;
601     use std::task;
602
603     #[test]
604     fn manually_share_arc() {
605         let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
606         let arc_v = Arc::new(v);
607
608         let (p, c) = comm::stream();
609
610         do task::spawn {
611             let arc_v: Arc<~[int]> = p.recv();
612
613             let v = arc_v.get().clone();
614             assert_eq!(v[3], 4);
615         };
616
617         c.send(arc_v.clone());
618
619         assert_eq!(arc_v.get()[2], 3);
620         assert_eq!(arc_v.get()[4], 5);
621
622         info!(arc_v);
623     }
624
625     #[test]
626     fn test_mutex_arc_condvar() {
627         let arc = ~MutexArc::new(false);
628         let arc2 = ~arc.clone();
629         let (p,c) = comm::oneshot();
630         let (c,p) = (Cell::new(c), Cell::new(p));
631         do task::spawn || {
632             // wait until parent gets in
633             p.take().recv();
634             do arc2.access_cond |state, cond| {
635                 *state = true;
636                 cond.signal();
637             }
638         }
639
640         do arc.access_cond |state, cond| {
641             c.take().send(());
642             assert!(!*state);
643             while !*state {
644                 cond.wait();
645             }
646         }
647     }
648
649     #[test] #[should_fail]
650     fn test_arc_condvar_poison() {
651         let arc = ~MutexArc::new(1);
652         let arc2 = ~arc.clone();
653         let (p, c) = comm::stream();
654
655         do task::spawn_unlinked || {
656             let _ = p.recv();
657             do arc2.access_cond |one, cond| {
658                 cond.signal();
659                 // Parent should fail when it wakes up.
660                 assert_eq!(*one, 0);
661             }
662         }
663
664         do arc.access_cond |one, cond| {
665             c.send(());
666             while *one == 1 {
667                 cond.wait();
668             }
669         }
670     }
671
672     #[test] #[should_fail]
673     fn test_mutex_arc_poison() {
674         let arc = ~MutexArc::new(1);
675         let arc2 = ~arc.clone();
676         do task::try || {
677             do arc2.access |one| {
678                 assert_eq!(*one, 2);
679             }
680         };
681         do arc.access |one| {
682             assert_eq!(*one, 1);
683         }
684     }
685
686     #[test] #[should_fail]
687     pub fn test_mutex_arc_unwrap_poison() {
688         let arc = MutexArc::new(1);
689         let arc2 = ~(&arc).clone();
690         let (p, c) = comm::stream();
691         do task::spawn {
692             do arc2.access |one| {
693                 c.send(());
694                 assert!(*one == 2);
695             }
696         }
697         let _ = p.recv();
698         let one = arc.unwrap();
699         assert!(one == 1);
700     }
701
702     #[test]
703     fn test_unsafe_mutex_arc_nested() {
704         unsafe {
705             // Tests nested mutexes and access
706             // to underlaying data.
707             let arc = ~MutexArc::new(1);
708             let arc2 = ~MutexArc::new(*arc);
709             do task::spawn || {
710                 do (*arc2).unsafe_access |mutex| {
711                     do (*mutex).access |one| {
712                         assert!(*one == 1);
713                     }
714                 }
715             };
716         }
717     }
718
719     #[test] #[should_fail]
720     fn test_rw_arc_poison_wr() {
721         let arc = RWArc::new(1);
722         let arc2 = arc.clone();
723         do task::try {
724             do arc2.write |one| {
725                 assert_eq!(*one, 2);
726             }
727         };
728         do arc.read |one| {
729             assert_eq!(*one, 1);
730         }
731     }
732
733     #[test] #[should_fail]
734     fn test_rw_arc_poison_ww() {
735         let arc = RWArc::new(1);
736         let arc2 = arc.clone();
737         do task::try {
738             do arc2.write |one| {
739                 assert_eq!(*one, 2);
740             }
741         };
742         do arc.write |one| {
743             assert_eq!(*one, 1);
744         }
745     }
746     #[test] #[should_fail]
747     fn test_rw_arc_poison_dw() {
748         let arc = RWArc::new(1);
749         let arc2 = arc.clone();
750         do task::try {
751             do arc2.write_downgrade |mut write_mode| {
752                 do write_mode.write |one| {
753                     assert_eq!(*one, 2);
754                 }
755             }
756         };
757         do arc.write |one| {
758             assert_eq!(*one, 1);
759         }
760     }
761     #[test]
762     fn test_rw_arc_no_poison_rr() {
763         let arc = RWArc::new(1);
764         let arc2 = arc.clone();
765         do task::try {
766             do arc2.read |one| {
767                 assert_eq!(*one, 2);
768             }
769         };
770         do arc.read |one| {
771             assert_eq!(*one, 1);
772         }
773     }
774     #[test]
775     fn test_rw_arc_no_poison_rw() {
776         let arc = RWArc::new(1);
777         let arc2 = arc.clone();
778         do task::try {
779             do arc2.read |one| {
780                 assert_eq!(*one, 2);
781             }
782         };
783         do arc.write |one| {
784             assert_eq!(*one, 1);
785         }
786     }
787     #[test]
788     fn test_rw_arc_no_poison_dr() {
789         let arc = RWArc::new(1);
790         let arc2 = arc.clone();
791         do task::try {
792             do arc2.write_downgrade |write_mode| {
793                 let read_mode = arc2.downgrade(write_mode);
794                 do read_mode.read |one| {
795                     assert_eq!(*one, 2);
796                 }
797             }
798         };
799         do arc.write |one| {
800             assert_eq!(*one, 1);
801         }
802     }
803     #[test]
804     fn test_rw_arc() {
805         let arc = RWArc::new(0);
806         let arc2 = arc.clone();
807         let (p, c) = comm::stream();
808
809         do task::spawn {
810             do arc2.write |num| {
811                 do 10.times {
812                     let tmp = *num;
813                     *num = -1;
814                     task::deschedule();
815                     *num = tmp + 1;
816                 }
817                 c.send(());
818             }
819         }
820
821         // Readers try to catch the writer in the act
822         let mut children = ~[];
823         do 5.times {
824             let arc3 = arc.clone();
825             let mut builder = task::task();
826             builder.future_result(|r| children.push(r));
827             do builder.spawn {
828                 do arc3.read |num| {
829                     assert!(*num >= 0);
830                 }
831             }
832         }
833
834         // Wait for children to pass their asserts
835         for r in children.iter() {
836             r.recv();
837         }
838
839         // Wait for writer to finish
840         p.recv();
841         do arc.read |num| {
842             assert_eq!(*num, 10);
843         }
844     }
845     #[test]
846     fn test_rw_downgrade() {
847         // (1) A downgrader gets in write mode and does cond.wait.
848         // (2) A writer gets in write mode, sets state to 42, and does signal.
849         // (3) Downgrader wakes, sets state to 31337.
850         // (4) tells writer and all other readers to contend as it downgrades.
851         // (5) Writer attempts to set state back to 42, while downgraded task
852         //     and all reader tasks assert that it's 31337.
853         let arc = RWArc::new(0);
854
855         // Reader tasks
856         let mut reader_convos = ~[];
857         do 10.times {
858             let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream());
859             reader_convos.push((rc1, rp2));
860             let arcn = arc.clone();
861             do task::spawn {
862                 rp1.recv(); // wait for downgrader to give go-ahead
863                 do arcn.read |state| {
864                     assert_eq!(*state, 31337);
865                     rc2.send(());
866                 }
867             }
868         }
869
870         // Writer task
871         let arc2 = arc.clone();
872         let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream());
873         do task::spawn || {
874             wp1.recv();
875             do arc2.write_cond |state, cond| {
876                 assert_eq!(*state, 0);
877                 *state = 42;
878                 cond.signal();
879             }
880             wp1.recv();
881             do arc2.write |state| {
882                 // This shouldn't happen until after the downgrade read
883                 // section, and all other readers, finish.
884                 assert_eq!(*state, 31337);
885                 *state = 42;
886             }
887             wc2.send(());
888         }
889
890         // Downgrader (us)
891         do arc.write_downgrade |mut write_mode| {
892             do write_mode.write_cond |state, cond| {
893                 wc1.send(()); // send to another writer who will wake us up
894                 while *state == 0 {
895                     cond.wait();
896                 }
897                 assert_eq!(*state, 42);
898                 *state = 31337;
899                 // send to other readers
900                 for &(ref rc, _) in reader_convos.iter() {
901                     rc.send(())
902                 }
903             }
904             let read_mode = arc.downgrade(write_mode);
905             do read_mode.read |state| {
906                 // complete handshake with other readers
907                 for &(_, ref rp) in reader_convos.iter() {
908                     rp.recv()
909                 }
910                 wc1.send(()); // tell writer to try again
911                 assert_eq!(*state, 31337);
912             }
913         }
914
915         wp2.recv(); // complete handshake with writer
916     }
917     #[cfg(test)]
918     fn test_rw_write_cond_downgrade_read_race_helper() {
919         // Tests that when a downgrader hands off the "reader cloud" lock
920         // because of a contending reader, a writer can't race to get it
921         // instead, which would result in readers_and_writers. This tests
922         // the sync module rather than this one, but it's here because an
923         // rwarc gives us extra shared state to help check for the race.
924         // If you want to see this test fail, go to sync.rs and replace the
925         // line in RWLock::write_cond() that looks like:
926         //     "blk(&Condvar { order: opt_lock, ..*cond })"
927         // with just "blk(cond)".
928         let x = RWArc::new(true);
929         let (wp, wc) = comm::stream();
930
931         // writer task
932         let xw = x.clone();
933         do task::spawn {
934             do xw.write_cond |state, c| {
935                 wc.send(()); // tell downgrader it's ok to go
936                 c.wait();
937                 // The core of the test is here: the condvar reacquire path
938                 // must involve order_lock, so that it cannot race with a reader
939                 // trying to receive the "reader cloud lock hand-off".
940                 *state = false;
941             }
942         }
943
944         wp.recv(); // wait for writer to get in
945
946         do x.write_downgrade |mut write_mode| {
947             do write_mode.write_cond |state, c| {
948                 assert!(*state);
949                 // make writer contend in the cond-reacquire path
950                 c.signal();
951             }
952             // make a reader task to trigger the "reader cloud lock" handoff
953             let xr = x.clone();
954             let (rp, rc) = comm::stream();
955             do task::spawn {
956                 rc.send(());
957                 do xr.read |_state| { }
958             }
959             rp.recv(); // wait for reader task to exist
960
961             let read_mode = x.downgrade(write_mode);
962             do read_mode.read |state| {
963                 // if writer mistakenly got in, make sure it mutates state
964                 // before we assert on it
965                 do 5.times { task::deschedule(); }
966                 // make sure writer didn't get in.
967                 assert!(*state);
968             }
969         }
970     }
971     #[test]
972     fn test_rw_write_cond_downgrade_read_race() {
973         // Ideally the above test case would have deschedule statements in it that
974         // helped to expose the race nearly 100% of the time... but adding
975         // deschedules in the intuitively-right locations made it even less likely,
976         // and I wasn't sure why :( . This is a mediocre "next best" option.
977         do 8.times { test_rw_write_cond_downgrade_read_race_helper() }
978     }
979 }