]> git.lizzy.rs Git - rust.git/blob - src/libextra/arc.rs
cb4468f48ecb1473b65c3805ff15e22a2581c69f
[rust.git] / src / libextra / arc.rs
1 // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12  * Concurrency-enabled mechanisms for sharing mutable and/or immutable state
13  * between tasks.
14  *
15  * # Example
16  *
17  * In this example, a large vector of floats is shared between several tasks.
18  * With simple pipes, without Arc, a copy would have to be made for each task.
19  *
20  * ~~~ {.rust}
21  * extern mod std;
22  * use extra::arc;
23  * let numbers=vec::from_fn(100, |ind| (ind as float)*rand::random());
24  * let shared_numbers=arc::Arc::new(numbers);
25  *
26  *   do 10.times {
27  *       let (port, chan)  = stream();
28  *       chan.send(shared_numbers.clone());
29  *
30  *       do spawn {
31  *           let shared_numbers=port.recv();
32  *           let local_numbers=shared_numbers.get();
33  *
34  *           // Work with the local numbers
35  *       }
36  *   }
37  * ~~~
38  */
39
40 #[allow(missing_doc)];
41
42
43 use sync;
44 use sync::{Mutex, RWLock};
45
46 use std::cast;
47 use std::unstable::sync::UnsafeAtomicRcBox;
48 use std::task;
49 use std::borrow;
50
51 /// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
52 pub struct Condvar<'self> {
53     priv is_mutex: bool,
54     priv failed: &'self mut bool,
55     priv cond: &'self sync::Condvar<'self>
56 }
57
58 impl<'self> Condvar<'self> {
59     /// Atomically exit the associated Arc and block until a signal is sent.
60     #[inline]
61     pub fn wait(&self) { self.wait_on(0) }
62
63     /**
64      * Atomically exit the associated Arc and block on a specified condvar
65      * until a signal is sent on that same condvar (as sync::cond.wait_on).
66      *
67      * wait() is equivalent to wait_on(0).
68      */
69     #[inline]
70     pub fn wait_on(&self, condvar_id: uint) {
71         assert!(!*self.failed);
72         self.cond.wait_on(condvar_id);
73         // This is why we need to wrap sync::condvar.
74         check_poison(self.is_mutex, *self.failed);
75     }
76
77     /// Wake up a blocked task. Returns false if there was no blocked task.
78     #[inline]
79     pub fn signal(&self) -> bool { self.signal_on(0) }
80
81     /**
82      * Wake up a blocked task on a specified condvar (as
83      * sync::cond.signal_on). Returns false if there was no blocked task.
84      */
85     #[inline]
86     pub fn signal_on(&self, condvar_id: uint) -> bool {
87         assert!(!*self.failed);
88         self.cond.signal_on(condvar_id)
89     }
90
91     /// Wake up all blocked tasks. Returns the number of tasks woken.
92     #[inline]
93     pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
94
95     /**
96      * Wake up all blocked tasks on a specified condvar (as
97      * sync::cond.broadcast_on). Returns the number of tasks woken.
98      */
99     #[inline]
100     pub fn broadcast_on(&self, condvar_id: uint) -> uint {
101         assert!(!*self.failed);
102         self.cond.broadcast_on(condvar_id)
103     }
104 }
105
106 /****************************************************************************
107  * Immutable Arc
108  ****************************************************************************/
109
110 /// An atomically reference counted wrapper for shared immutable state.
111 pub struct Arc<T> { priv x: UnsafeAtomicRcBox<T> }
112
113
114 /**
115  * Access the underlying data in an atomically reference counted
116  * wrapper.
117  */
118 impl<T:Freeze+Send> Arc<T> {
119     /// Create an atomically reference counted wrapper.
120     pub fn new(data: T) -> Arc<T> {
121         Arc { x: UnsafeAtomicRcBox::new(data) }
122     }
123
124     pub fn get<'a>(&'a self) -> &'a T {
125         unsafe { &*self.x.get_immut() }
126     }
127
128     /**
129      * Retrieve the data back out of the Arc. This function blocks until the
130      * reference given to it is the last existing one, and then unwrap the data
131      * instead of destroying it.
132      *
133      * If multiple tasks call unwrap, all but the first will fail. Do not call
134      * unwrap from a task that holds another reference to the same Arc; it is
135      * guaranteed to deadlock.
136      */
137     pub fn unwrap(self) -> T {
138         let Arc { x: x } = self;
139         x.unwrap()
140     }
141 }
142
143 impl<T:Freeze + Send> Clone for Arc<T> {
144     /**
145     * Duplicate an atomically reference counted wrapper.
146     *
147     * The resulting two `arc` objects will point to the same underlying data
148     * object. However, one of the `arc` objects can be sent to another task,
149     * allowing them to share the underlying data.
150     */
151     fn clone(&self) -> Arc<T> {
152         Arc { x: self.x.clone() }
153     }
154 }
155
156 /****************************************************************************
157  * Mutex protected Arc (unsafe)
158  ****************************************************************************/
159
160 #[doc(hidden)]
161 struct MutexArcInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
162 /// An Arc with mutable data protected by a blocking mutex.
163 struct MutexArc<T> { priv x: UnsafeAtomicRcBox<MutexArcInner<T>> }
164
165
166 impl<T:Send> Clone for MutexArc<T> {
167     /// Duplicate a mutex-protected Arc. See arc::clone for more details.
168     fn clone(&self) -> MutexArc<T> {
169         // NB: Cloning the underlying mutex is not necessary. Its reference
170         // count would be exactly the same as the shared state's.
171         MutexArc { x: self.x.clone() }
172     }
173 }
174
175 impl<T:Send> MutexArc<T> {
176     /// Create a mutex-protected Arc with the supplied data.
177     pub fn new(user_data: T) -> MutexArc<T> {
178         MutexArc::new_with_condvars(user_data, 1)
179     }
180
181     /**
182      * Create a mutex-protected Arc with the supplied data and a specified number
183      * of condvars (as sync::Mutex::new_with_condvars).
184      */
185     pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc<T> {
186         let data = MutexArcInner {
187             lock: Mutex::new_with_condvars(num_condvars),
188             failed: false, data: user_data
189         };
190         MutexArc { x: UnsafeAtomicRcBox::new(data) }
191     }
192
193     /**
194      * Access the underlying mutable data with mutual exclusion from other
195      * tasks. The argument closure will be run with the mutex locked; all
196      * other tasks wishing to access the data will block until the closure
197      * finishes running.
198      *
199      * The reason this function is 'unsafe' is because it is possible to
200      * construct a circular reference among multiple Arcs by mutating the
201      * underlying data. This creates potential for deadlock, but worse, this
202      * will guarantee a memory leak of all involved Arcs. Using mutex Arcs
203      * inside of other Arcs is safe in absence of circular references.
204      *
205      * If you wish to nest mutex_arcs, one strategy for ensuring safety at
206      * runtime is to add a "nesting level counter" inside the stored data, and
207      * when traversing the arcs, assert that they monotonically decrease.
208      *
209      * # Failure
210      *
211      * Failing while inside the Arc will unlock the Arc while unwinding, so
212      * that other tasks won't block forever. It will also poison the Arc:
213      * any tasks that subsequently try to access it (including those already
214      * blocked on the mutex) will also fail immediately.
215      */
216     #[inline]
217     pub unsafe fn access<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
218         let state = self.x.get();
219         // Borrowck would complain about this if the function were
220         // not already unsafe. See borrow_rwlock, far below.
221         do (&(*state).lock).lock {
222             check_poison(true, (*state).failed);
223             let _z = PoisonOnFail(&mut (*state).failed);
224             blk(&mut (*state).data)
225         }
226     }
227
228     /// As access(), but with a condvar, as sync::mutex.lock_cond().
229     #[inline]
230     pub unsafe fn access_cond<'x, 'c, U>(&self,
231                                          blk: &fn(x: &'x mut T,
232                                                   c: &'c Condvar) -> U)
233                                          -> U {
234         let state = self.x.get();
235         do (&(*state).lock).lock_cond |cond| {
236             check_poison(true, (*state).failed);
237             let _z = PoisonOnFail(&mut (*state).failed);
238             blk(&mut (*state).data,
239                 &Condvar {is_mutex: true,
240                           failed: &mut (*state).failed,
241                           cond: cond })
242         }
243     }
244
245     /**
246      * Retrieves the data, blocking until all other references are dropped,
247      * exactly as arc::unwrap.
248      *
249      * Will additionally fail if another task has failed while accessing the arc.
250      */
251     pub fn unwrap(self) -> T {
252         let MutexArc { x: x } = self;
253         let inner = x.unwrap();
254         let MutexArcInner { failed: failed, data: data, _ } = inner;
255         if failed {
256             fail!(~"Can't unwrap poisoned MutexArc - another task failed inside!");
257         }
258         data
259     }
260 }
261
262 // Common code for {mutex.access,rwlock.write}{,_cond}.
263 #[inline]
264 #[doc(hidden)]
265 fn check_poison(is_mutex: bool, failed: bool) {
266     if failed {
267         if is_mutex {
268             fail!("Poisoned MutexArc - another task failed inside!");
269         } else {
270             fail!("Poisoned rw_arc - another task failed inside!");
271         }
272     }
273 }
274
275 #[doc(hidden)]
276 struct PoisonOnFail {
277     failed: *mut bool,
278 }
279
280 impl Drop for PoisonOnFail {
281     fn drop(&self) {
282         unsafe {
283             /* assert!(!*self.failed);
284                -- might be false in case of cond.wait() */
285             if task::failing() {
286                 *self.failed = true;
287             }
288         }
289     }
290 }
291
292 fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
293     PoisonOnFail {
294         failed: failed
295     }
296 }
297
298 /****************************************************************************
299  * R/W lock protected Arc
300  ****************************************************************************/
301
302 #[doc(hidden)]
303 struct RWArcInner<T> { priv lock: RWLock, priv failed: bool, priv data: T }
304 /**
305  * A dual-mode Arc protected by a reader-writer lock. The data can be accessed
306  * mutably or immutably, and immutably-accessing tasks may run concurrently.
307  *
308  * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested.
309  */
310 #[no_freeze]
311 struct RWArc<T> {
312     priv x: UnsafeAtomicRcBox<RWArcInner<T>>,
313 }
314
315 impl<T:Freeze + Send> Clone for RWArc<T> {
316     /// Duplicate a rwlock-protected Arc. See arc::clone for more details.
317     fn clone(&self) -> RWArc<T> {
318         RWArc { x: self.x.clone() }
319     }
320
321 }
322
323 impl<T:Freeze + Send> RWArc<T> {
324     /// Create a reader/writer Arc with the supplied data.
325     pub fn new(user_data: T) -> RWArc<T> {
326         RWArc::new_with_condvars(user_data, 1)
327     }
328
329     /**
330      * Create a reader/writer Arc with the supplied data and a specified number
331      * of condvars (as sync::RWLock::new_with_condvars).
332      */
333     pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWArc<T> {
334         let data = RWArcInner {
335             lock: RWLock::new_with_condvars(num_condvars),
336             failed: false, data: user_data
337         };
338         RWArc { x: UnsafeAtomicRcBox::new(data), }
339     }
340
341     /**
342      * Access the underlying data mutably. Locks the rwlock in write mode;
343      * other readers and writers will block.
344      *
345      * # Failure
346      *
347      * Failing while inside the Arc will unlock the Arc while unwinding, so
348      * that other tasks won't block forever. As MutexArc.access, it will also
349      * poison the Arc, so subsequent readers and writers will both also fail.
350      */
351     #[inline]
352     pub fn write<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
353         unsafe {
354             let state = self.x.get();
355             do (*borrow_rwlock(state)).write {
356                 check_poison(false, (*state).failed);
357                 let _z = PoisonOnFail(&mut (*state).failed);
358                 blk(&mut (*state).data)
359             }
360         }
361     }
362
363     /// As write(), but with a condvar, as sync::rwlock.write_cond().
364     #[inline]
365     pub fn write_cond<'x, 'c, U>(&self,
366                                  blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
367                                  -> U {
368         unsafe {
369             let state = self.x.get();
370             do (*borrow_rwlock(state)).write_cond |cond| {
371                 check_poison(false, (*state).failed);
372                 let _z = PoisonOnFail(&mut (*state).failed);
373                 blk(&mut (*state).data,
374                     &Condvar {is_mutex: false,
375                               failed: &mut (*state).failed,
376                               cond: cond})
377             }
378         }
379     }
380
381     /**
382      * Access the underlying data immutably. May run concurrently with other
383      * reading tasks.
384      *
385      * # Failure
386      *
387      * Failing will unlock the Arc while unwinding. However, unlike all other
388      * access modes, this will not poison the Arc.
389      */
390     pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
391         unsafe {
392             let state = self.x.get();
393             do (*state).lock.read {
394                 check_poison(false, (*state).failed);
395                 blk(&(*state).data)
396             }
397         }
398     }
399
400     /**
401      * As write(), but with the ability to atomically 'downgrade' the lock.
402      * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used
403      * to obtain the &mut T, and can be transformed into a RWReadMode token by
404      * calling downgrade(), after which a &T can be obtained instead.
405      *
406      * # Example
407      *
408      * ~~~ {.rust}
409      * do arc.write_downgrade |mut write_token| {
410      *     do write_token.write_cond |state, condvar| {
411      *         ... exclusive access with mutable state ...
412      *     }
413      *     let read_token = arc.downgrade(write_token);
414      *     do read_token.read |state| {
415      *         ... shared access with immutable state ...
416      *     }
417      * }
418      * ~~~
419      */
420     pub fn write_downgrade<U>(&self, blk: &fn(v: RWWriteMode<T>) -> U) -> U {
421         unsafe {
422             let state = self.x.get();
423             do (*borrow_rwlock(state)).write_downgrade |write_mode| {
424                 check_poison(false, (*state).failed);
425                 blk(RWWriteMode {
426                     data: &mut (*state).data,
427                     token: write_mode,
428                     poison: PoisonOnFail(&mut (*state).failed)
429                 })
430             }
431         }
432     }
433
434     /// To be called inside of the write_downgrade block.
435     pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>)
436                          -> RWReadMode<'a, T> {
437         unsafe {
438             // The rwlock should assert that the token belongs to us for us.
439             let state = self.x.get();
440             let RWWriteMode {
441                 data: data,
442                 token: t,
443                 poison: _poison
444             } = token;
445             // Let readers in
446             let new_token = (*state).lock.downgrade(t);
447             // Whatever region the input reference had, it will be safe to use
448             // the same region for the output reference. (The only 'unsafe' part
449             // of this cast is removing the mutability.)
450             let new_data = cast::transmute_immut(data);
451             // Downgrade ensured the token belonged to us. Just a sanity check.
452             assert!(borrow::ref_eq(&(*state).data, new_data));
453             // Produce new token
454             RWReadMode {
455                 data: new_data,
456                 token: new_token,
457             }
458         }
459     }
460
461     /**
462      * Retrieves the data, blocking until all other references are dropped,
463      * exactly as arc::unwrap.
464      *
465      * Will additionally fail if another task has failed while accessing the arc
466      * in write mode.
467      */
468     pub fn unwrap(self) -> T {
469         let RWArc { x: x, _ } = self;
470         let inner = x.unwrap();
471         let RWArcInner { failed: failed, data: data, _ } = inner;
472         if failed {
473             fail!(~"Can't unwrap poisoned RWArc - another task failed inside!")
474         }
475         data
476     }
477 }
478
479 // Borrowck rightly complains about immutably aliasing the rwlock in order to
480 // lock it. This wraps the unsafety, with the justification that the 'lock'
481 // field is never overwritten; only 'failed' and 'data'.
482 #[doc(hidden)]
483 fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {
484     unsafe { cast::transmute(&(*state).lock) }
485 }
486
487 /// The "write permission" token used for RWArc.write_downgrade().
488 pub struct RWWriteMode<'self, T> {
489     data: &'self mut T,
490     token: sync::RWLockWriteMode<'self>,
491     poison: PoisonOnFail,
492 }
493
494 /// The "read permission" token used for RWArc.write_downgrade().
495 pub struct RWReadMode<'self, T> {
496     data: &'self T,
497     token: sync::RWLockReadMode<'self>,
498 }
499
500 impl<'self, T:Freeze + Send> RWWriteMode<'self, T> {
501     /// Access the pre-downgrade RWArc in write mode.
502     pub fn write<U>(&mut self, blk: &fn(x: &mut T) -> U) -> U {
503         match *self {
504             RWWriteMode {
505                 data: &ref mut data,
506                 token: ref token,
507                 poison: _
508             } => {
509                 do token.write {
510                     blk(data)
511                 }
512             }
513         }
514     }
515
516     /// Access the pre-downgrade RWArc in write mode with a condvar.
517     pub fn write_cond<'x, 'c, U>(&mut self,
518                                  blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
519                                  -> U {
520         match *self {
521             RWWriteMode {
522                 data: &ref mut data,
523                 token: ref token,
524                 poison: ref poison
525             } => {
526                 do token.write_cond |cond| {
527                     unsafe {
528                         let cvar = Condvar {
529                             is_mutex: false,
530                             failed: &mut *poison.failed,
531                             cond: cond
532                         };
533                         blk(data, &cvar)
534                     }
535                 }
536             }
537         }
538     }
539 }
540
541 impl<'self, T:Freeze + Send> RWReadMode<'self, T> {
542     /// Access the post-downgrade rwlock in read mode.
543     pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
544         match *self {
545             RWReadMode {
546                 data: data,
547                 token: ref token
548             } => {
549                 do token.read { blk(data) }
550             }
551         }
552     }
553 }
554
555 /****************************************************************************
556  * Tests
557  ****************************************************************************/
558
559 #[cfg(test)]
560 mod tests {
561
562     use arc::*;
563
564     use std::cell::Cell;
565     use std::comm;
566     use std::task;
567
568     #[test]
569     fn manually_share_arc() {
570         let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
571         let arc_v = Arc::new(v);
572
573         let (p, c) = comm::stream();
574
575         do task::spawn() || {
576             let arc_v : Arc<~[int]> = p.recv();
577
578             let v = (*arc_v.get()).clone();
579             assert_eq!(v[3], 4);
580         };
581
582         c.send(arc_v.clone());
583
584         assert_eq!(arc_v.get()[2], 3);
585         assert_eq!(arc_v.get()[4], 5);
586
587         info!(arc_v);
588     }
589
590     #[test]
591     fn test_mutex_arc_condvar() {
592         unsafe {
593             let arc = ~MutexArc::new(false);
594             let arc2 = ~arc.clone();
595             let (p,c) = comm::oneshot();
596             let (c,p) = (Cell::new(c), Cell::new(p));
597             do task::spawn || {
598                 // wait until parent gets in
599                 comm::recv_one(p.take());
600                 do arc2.access_cond |state, cond| {
601                     *state = true;
602                     cond.signal();
603                 }
604             }
605             do arc.access_cond |state, cond| {
606                 comm::send_one(c.take(), ());
607                 assert!(!*state);
608                 while !*state {
609                     cond.wait();
610                 }
611             }
612         }
613     }
614     #[test] #[should_fail] #[ignore(cfg(windows))]
615     fn test_arc_condvar_poison() {
616         unsafe {
617             let arc = ~MutexArc::new(1);
618             let arc2 = ~arc.clone();
619             let (p, c) = comm::stream();
620
621             do task::spawn_unlinked || {
622                 let _ = p.recv();
623                 do arc2.access_cond |one, cond| {
624                     cond.signal();
625                     // Parent should fail when it wakes up.
626                     assert_eq!(*one, 0);
627                 }
628             }
629
630             do arc.access_cond |one, cond| {
631                 c.send(());
632                 while *one == 1 {
633                     cond.wait();
634                 }
635             }
636         }
637     }
638     #[test] #[should_fail] #[ignore(cfg(windows))]
639     fn test_mutex_arc_poison() {
640         unsafe {
641             let arc = ~MutexArc::new(1);
642             let arc2 = ~arc.clone();
643             do task::try || {
644                 do arc2.access |one| {
645                     assert_eq!(*one, 2);
646                 }
647             };
648             do arc.access |one| {
649                 assert_eq!(*one, 1);
650             }
651         }
652     }
653     #[test] #[should_fail] #[ignore(cfg(windows))]
654     pub fn test_mutex_arc_unwrap_poison() {
655         let arc = MutexArc::new(1);
656         let arc2 = ~(&arc).clone();
657         let (p, c) = comm::stream();
658         do task::spawn {
659             unsafe {
660                 do arc2.access |one| {
661                     c.send(());
662                     assert!(*one == 2);
663                 }
664             }
665         }
666         let _ = p.recv();
667         let one = arc.unwrap();
668         assert!(one == 1);
669     }
670     #[test] #[should_fail] #[ignore(cfg(windows))]
671     fn test_rw_arc_poison_wr() {
672         let arc = ~RWArc::new(1);
673         let arc2 = (*arc).clone();
674         do task::try || {
675             do arc2.write |one| {
676                 assert_eq!(*one, 2);
677             }
678         };
679         do arc.read |one| {
680             assert_eq!(*one, 1);
681         }
682     }
683     #[test] #[should_fail] #[ignore(cfg(windows))]
684     fn test_rw_arc_poison_ww() {
685         let arc = ~RWArc::new(1);
686         let arc2 = (*arc).clone();
687         do task::try || {
688             do arc2.write |one| {
689                 assert_eq!(*one, 2);
690             }
691         };
692         do arc.write |one| {
693             assert_eq!(*one, 1);
694         }
695     }
696     #[test] #[should_fail] #[ignore(cfg(windows))]
697     fn test_rw_arc_poison_dw() {
698         let arc = ~RWArc::new(1);
699         let arc2 = (*arc).clone();
700         do task::try || {
701             do arc2.write_downgrade |mut write_mode| {
702                 do write_mode.write |one| {
703                     assert_eq!(*one, 2);
704                 }
705             }
706         };
707         do arc.write |one| {
708             assert_eq!(*one, 1);
709         }
710     }
711     #[test] #[ignore(cfg(windows))]
712     fn test_rw_arc_no_poison_rr() {
713         let arc = ~RWArc::new(1);
714         let arc2 = (*arc).clone();
715         do task::try || {
716             do arc2.read |one| {
717                 assert_eq!(*one, 2);
718             }
719         };
720         do arc.read |one| {
721             assert_eq!(*one, 1);
722         }
723     }
724     #[test] #[ignore(cfg(windows))]
725     fn test_rw_arc_no_poison_rw() {
726         let arc = ~RWArc::new(1);
727         let arc2 = (*arc).clone();
728         do task::try || {
729             do arc2.read |one| {
730                 assert_eq!(*one, 2);
731             }
732         };
733         do arc.write |one| {
734             assert_eq!(*one, 1);
735         }
736     }
737     #[test] #[ignore(cfg(windows))]
738     fn test_rw_arc_no_poison_dr() {
739         let arc = ~RWArc::new(1);
740         let arc2 = (*arc).clone();
741         do task::try || {
742             do arc2.write_downgrade |write_mode| {
743                 let read_mode = arc2.downgrade(write_mode);
744                 do (&read_mode).read |one| {
745                     assert_eq!(*one, 2);
746                 }
747             }
748         };
749         do arc.write |one| {
750             assert_eq!(*one, 1);
751         }
752     }
753     #[test]
754     fn test_rw_arc() {
755         let arc = ~RWArc::new(0);
756         let arc2 = (*arc).clone();
757         let (p,c) = comm::stream();
758
759         do task::spawn || {
760             do arc2.write |num| {
761                 do 10.times {
762                     let tmp = *num;
763                     *num = -1;
764                     task::yield();
765                     *num = tmp + 1;
766                 }
767                 c.send(());
768             }
769         }
770
771         // Readers try to catch the writer in the act
772         let mut children = ~[];
773         do 5.times {
774             let arc3 = (*arc).clone();
775             let mut builder = task::task();
776             builder.future_result(|r| children.push(r));
777             do builder.spawn {
778                 do arc3.read |num| {
779                     assert!(*num >= 0);
780                 }
781             }
782         }
783
784         // Wait for children to pass their asserts
785         for r in children.iter() {
786             r.recv();
787         }
788
789         // Wait for writer to finish
790         p.recv();
791         do arc.read |num| {
792             assert_eq!(*num, 10);
793         }
794     }
795     #[test]
796     fn test_rw_downgrade() {
797         // (1) A downgrader gets in write mode and does cond.wait.
798         // (2) A writer gets in write mode, sets state to 42, and does signal.
799         // (3) Downgrader wakes, sets state to 31337.
800         // (4) tells writer and all other readers to contend as it downgrades.
801         // (5) Writer attempts to set state back to 42, while downgraded task
802         //     and all reader tasks assert that it's 31337.
803         let arc = ~RWArc::new(0);
804
805         // Reader tasks
806         let mut reader_convos = ~[];
807         do 10.times {
808             let ((rp1,rc1),(rp2,rc2)) = (comm::stream(),comm::stream());
809             reader_convos.push((rc1, rp2));
810             let arcn = (*arc).clone();
811             do task::spawn || {
812                 rp1.recv(); // wait for downgrader to give go-ahead
813                 do arcn.read |state| {
814                     assert_eq!(*state, 31337);
815                     rc2.send(());
816                 }
817             }
818         }
819
820         // Writer task
821         let arc2 = (*arc).clone();
822         let ((wp1,wc1),(wp2,wc2)) = (comm::stream(),comm::stream());
823         do task::spawn || {
824             wp1.recv();
825             do arc2.write_cond |state, cond| {
826                 assert_eq!(*state, 0);
827                 *state = 42;
828                 cond.signal();
829             }
830             wp1.recv();
831             do arc2.write |state| {
832                 // This shouldn't happen until after the downgrade read
833                 // section, and all other readers, finish.
834                 assert_eq!(*state, 31337);
835                 *state = 42;
836             }
837             wc2.send(());
838         }
839
840         // Downgrader (us)
841         do arc.write_downgrade |mut write_mode| {
842             do write_mode.write_cond |state, cond| {
843                 wc1.send(()); // send to another writer who will wake us up
844                 while *state == 0 {
845                     cond.wait();
846                 }
847                 assert_eq!(*state, 42);
848                 *state = 31337;
849                 // FIXME: #7372: hits type inference bug with iterators
850                 // send to other readers
851                 for i in range(0u, reader_convos.len()) {
852                     match reader_convos[i] {
853                         (ref rc, _) => rc.send(()),
854                     }
855                 }
856             }
857             let read_mode = arc.downgrade(write_mode);
858             do (&read_mode).read |state| {
859                 // FIXME: #7372: hits type inference bug with iterators
860                 // complete handshake with other readers
861                 for i in range(0u, reader_convos.len()) {
862                     match reader_convos[i] {
863                         (_, ref rp) => rp.recv(),
864                     }
865                 }
866                 wc1.send(()); // tell writer to try again
867                 assert_eq!(*state, 31337);
868             }
869         }
870
871         wp2.recv(); // complete handshake with writer
872     }
873     #[cfg(test)]
874     fn test_rw_write_cond_downgrade_read_race_helper() {
875         // Tests that when a downgrader hands off the "reader cloud" lock
876         // because of a contending reader, a writer can't race to get it
877         // instead, which would result in readers_and_writers. This tests
878         // the sync module rather than this one, but it's here because an
879         // rwarc gives us extra shared state to help check for the race.
880         // If you want to see this test fail, go to sync.rs and replace the
881         // line in RWLock::write_cond() that looks like:
882         //     "blk(&Condvar { order: opt_lock, ..*cond })"
883         // with just "blk(cond)".
884         let x = ~RWArc::new(true);
885         let (wp, wc) = comm::stream();
886
887         // writer task
888         let xw = (*x).clone();
889         do task::spawn {
890             do xw.write_cond |state, c| {
891                 wc.send(()); // tell downgrader it's ok to go
892                 c.wait();
893                 // The core of the test is here: the condvar reacquire path
894                 // must involve order_lock, so that it cannot race with a reader
895                 // trying to receive the "reader cloud lock hand-off".
896                 *state = false;
897             }
898         }
899
900         wp.recv(); // wait for writer to get in
901
902         do x.write_downgrade |mut write_mode| {
903             do write_mode.write_cond |state, c| {
904                 assert!(*state);
905                 // make writer contend in the cond-reacquire path
906                 c.signal();
907             }
908             // make a reader task to trigger the "reader cloud lock" handoff
909             let xr = (*x).clone();
910             let (rp, rc) = comm::stream();
911             do task::spawn {
912                 rc.send(());
913                 do xr.read |_state| { }
914             }
915             rp.recv(); // wait for reader task to exist
916
917             let read_mode = x.downgrade(write_mode);
918             do read_mode.read |state| {
919                 // if writer mistakenly got in, make sure it mutates state
920                 // before we assert on it
921                 do 5.times { task::yield(); }
922                 // make sure writer didn't get in.
923                 assert!(*state);
924             }
925         }
926     }
927     #[test]
928     fn test_rw_write_cond_downgrade_read_race() {
929         // Ideally the above test case would have yield statements in it that
930         // helped to expose the race nearly 100% of the time... but adding
931         // yields in the intuitively-right locations made it even less likely,
932         // and I wasn't sure why :( . This is a mediocre "next best" option.
933         do 8.times { test_rw_write_cond_downgrade_read_race_helper() }
934     }
935 }