]> git.lizzy.rs Git - rust.git/blob - src/libextra/arc.rs
Change finalize -> drop.
[rust.git] / src / libextra / arc.rs
1 // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12  * Concurrency-enabled mechanisms for sharing mutable and/or immutable state
13  * between tasks.
14  *
15  * # Example
16  *
17  * In this example, a large vector of floats is shared between several tasks.
18  * With simple pipes, without ARC, a copy would have to be made for each task.
19  *
20  * ~~~ {.rust}
21  * extern mod std;
22  * use std::arc;
23  * let numbers=vec::from_fn(100, |ind| (ind as float)*rand::random());
24  * let shared_numbers=arc::ARC(numbers);
25  *
26  *   for 10.times {
27  *       let (port, chan)  = stream();
28  *       chan.send(shared_numbers.clone());
29  *
30  *       do spawn {
31  *           let shared_numbers=port.recv();
32  *           let local_numbers=shared_numbers.get();
33  *
34  *           // Work with the local numbers
35  *       }
36  *   }
37  * ~~~
38  */
39
40 #[allow(missing_doc)];
41
42 use core::prelude::*;
43
44 use sync;
45 use sync::{Mutex, mutex_with_condvars, RWlock, rwlock_with_condvars};
46
47 use core::cast;
48 use core::unstable::sync::UnsafeAtomicRcBox;
49 use core::task;
50 use core::borrow;
51
52 /// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
53 pub struct Condvar<'self> {
54     is_mutex: bool,
55     failed: &'self mut bool,
56     cond: &'self sync::Condvar<'self>
57 }
58
59 impl<'self> Condvar<'self> {
60     /// Atomically exit the associated ARC and block until a signal is sent.
61     #[inline]
62     pub fn wait(&self) { self.wait_on(0) }
63
64     /**
65      * Atomically exit the associated ARC and block on a specified condvar
66      * until a signal is sent on that same condvar (as sync::cond.wait_on).
67      *
68      * wait() is equivalent to wait_on(0).
69      */
70     #[inline]
71     pub fn wait_on(&self, condvar_id: uint) {
72         assert!(!*self.failed);
73         self.cond.wait_on(condvar_id);
74         // This is why we need to wrap sync::condvar.
75         check_poison(self.is_mutex, *self.failed);
76     }
77
78     /// Wake up a blocked task. Returns false if there was no blocked task.
79     #[inline]
80     pub fn signal(&self) -> bool { self.signal_on(0) }
81
82     /**
83      * Wake up a blocked task on a specified condvar (as
84      * sync::cond.signal_on). Returns false if there was no blocked task.
85      */
86     #[inline]
87     pub fn signal_on(&self, condvar_id: uint) -> bool {
88         assert!(!*self.failed);
89         self.cond.signal_on(condvar_id)
90     }
91
92     /// Wake up all blocked tasks. Returns the number of tasks woken.
93     #[inline]
94     pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
95
96     /**
97      * Wake up all blocked tasks on a specified condvar (as
98      * sync::cond.broadcast_on). Returns the number of tasks woken.
99      */
100     #[inline]
101     pub fn broadcast_on(&self, condvar_id: uint) -> uint {
102         assert!(!*self.failed);
103         self.cond.broadcast_on(condvar_id)
104     }
105 }
106
107 /****************************************************************************
108  * Immutable ARC
109  ****************************************************************************/
110
111 /// An atomically reference counted wrapper for shared immutable state.
112 pub struct ARC<T> { x: UnsafeAtomicRcBox<T> }
113
114 /// Create an atomically reference counted wrapper.
115 pub fn ARC<T:Const + Owned>(data: T) -> ARC<T> {
116     ARC { x: UnsafeAtomicRcBox::new(data) }
117 }
118
119 /**
120  * Access the underlying data in an atomically reference counted
121  * wrapper.
122  */
123 impl<T:Const+Owned> ARC<T> {
124     pub fn get<'a>(&'a self) -> &'a T {
125         unsafe { &*self.x.get_immut() }
126     }
127 }
128
129 /**
130  * Duplicate an atomically reference counted wrapper.
131  *
132  * The resulting two `arc` objects will point to the same underlying data
133  * object. However, one of the `arc` objects can be sent to another task,
134  * allowing them to share the underlying data.
135  */
136 impl<T:Const + Owned> Clone for ARC<T> {
137     fn clone(&self) -> ARC<T> {
138         ARC { x: self.x.clone() }
139     }
140 }
141
142 /****************************************************************************
143  * Mutex protected ARC (unsafe)
144  ****************************************************************************/
145
146 #[doc(hidden)]
147 struct MutexARCInner<T> { lock: Mutex, failed: bool, data: T }
148 /// An ARC with mutable data protected by a blocking mutex.
149 struct MutexARC<T> { x: UnsafeAtomicRcBox<MutexARCInner<T>> }
150
151 /// Create a mutex-protected ARC with the supplied data.
152 pub fn MutexARC<T:Owned>(user_data: T) -> MutexARC<T> {
153     mutex_arc_with_condvars(user_data, 1)
154 }
155 /**
156  * Create a mutex-protected ARC with the supplied data and a specified number
157  * of condvars (as sync::mutex_with_condvars).
158  */
159 pub fn mutex_arc_with_condvars<T:Owned>(user_data: T,
160                                     num_condvars: uint) -> MutexARC<T> {
161     let data =
162         MutexARCInner { lock: mutex_with_condvars(num_condvars),
163                           failed: false, data: user_data };
164     MutexARC { x: UnsafeAtomicRcBox::new(data) }
165 }
166
167 impl<T:Owned> Clone for MutexARC<T> {
168     /// Duplicate a mutex-protected ARC, as arc::clone.
169     fn clone(&self) -> MutexARC<T> {
170         // NB: Cloning the underlying mutex is not necessary. Its reference
171         // count would be exactly the same as the shared state's.
172         MutexARC { x: self.x.clone() }
173     }
174 }
175
176 impl<T:Owned> MutexARC<T> {
177
178     /**
179      * Access the underlying mutable data with mutual exclusion from other
180      * tasks. The argument closure will be run with the mutex locked; all
181      * other tasks wishing to access the data will block until the closure
182      * finishes running.
183      *
184      * The reason this function is 'unsafe' is because it is possible to
185      * construct a circular reference among multiple ARCs by mutating the
186      * underlying data. This creates potential for deadlock, but worse, this
187      * will guarantee a memory leak of all involved ARCs. Using mutex ARCs
188      * inside of other ARCs is safe in absence of circular references.
189      *
190      * If you wish to nest mutex_arcs, one strategy for ensuring safety at
191      * runtime is to add a "nesting level counter" inside the stored data, and
192      * when traversing the arcs, assert that they monotonically decrease.
193      *
194      * # Failure
195      *
196      * Failing while inside the ARC will unlock the ARC while unwinding, so
197      * that other tasks won't block forever. It will also poison the ARC:
198      * any tasks that subsequently try to access it (including those already
199      * blocked on the mutex) will also fail immediately.
200      */
201     #[inline]
202     pub unsafe fn access<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
203         let state = self.x.get();
204         // Borrowck would complain about this if the function were
205         // not already unsafe. See borrow_rwlock, far below.
206         do (&(*state).lock).lock {
207             check_poison(true, (*state).failed);
208             let _z = PoisonOnFail(&mut (*state).failed);
209             blk(&mut (*state).data)
210         }
211     }
212
213     /// As access(), but with a condvar, as sync::mutex.lock_cond().
214     #[inline]
215     pub unsafe fn access_cond<'x, 'c, U>(&self,
216                                          blk: &fn(x: &'x mut T,
217                                                   c: &'c Condvar) -> U)
218                                          -> U {
219         let state = self.x.get();
220         do (&(*state).lock).lock_cond |cond| {
221             check_poison(true, (*state).failed);
222             let _z = PoisonOnFail(&mut (*state).failed);
223             blk(&mut (*state).data,
224                 &Condvar {is_mutex: true,
225                           failed: &mut (*state).failed,
226                           cond: cond })
227         }
228     }
229 }
230
231 // Common code for {mutex.access,rwlock.write}{,_cond}.
232 #[inline]
233 #[doc(hidden)]
234 fn check_poison(is_mutex: bool, failed: bool) {
235     if failed {
236         if is_mutex {
237             fail!("Poisoned MutexARC - another task failed inside!");
238         } else {
239             fail!("Poisoned rw_arc - another task failed inside!");
240         }
241     }
242 }
243
244 #[doc(hidden)]
245 struct PoisonOnFail {
246     failed: *mut bool,
247 }
248
249 impl Drop for PoisonOnFail {
250     fn drop(&self) {
251         unsafe {
252             /* assert!(!*self.failed);
253                -- might be false in case of cond.wait() */
254             if task::failing() {
255                 *self.failed = true;
256             }
257         }
258     }
259 }
260
261 fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
262     PoisonOnFail {
263         failed: failed
264     }
265 }
266
267 /****************************************************************************
268  * R/W lock protected ARC
269  ****************************************************************************/
270
271 #[doc(hidden)]
272 struct RWARCInner<T> { lock: RWlock, failed: bool, data: T }
273 /**
274  * A dual-mode ARC protected by a reader-writer lock. The data can be accessed
275  * mutably or immutably, and immutably-accessing tasks may run concurrently.
276  *
277  * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested.
278  */
279 #[mutable]
280 struct RWARC<T> {
281     x: UnsafeAtomicRcBox<RWARCInner<T>>,
282 }
283
284 /// Create a reader/writer ARC with the supplied data.
285 pub fn RWARC<T:Const + Owned>(user_data: T) -> RWARC<T> {
286     rw_arc_with_condvars(user_data, 1)
287 }
288 /**
289  * Create a reader/writer ARC with the supplied data and a specified number
290  * of condvars (as sync::rwlock_with_condvars).
291  */
292 pub fn rw_arc_with_condvars<T:Const + Owned>(
293     user_data: T,
294     num_condvars: uint) -> RWARC<T>
295 {
296     let data =
297         RWARCInner { lock: rwlock_with_condvars(num_condvars),
298                      failed: false, data: user_data };
299     RWARC { x: UnsafeAtomicRcBox::new(data), }
300 }
301
302 impl<T:Const + Owned> RWARC<T> {
303     /// Duplicate a rwlock-protected ARC, as arc::clone.
304     pub fn clone(&self) -> RWARC<T> {
305         RWARC {
306             x: self.x.clone(),
307         }
308     }
309
310 }
311
312 impl<T:Const + Owned> RWARC<T> {
313     /**
314      * Access the underlying data mutably. Locks the rwlock in write mode;
315      * other readers and writers will block.
316      *
317      * # Failure
318      *
319      * Failing while inside the ARC will unlock the ARC while unwinding, so
320      * that other tasks won't block forever. As MutexARC.access, it will also
321      * poison the ARC, so subsequent readers and writers will both also fail.
322      */
323     #[inline]
324     pub fn write<U>(&self, blk: &fn(x: &mut T) -> U) -> U {
325         unsafe {
326             let state = self.x.get();
327             do (*borrow_rwlock(state)).write {
328                 check_poison(false, (*state).failed);
329                 let _z = PoisonOnFail(&mut (*state).failed);
330                 blk(&mut (*state).data)
331             }
332         }
333     }
334
335     /// As write(), but with a condvar, as sync::rwlock.write_cond().
336     #[inline]
337     pub fn write_cond<'x, 'c, U>(&self,
338                                  blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
339                                  -> U {
340         unsafe {
341             let state = self.x.get();
342             do (*borrow_rwlock(state)).write_cond |cond| {
343                 check_poison(false, (*state).failed);
344                 let _z = PoisonOnFail(&mut (*state).failed);
345                 blk(&mut (*state).data,
346                     &Condvar {is_mutex: false,
347                               failed: &mut (*state).failed,
348                               cond: cond})
349             }
350         }
351     }
352
353     /**
354      * Access the underlying data immutably. May run concurrently with other
355      * reading tasks.
356      *
357      * # Failure
358      *
359      * Failing will unlock the ARC while unwinding. However, unlike all other
360      * access modes, this will not poison the ARC.
361      */
362     pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
363         unsafe {
364             let state = self.x.get();
365             do (*state).lock.read {
366                 check_poison(false, (*state).failed);
367                 blk(&(*state).data)
368             }
369         }
370     }
371
372     /**
373      * As write(), but with the ability to atomically 'downgrade' the lock.
374      * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used
375      * to obtain the &mut T, and can be transformed into a RWReadMode token by
376      * calling downgrade(), after which a &T can be obtained instead.
377      *
378      * # Example
379      *
380      * ~~~ {.rust}
381      * do arc.write_downgrade |mut write_token| {
382      *     do write_token.write_cond |state, condvar| {
383      *         ... exclusive access with mutable state ...
384      *     }
385      *     let read_token = arc.downgrade(write_token);
386      *     do read_token.read |state| {
387      *         ... shared access with immutable state ...
388      *     }
389      * }
390      * ~~~
391      */
392     pub fn write_downgrade<U>(&self, blk: &fn(v: RWWriteMode<T>) -> U) -> U {
393         unsafe {
394             let state = self.x.get();
395             do (*borrow_rwlock(state)).write_downgrade |write_mode| {
396                 check_poison(false, (*state).failed);
397                 blk(RWWriteMode {
398                     data: &mut (*state).data,
399                     token: write_mode,
400                     poison: PoisonOnFail(&mut (*state).failed)
401                 })
402             }
403         }
404     }
405
406     /// To be called inside of the write_downgrade block.
407     pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>)
408                          -> RWReadMode<'a, T> {
409         unsafe {
410             // The rwlock should assert that the token belongs to us for us.
411             let state = self.x.get();
412             let RWWriteMode {
413                 data: data,
414                 token: t,
415                 poison: _poison
416             } = token;
417             // Let readers in
418             let new_token = (*state).lock.downgrade(t);
419             // Whatever region the input reference had, it will be safe to use
420             // the same region for the output reference. (The only 'unsafe' part
421             // of this cast is removing the mutability.)
422             let new_data = cast::transmute_immut(data);
423             // Downgrade ensured the token belonged to us. Just a sanity check.
424             assert!(borrow::ref_eq(&(*state).data, new_data));
425             // Produce new token
426             RWReadMode {
427                 data: new_data,
428                 token: new_token,
429             }
430         }
431     }
432 }
433
434 // Borrowck rightly complains about immutably aliasing the rwlock in order to
435 // lock it. This wraps the unsafety, with the justification that the 'lock'
436 // field is never overwritten; only 'failed' and 'data'.
437 #[doc(hidden)]
438 fn borrow_rwlock<T:Const + Owned>(state: *const RWARCInner<T>) -> *RWlock {
439     unsafe { cast::transmute(&const (*state).lock) }
440 }
441
442 /// The "write permission" token used for RWARC.write_downgrade().
443 pub struct RWWriteMode<'self, T> {
444     data: &'self mut T,
445     token: sync::RWlockWriteMode<'self>,
446     poison: PoisonOnFail,
447 }
448
449 /// The "read permission" token used for RWARC.write_downgrade().
450 pub struct RWReadMode<'self, T> {
451     data: &'self T,
452     token: sync::RWlockReadMode<'self>,
453 }
454
455 impl<'self, T:Const + Owned> RWWriteMode<'self, T> {
456     /// Access the pre-downgrade RWARC in write mode.
457     pub fn write<U>(&mut self, blk: &fn(x: &mut T) -> U) -> U {
458         match *self {
459             RWWriteMode {
460                 data: &ref mut data,
461                 token: ref token,
462                 poison: _
463             } => {
464                 do token.write {
465                     blk(data)
466                 }
467             }
468         }
469     }
470
471     /// Access the pre-downgrade RWARC in write mode with a condvar.
472     pub fn write_cond<'x, 'c, U>(&mut self,
473                                  blk: &fn(x: &'x mut T, c: &'c Condvar) -> U)
474                                  -> U {
475         match *self {
476             RWWriteMode {
477                 data: &ref mut data,
478                 token: ref token,
479                 poison: ref poison
480             } => {
481                 do token.write_cond |cond| {
482                     unsafe {
483                         let cvar = Condvar {
484                             is_mutex: false,
485                             failed: &mut *poison.failed,
486                             cond: cond
487                         };
488                         blk(data, &cvar)
489                     }
490                 }
491             }
492         }
493     }
494 }
495
496 impl<'self, T:Const + Owned> RWReadMode<'self, T> {
497     /// Access the post-downgrade rwlock in read mode.
498     pub fn read<U>(&self, blk: &fn(x: &T) -> U) -> U {
499         match *self {
500             RWReadMode {
501                 data: data,
502                 token: ref token
503             } => {
504                 do token.read { blk(data) }
505             }
506         }
507     }
508 }
509
510 /****************************************************************************
511  * Tests
512  ****************************************************************************/
513
514 #[cfg(test)]
515 mod tests {
516     use core::prelude::*;
517
518     use arc::*;
519
520     use core::vec;
521     use core::cell::Cell;
522     use core::comm;
523     use core::task;
524
525     #[test]
526     fn manually_share_arc() {
527         let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
528         let arc_v = ARC(v);
529
530         let (p, c) = comm::stream();
531
532         do task::spawn() || {
533             let p = comm::PortSet::new();
534             c.send(p.chan());
535
536             let arc_v : ARC<~[int]> = p.recv();
537
538             let v = copy (*arc_v.get());
539             assert_eq!(v[3], 4);
540         };
541
542         let c = p.recv();
543         c.send(arc_v.clone());
544
545         assert_eq!(arc_v.get()[2], 3);
546         assert_eq!(arc_v.get()[4], 5);
547
548         info!(arc_v);
549     }
550
551     #[test]
552     fn test_mutex_arc_condvar() {
553         unsafe {
554             let arc = ~MutexARC(false);
555             let arc2 = ~arc.clone();
556             let (p,c) = comm::oneshot();
557             let (c,p) = (Cell::new(c), Cell::new(p));
558             do task::spawn || {
559                 // wait until parent gets in
560                 comm::recv_one(p.take());
561                 do arc2.access_cond |state, cond| {
562                     *state = true;
563                     cond.signal();
564                 }
565             }
566             do arc.access_cond |state, cond| {
567                 comm::send_one(c.take(), ());
568                 assert!(!*state);
569                 while !*state {
570                     cond.wait();
571                 }
572             }
573         }
574     }
575     #[test] #[should_fail] #[ignore(cfg(windows))]
576     fn test_arc_condvar_poison() {
577         unsafe {
578             let arc = ~MutexARC(1);
579             let arc2 = ~arc.clone();
580             let (p, c) = comm::stream();
581
582             do task::spawn_unlinked || {
583                 let _ = p.recv();
584                 do arc2.access_cond |one, cond| {
585                     cond.signal();
586                     // Parent should fail when it wakes up.
587                     assert_eq!(*one, 0);
588                 }
589             }
590
591             do arc.access_cond |one, cond| {
592                 c.send(());
593                 while *one == 1 {
594                     cond.wait();
595                 }
596             }
597         }
598     }
599     #[test] #[should_fail] #[ignore(cfg(windows))]
600     fn test_mutex_arc_poison() {
601         unsafe {
602             let arc = ~MutexARC(1);
603             let arc2 = ~arc.clone();
604             do task::try || {
605                 do arc2.access |one| {
606                     assert_eq!(*one, 2);
607                 }
608             };
609             do arc.access |one| {
610                 assert_eq!(*one, 1);
611             }
612         }
613     }
614     #[test] #[should_fail] #[ignore(cfg(windows))]
615     fn test_rw_arc_poison_wr() {
616         let arc = ~RWARC(1);
617         let arc2 = (*arc).clone();
618         do task::try || {
619             do arc2.write |one| {
620                 assert_eq!(*one, 2);
621             }
622         };
623         do arc.read |one| {
624             assert_eq!(*one, 1);
625         }
626     }
627     #[test] #[should_fail] #[ignore(cfg(windows))]
628     fn test_rw_arc_poison_ww() {
629         let arc = ~RWARC(1);
630         let arc2 = (*arc).clone();
631         do task::try || {
632             do arc2.write |one| {
633                 assert_eq!(*one, 2);
634             }
635         };
636         do arc.write |one| {
637             assert_eq!(*one, 1);
638         }
639     }
640     #[test] #[should_fail] #[ignore(cfg(windows))]
641     fn test_rw_arc_poison_dw() {
642         let arc = ~RWARC(1);
643         let arc2 = (*arc).clone();
644         do task::try || {
645             do arc2.write_downgrade |mut write_mode| {
646                 do write_mode.write |one| {
647                     assert_eq!(*one, 2);
648                 }
649             }
650         };
651         do arc.write |one| {
652             assert_eq!(*one, 1);
653         }
654     }
655     #[test] #[ignore(cfg(windows))]
656     fn test_rw_arc_no_poison_rr() {
657         let arc = ~RWARC(1);
658         let arc2 = (*arc).clone();
659         do task::try || {
660             do arc2.read |one| {
661                 assert_eq!(*one, 2);
662             }
663         };
664         do arc.read |one| {
665             assert_eq!(*one, 1);
666         }
667     }
668     #[test] #[ignore(cfg(windows))]
669     fn test_rw_arc_no_poison_rw() {
670         let arc = ~RWARC(1);
671         let arc2 = (*arc).clone();
672         do task::try || {
673             do arc2.read |one| {
674                 assert_eq!(*one, 2);
675             }
676         };
677         do arc.write |one| {
678             assert_eq!(*one, 1);
679         }
680     }
681     #[test] #[ignore(cfg(windows))]
682     fn test_rw_arc_no_poison_dr() {
683         let arc = ~RWARC(1);
684         let arc2 = (*arc).clone();
685         do task::try || {
686             do arc2.write_downgrade |write_mode| {
687                 let read_mode = arc2.downgrade(write_mode);
688                 do (&read_mode).read |one| {
689                     assert_eq!(*one, 2);
690                 }
691             }
692         };
693         do arc.write |one| {
694             assert_eq!(*one, 1);
695         }
696     }
697     #[test]
698     fn test_rw_arc() {
699         let arc = ~RWARC(0);
700         let arc2 = (*arc).clone();
701         let (p,c) = comm::stream();
702
703         do task::spawn || {
704             do arc2.write |num| {
705                 for 10.times {
706                     let tmp = *num;
707                     *num = -1;
708                     task::yield();
709                     *num = tmp + 1;
710                 }
711                 c.send(());
712             }
713         }
714
715         // Readers try to catch the writer in the act
716         let mut children = ~[];
717         for 5.times {
718             let arc3 = (*arc).clone();
719             let mut builder = task::task();
720             builder.future_result(|r| children.push(r));
721             do builder.spawn {
722                 do arc3.read |num| {
723                     assert!(*num >= 0);
724                 }
725             }
726         }
727
728         // Wait for children to pass their asserts
729         for children.iter().advance |r| {
730             r.recv();
731         }
732
733         // Wait for writer to finish
734         p.recv();
735         do arc.read |num| {
736             assert_eq!(*num, 10);
737         }
738     }
739     #[test]
740     fn test_rw_downgrade() {
741         // (1) A downgrader gets in write mode and does cond.wait.
742         // (2) A writer gets in write mode, sets state to 42, and does signal.
743         // (3) Downgrader wakes, sets state to 31337.
744         // (4) tells writer and all other readers to contend as it downgrades.
745         // (5) Writer attempts to set state back to 42, while downgraded task
746         //     and all reader tasks assert that it's 31337.
747         let arc = ~RWARC(0);
748
749         // Reader tasks
750         let mut reader_convos = ~[];
751         for 10.times {
752             let ((rp1,rc1),(rp2,rc2)) = (comm::stream(),comm::stream());
753             reader_convos.push((rc1, rp2));
754             let arcn = (*arc).clone();
755             do task::spawn || {
756                 rp1.recv(); // wait for downgrader to give go-ahead
757                 do arcn.read |state| {
758                     assert_eq!(*state, 31337);
759                     rc2.send(());
760                 }
761             }
762         }
763
764         // Writer task
765         let arc2 = (*arc).clone();
766         let ((wp1,wc1),(wp2,wc2)) = (comm::stream(),comm::stream());
767         do task::spawn || {
768             wp1.recv();
769             do arc2.write_cond |state, cond| {
770                 assert_eq!(*state, 0);
771                 *state = 42;
772                 cond.signal();
773             }
774             wp1.recv();
775             do arc2.write |state| {
776                 // This shouldn't happen until after the downgrade read
777                 // section, and all other readers, finish.
778                 assert_eq!(*state, 31337);
779                 *state = 42;
780             }
781             wc2.send(());
782         }
783
784         // Downgrader (us)
785         do arc.write_downgrade |mut write_mode| {
786             do write_mode.write_cond |state, cond| {
787                 wc1.send(()); // send to another writer who will wake us up
788                 while *state == 0 {
789                     cond.wait();
790                 }
791                 assert_eq!(*state, 42);
792                 *state = 31337;
793                 // send to other readers
794                 for vec::each(reader_convos) |x| {
795                     match *x {
796                         (ref rc, _) => rc.send(()),
797                     }
798                 }
799             }
800             let read_mode = arc.downgrade(write_mode);
801             do (&read_mode).read |state| {
802                 // complete handshake with other readers
803                 for vec::each(reader_convos) |x| {
804                     match *x {
805                         (_, ref rp) => rp.recv(),
806                     }
807                 }
808                 wc1.send(()); // tell writer to try again
809                 assert_eq!(*state, 31337);
810             }
811         }
812
813         wp2.recv(); // complete handshake with writer
814     }
815     #[cfg(test)]
816     fn test_rw_write_cond_downgrade_read_race_helper() {
817         // Tests that when a downgrader hands off the "reader cloud" lock
818         // because of a contending reader, a writer can't race to get it
819         // instead, which would result in readers_and_writers. This tests
820         // the sync module rather than this one, but it's here because an
821         // rwarc gives us extra shared state to help check for the race.
822         // If you want to see this test fail, go to sync.rs and replace the
823         // line in RWlock::write_cond() that looks like:
824         //     "blk(&Condvar { order: opt_lock, ..*cond })"
825         // with just "blk(cond)".
826         let x = ~RWARC(true);
827         let (wp, wc) = comm::stream();
828
829         // writer task
830         let xw = (*x).clone();
831         do task::spawn {
832             do xw.write_cond |state, c| {
833                 wc.send(()); // tell downgrader it's ok to go
834                 c.wait();
835                 // The core of the test is here: the condvar reacquire path
836                 // must involve order_lock, so that it cannot race with a reader
837                 // trying to receive the "reader cloud lock hand-off".
838                 *state = false;
839             }
840         }
841
842         wp.recv(); // wait for writer to get in
843
844         do x.write_downgrade |mut write_mode| {
845             do write_mode.write_cond |state, c| {
846                 assert!(*state);
847                 // make writer contend in the cond-reacquire path
848                 c.signal();
849             }
850             // make a reader task to trigger the "reader cloud lock" handoff
851             let xr = (*x).clone();
852             let (rp, rc) = comm::stream();
853             do task::spawn {
854                 rc.send(());
855                 do xr.read |_state| { }
856             }
857             rp.recv(); // wait for reader task to exist
858
859             let read_mode = x.downgrade(write_mode);
860             do read_mode.read |state| {
861                 // if writer mistakenly got in, make sure it mutates state
862                 // before we assert on it
863                 for 5.times { task::yield(); }
864                 // make sure writer didn't get in.
865                 assert!(*state);
866             }
867         }
868     }
869     #[test]
870     fn test_rw_write_cond_downgrade_read_race() {
871         // Ideally the above test case would have yield statements in it that
872         // helped to expose the race nearly 100% of the time... but adding
873         // yields in the intuitively-right locations made it even less likely,
874         // and I wasn't sure why :( . This is a mediocre "next best" option.
875         for 8.times { test_rw_write_cond_downgrade_read_race_helper() }
876     }
877 }