]> git.lizzy.rs Git - rust.git/blob - src/libextra/arc.rs
remove useless `transmute_immut` function
[rust.git] / src / libextra / arc.rs
1 // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 /*!
12  * Concurrency-enabled mechanisms for sharing mutable and/or immutable state
13  * between tasks.
14  *
15  * # Example
16  *
17  * In this example, a large vector of floats is shared between several tasks.
18  * With simple pipes, without Arc, a copy would have to be made for each task.
19  *
20  * ```rust
21  * extern mod std;
22  * use extra::arc;
23  * let numbers=vec::from_fn(100, |ind| (ind as float)*rand::random());
24  * let shared_numbers=arc::Arc::new(numbers);
25  *
26  *   do 10.times {
27  *       let (port, chan)  = stream();
28  *       chan.send(shared_numbers.clone());
29  *
30  *       do spawn {
31  *           let shared_numbers=port.recv();
32  *           let local_numbers=shared_numbers.get();
33  *
34  *           // Work with the local numbers
35  *       }
36  *   }
37  * ```
38  */
39
40 #[allow(missing_doc)];
41
42
43 use sync;
44 use sync::{Mutex, RWLock};
45
46 use std::cast;
47 use std::unstable::sync::UnsafeArc;
48 use std::task;
49 use std::borrow;
50
51 /// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
52 pub struct Condvar<'self> {
53     priv is_mutex: bool,
54     priv failed: &'self mut bool,
55     priv cond: &'self sync::Condvar<'self>
56 }
57
58 impl<'self> Condvar<'self> {
59     /// Atomically exit the associated Arc and block until a signal is sent.
60     #[inline]
61     pub fn wait(&self) { self.wait_on(0) }
62
63     /**
64      * Atomically exit the associated Arc and block on a specified condvar
65      * until a signal is sent on that same condvar (as sync::cond.wait_on).
66      *
67      * wait() is equivalent to wait_on(0).
68      */
69     #[inline]
70     pub fn wait_on(&self, condvar_id: uint) {
71         assert!(!*self.failed);
72         self.cond.wait_on(condvar_id);
73         // This is why we need to wrap sync::condvar.
74         check_poison(self.is_mutex, *self.failed);
75     }
76
77     /// Wake up a blocked task. Returns false if there was no blocked task.
78     #[inline]
79     pub fn signal(&self) -> bool { self.signal_on(0) }
80
81     /**
82      * Wake up a blocked task on a specified condvar (as
83      * sync::cond.signal_on). Returns false if there was no blocked task.
84      */
85     #[inline]
86     pub fn signal_on(&self, condvar_id: uint) -> bool {
87         assert!(!*self.failed);
88         self.cond.signal_on(condvar_id)
89     }
90
91     /// Wake up all blocked tasks. Returns the number of tasks woken.
92     #[inline]
93     pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
94
95     /**
96      * Wake up all blocked tasks on a specified condvar (as
97      * sync::cond.broadcast_on). Returns the number of tasks woken.
98      */
99     #[inline]
100     pub fn broadcast_on(&self, condvar_id: uint) -> uint {
101         assert!(!*self.failed);
102         self.cond.broadcast_on(condvar_id)
103     }
104 }
105
106 /****************************************************************************
107  * Immutable Arc
108  ****************************************************************************/
109
110 /// An atomically reference counted wrapper for shared immutable state.
111 pub struct Arc<T> { priv x: UnsafeArc<T> }
112
113
114 /**
115  * Access the underlying data in an atomically reference counted
116  * wrapper.
117  */
118 impl<T:Freeze+Send> Arc<T> {
119     /// Create an atomically reference counted wrapper.
120     #[inline]
121     pub fn new(data: T) -> Arc<T> {
122         Arc { x: UnsafeArc::new(data) }
123     }
124
125     #[inline]
126     pub fn get<'a>(&'a self) -> &'a T {
127         unsafe { &*self.x.get_immut() }
128     }
129
130     /**
131      * Retrieve the data back out of the Arc. This function blocks until the
132      * reference given to it is the last existing one, and then unwrap the data
133      * instead of destroying it.
134      *
135      * If multiple tasks call unwrap, all but the first will fail. Do not call
136      * unwrap from a task that holds another reference to the same Arc; it is
137      * guaranteed to deadlock.
138      */
139     pub fn unwrap(self) -> T {
140         let Arc { x: x } = self;
141         x.unwrap()
142     }
143 }
144
145 impl<T:Freeze + Send> Clone for Arc<T> {
146     /**
147     * Duplicate an atomically reference counted wrapper.
148     *
149     * The resulting two `arc` objects will point to the same underlying data
150     * object. However, one of the `arc` objects can be sent to another task,
151     * allowing them to share the underlying data.
152     */
153     #[inline]
154     fn clone(&self) -> Arc<T> {
155         Arc { x: self.x.clone() }
156     }
157 }
158
159 /****************************************************************************
160  * Mutex protected Arc (unsafe)
161  ****************************************************************************/
162
163 #[doc(hidden)]
164 struct MutexArcInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
165
166 /// An Arc with mutable data protected by a blocking mutex.
167 #[no_freeze]
168 pub struct MutexArc<T> { priv x: UnsafeArc<MutexArcInner<T>> }
169
170
171 impl<T:Send> Clone for MutexArc<T> {
172     /// Duplicate a mutex-protected Arc. See arc::clone for more details.
173     #[inline]
174     fn clone(&self) -> MutexArc<T> {
175         // NB: Cloning the underlying mutex is not necessary. Its reference
176         // count would be exactly the same as the shared state's.
177         MutexArc { x: self.x.clone() }
178     }
179 }
180
181 impl<T:Send> MutexArc<T> {
182     /// Create a mutex-protected Arc with the supplied data.
183     pub fn new(user_data: T) -> MutexArc<T> {
184         MutexArc::new_with_condvars(user_data, 1)
185     }
186
187     /**
188      * Create a mutex-protected Arc with the supplied data and a specified number
189      * of condvars (as sync::Mutex::new_with_condvars).
190      */
191     pub fn new_with_condvars(user_data: T, num_condvars: uint) -> MutexArc<T> {
192         let data = MutexArcInner {
193             lock: Mutex::new_with_condvars(num_condvars),
194             failed: false, data: user_data
195         };
196         MutexArc { x: UnsafeArc::new(data) }
197     }
198
199     /**
200      * Access the underlying mutable data with mutual exclusion from other
201      * tasks. The argument closure will be run with the mutex locked; all
202      * other tasks wishing to access the data will block until the closure
203      * finishes running.
204      *
205      * The reason this function is 'unsafe' is because it is possible to
206      * construct a circular reference among multiple Arcs by mutating the
207      * underlying data. This creates potential for deadlock, but worse, this
208      * will guarantee a memory leak of all involved Arcs. Using MutexArcs
209      * inside of other Arcs is safe in absence of circular references.
210      *
211      * If you wish to nest MutexArcs, one strategy for ensuring safety at
212      * runtime is to add a "nesting level counter" inside the stored data, and
213      * when traversing the arcs, assert that they monotonically decrease.
214      *
215      * # Failure
216      *
217      * Failing while inside the Arc will unlock the Arc while unwinding, so
218      * that other tasks won't block forever. It will also poison the Arc:
219      * any tasks that subsequently try to access it (including those already
220      * blocked on the mutex) will also fail immediately.
221      */
222     #[inline]
223     pub unsafe fn unsafe_access<U>(&self, blk: |x: &mut T| -> U) -> U {
224         let state = self.x.get();
225         // Borrowck would complain about this if the function were
226         // not already unsafe. See borrow_rwlock, far below.
227         (&(*state).lock).lock(|| {
228             check_poison(true, (*state).failed);
229             let _z = PoisonOnFail(&mut (*state).failed);
230             blk(&mut (*state).data)
231         })
232     }
233
234     /// As unsafe_access(), but with a condvar, as sync::mutex.lock_cond().
235     #[inline]
236     pub unsafe fn unsafe_access_cond<U>(&self,
237                                         blk: |x: &mut T, c: &Condvar| -> U)
238                                         -> U {
239         let state = self.x.get();
240         (&(*state).lock).lock_cond(|cond| {
241             check_poison(true, (*state).failed);
242             let _z = PoisonOnFail(&mut (*state).failed);
243             blk(&mut (*state).data,
244                 &Condvar {is_mutex: true,
245                           failed: &mut (*state).failed,
246                           cond: cond })
247         })
248     }
249
250     /**
251      * Retrieves the data, blocking until all other references are dropped,
252      * exactly as arc::unwrap.
253      *
254      * Will additionally fail if another task has failed while accessing the arc.
255      */
256     pub fn unwrap(self) -> T {
257         let MutexArc { x: x } = self;
258         let inner = x.unwrap();
259         let MutexArcInner { failed: failed, data: data, .. } = inner;
260         if failed {
261             fail!("Can't unwrap poisoned MutexArc - another task failed inside!");
262         }
263         data
264     }
265 }
266
267 impl<T:Freeze + Send> MutexArc<T> {
268
269     /**
270      * As unsafe_access.
271      *
272      * The difference between access and unsafe_access is that the former
273      * forbids mutexes to be nested. While unsafe_access can be used on
274      * MutexArcs without freezable interiors, this safe version of access
275      * requires the Freeze bound, which prohibits access on MutexArcs which
276      * might contain nested MutexArcs inside.
277      *
278      * The purpose of this is to offer a safe implementation of MutexArc to be
279      * used instead of RWArc in cases where no readers are needed and sightly
280      * better performance is required.
281      *
282      * Both methods have the same failure behaviour as unsafe_access and
283      * unsafe_access_cond.
284      */
285     #[inline]
286     pub fn access<U>(&self, blk: |x: &mut T| -> U) -> U {
287         unsafe { self.unsafe_access(blk) }
288     }
289
290     /// As unsafe_access_cond but safe and Freeze.
291     #[inline]
292     pub fn access_cond<U>(&self,
293                           blk: |x: &mut T, c: &Condvar| -> U)
294                           -> U {
295         unsafe { self.unsafe_access_cond(blk) }
296     }
297 }
298
299 // Common code for {mutex.access,rwlock.write}{,_cond}.
300 #[inline]
301 #[doc(hidden)]
302 fn check_poison(is_mutex: bool, failed: bool) {
303     if failed {
304         if is_mutex {
305             fail!("Poisoned MutexArc - another task failed inside!");
306         } else {
307             fail!("Poisoned rw_arc - another task failed inside!");
308         }
309     }
310 }
311
312 #[doc(hidden)]
313 struct PoisonOnFail {
314     failed: *mut bool,
315 }
316
317 impl Drop for PoisonOnFail {
318     fn drop(&mut self) {
319         unsafe {
320             /* assert!(!*self.failed);
321                -- might be false in case of cond.wait() */
322             if task::failing() {
323                 *self.failed = true;
324             }
325         }
326     }
327 }
328
329 fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
330     PoisonOnFail {
331         failed: failed
332     }
333 }
334
335 /****************************************************************************
336  * R/W lock protected Arc
337  ****************************************************************************/
338
339 #[doc(hidden)]
340 struct RWArcInner<T> { priv lock: RWLock, priv failed: bool, priv data: T }
341 /**
342  * A dual-mode Arc protected by a reader-writer lock. The data can be accessed
343  * mutably or immutably, and immutably-accessing tasks may run concurrently.
344  *
345  * Unlike mutex_arcs, rw_arcs are safe, because they cannot be nested.
346  */
347 #[no_freeze]
348 pub struct RWArc<T> {
349     priv x: UnsafeArc<RWArcInner<T>>,
350 }
351
352 impl<T:Freeze + Send> Clone for RWArc<T> {
353     /// Duplicate a rwlock-protected Arc. See arc::clone for more details.
354     #[inline]
355     fn clone(&self) -> RWArc<T> {
356         RWArc { x: self.x.clone() }
357     }
358
359 }
360
361 impl<T:Freeze + Send> RWArc<T> {
362     /// Create a reader/writer Arc with the supplied data.
363     pub fn new(user_data: T) -> RWArc<T> {
364         RWArc::new_with_condvars(user_data, 1)
365     }
366
367     /**
368      * Create a reader/writer Arc with the supplied data and a specified number
369      * of condvars (as sync::RWLock::new_with_condvars).
370      */
371     pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWArc<T> {
372         let data = RWArcInner {
373             lock: RWLock::new_with_condvars(num_condvars),
374             failed: false, data: user_data
375         };
376         RWArc { x: UnsafeArc::new(data), }
377     }
378
379     /**
380      * Access the underlying data mutably. Locks the rwlock in write mode;
381      * other readers and writers will block.
382      *
383      * # Failure
384      *
385      * Failing while inside the Arc will unlock the Arc while unwinding, so
386      * that other tasks won't block forever. As MutexArc.access, it will also
387      * poison the Arc, so subsequent readers and writers will both also fail.
388      */
389     #[inline]
390     pub fn write<U>(&self, blk: |x: &mut T| -> U) -> U {
391         unsafe {
392             let state = self.x.get();
393             (*borrow_rwlock(state)).write(|| {
394                 check_poison(false, (*state).failed);
395                 let _z = PoisonOnFail(&mut (*state).failed);
396                 blk(&mut (*state).data)
397             })
398         }
399     }
400
401     /// As write(), but with a condvar, as sync::rwlock.write_cond().
402     #[inline]
403     pub fn write_cond<U>(&self,
404                          blk: |x: &mut T, c: &Condvar| -> U)
405                          -> U {
406         unsafe {
407             let state = self.x.get();
408             (*borrow_rwlock(state)).write_cond(|cond| {
409                 check_poison(false, (*state).failed);
410                 let _z = PoisonOnFail(&mut (*state).failed);
411                 blk(&mut (*state).data,
412                     &Condvar {is_mutex: false,
413                               failed: &mut (*state).failed,
414                               cond: cond})
415             })
416         }
417     }
418
419     /**
420      * Access the underlying data immutably. May run concurrently with other
421      * reading tasks.
422      *
423      * # Failure
424      *
425      * Failing will unlock the Arc while unwinding. However, unlike all other
426      * access modes, this will not poison the Arc.
427      */
428     pub fn read<U>(&self, blk: |x: &T| -> U) -> U {
429         unsafe {
430             let state = self.x.get();
431             (*state).lock.read(|| {
432                 check_poison(false, (*state).failed);
433                 blk(&(*state).data)
434             })
435         }
436     }
437
438     /**
439      * As write(), but with the ability to atomically 'downgrade' the lock.
440      * See sync::rwlock.write_downgrade(). The RWWriteMode token must be used
441      * to obtain the &mut T, and can be transformed into a RWReadMode token by
442      * calling downgrade(), after which a &T can be obtained instead.
443      *
444      * # Example
445      *
446      * ```rust
447      * do arc.write_downgrade |mut write_token| {
448      *     do write_token.write_cond |state, condvar| {
449      *         ... exclusive access with mutable state ...
450      *     }
451      *     let read_token = arc.downgrade(write_token);
452      *     do read_token.read |state| {
453      *         ... shared access with immutable state ...
454      *     }
455      * }
456      * ```
457      */
458     pub fn write_downgrade<U>(&self, blk: |v: RWWriteMode<T>| -> U) -> U {
459         unsafe {
460             let state = self.x.get();
461             (*borrow_rwlock(state)).write_downgrade(|write_mode| {
462                 check_poison(false, (*state).failed);
463                 blk(RWWriteMode {
464                     data: &mut (*state).data,
465                     token: write_mode,
466                     poison: PoisonOnFail(&mut (*state).failed)
467                 })
468             })
469         }
470     }
471
472     /// To be called inside of the write_downgrade block.
473     pub fn downgrade<'a>(&self, token: RWWriteMode<'a, T>)
474                          -> RWReadMode<'a, T> {
475         unsafe {
476             // The rwlock should assert that the token belongs to us for us.
477             let state = self.x.get();
478             let RWWriteMode {
479                 data: data,
480                 token: t,
481                 poison: _poison
482             } = token;
483             // Let readers in
484             let new_token = (*state).lock.downgrade(t);
485             // Whatever region the input reference had, it will be safe to use
486             // the same region for the output reference. (The only 'unsafe' part
487             // of this cast is removing the mutability.)
488             let new_data = data;
489             // Downgrade ensured the token belonged to us. Just a sanity check.
490             assert!(borrow::ref_eq(&(*state).data, new_data));
491             // Produce new token
492             RWReadMode {
493                 data: new_data,
494                 token: new_token,
495             }
496         }
497     }
498
499     /**
500      * Retrieves the data, blocking until all other references are dropped,
501      * exactly as arc::unwrap.
502      *
503      * Will additionally fail if another task has failed while accessing the arc
504      * in write mode.
505      */
506     pub fn unwrap(self) -> T {
507         let RWArc { x: x, .. } = self;
508         let inner = x.unwrap();
509         let RWArcInner { failed: failed, data: data, .. } = inner;
510         if failed {
511             fail!("Can't unwrap poisoned RWArc - another task failed inside!")
512         }
513         data
514     }
515 }
516
517 // Borrowck rightly complains about immutably aliasing the rwlock in order to
518 // lock it. This wraps the unsafety, with the justification that the 'lock'
519 // field is never overwritten; only 'failed' and 'data'.
520 #[doc(hidden)]
521 fn borrow_rwlock<T:Freeze + Send>(state: *mut RWArcInner<T>) -> *RWLock {
522     unsafe { cast::transmute(&(*state).lock) }
523 }
524
525 /// The "write permission" token used for RWArc.write_downgrade().
526 pub struct RWWriteMode<'self, T> {
527     priv data: &'self mut T,
528     priv token: sync::RWLockWriteMode<'self>,
529     priv poison: PoisonOnFail,
530 }
531
532 /// The "read permission" token used for RWArc.write_downgrade().
533 pub struct RWReadMode<'self, T> {
534     priv data: &'self T,
535     priv token: sync::RWLockReadMode<'self>,
536 }
537
538 impl<'self, T:Freeze + Send> RWWriteMode<'self, T> {
539     /// Access the pre-downgrade RWArc in write mode.
540     pub fn write<U>(&mut self, blk: |x: &mut T| -> U) -> U {
541         match *self {
542             RWWriteMode {
543                 data: &ref mut data,
544                 token: ref token,
545                 poison: _
546             } => {
547                 token.write(|| blk(data))
548             }
549         }
550     }
551
552     /// Access the pre-downgrade RWArc in write mode with a condvar.
553     pub fn write_cond<U>(&mut self,
554                          blk: |x: &mut T, c: &Condvar| -> U)
555                          -> U {
556         match *self {
557             RWWriteMode {
558                 data: &ref mut data,
559                 token: ref token,
560                 poison: ref poison
561             } => {
562                 token.write_cond(|cond| {
563                     unsafe {
564                         let cvar = Condvar {
565                             is_mutex: false,
566                             failed: &mut *poison.failed,
567                             cond: cond
568                         };
569                         blk(data, &cvar)
570                     }
571                 })
572             }
573         }
574     }
575 }
576
577 impl<'self, T:Freeze + Send> RWReadMode<'self, T> {
578     /// Access the post-downgrade rwlock in read mode.
579     pub fn read<U>(&self, blk: |x: &T| -> U) -> U {
580         match *self {
581             RWReadMode {
582                 data: data,
583                 token: ref token
584             } => {
585                 token.read(|| blk(data))
586             }
587         }
588     }
589 }
590
591 /****************************************************************************
592  * Tests
593  ****************************************************************************/
594
595 #[cfg(test)]
596 mod tests {
597
598     use arc::*;
599
600     use std::cell::Cell;
601     use std::comm;
602     use std::task;
603
604     #[test]
605     fn manually_share_arc() {
606         let v = ~[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
607         let arc_v = Arc::new(v);
608
609         let (p, c) = comm::stream();
610
611         do task::spawn {
612             let arc_v: Arc<~[int]> = p.recv();
613
614             let v = arc_v.get().clone();
615             assert_eq!(v[3], 4);
616         };
617
618         c.send(arc_v.clone());
619
620         assert_eq!(arc_v.get()[2], 3);
621         assert_eq!(arc_v.get()[4], 5);
622
623         info!("{:?}", arc_v);
624     }
625
626     #[test]
627     fn test_mutex_arc_condvar() {
628         let arc = ~MutexArc::new(false);
629         let arc2 = ~arc.clone();
630         let (p,c) = comm::oneshot();
631         let (c,p) = (Cell::new(c), Cell::new(p));
632         do task::spawn || {
633             // wait until parent gets in
634             p.take().recv();
635             arc2.access_cond(|state, cond| {
636                 *state = true;
637                 cond.signal();
638             })
639         }
640
641         arc.access_cond(|state, cond| {
642             c.take().send(());
643             assert!(!*state);
644             while !*state {
645                 cond.wait();
646             }
647         })
648     }
649
650     #[test] #[should_fail]
651     fn test_arc_condvar_poison() {
652         let arc = ~MutexArc::new(1);
653         let arc2 = ~arc.clone();
654         let (p, c) = comm::stream();
655
656         do spawn {
657             let _ = p.recv();
658             arc2.access_cond(|one, cond| {
659                 cond.signal();
660                 // Parent should fail when it wakes up.
661                 assert_eq!(*one, 0);
662             })
663         }
664
665         arc.access_cond(|one, cond| {
666             c.send(());
667             while *one == 1 {
668                 cond.wait();
669             }
670         })
671     }
672
673     #[test] #[should_fail]
674     fn test_mutex_arc_poison() {
675         let arc = ~MutexArc::new(1);
676         let arc2 = ~arc.clone();
677         do task::try || {
678             arc2.access(|one| {
679                 assert_eq!(*one, 2);
680             })
681         };
682         arc.access(|one| {
683             assert_eq!(*one, 1);
684         })
685     }
686
687     #[test] #[should_fail]
688     pub fn test_mutex_arc_unwrap_poison() {
689         let arc = MutexArc::new(1);
690         let arc2 = ~(&arc).clone();
691         let (p, c) = comm::stream();
692         do task::spawn {
693             arc2.access(|one| {
694                 c.send(());
695                 assert!(*one == 2);
696             })
697         }
698         let _ = p.recv();
699         let one = arc.unwrap();
700         assert!(one == 1);
701     }
702
703     #[test]
704     fn test_unsafe_mutex_arc_nested() {
705         unsafe {
706             // Tests nested mutexes and access
707             // to underlaying data.
708             let arc = ~MutexArc::new(1);
709             let arc2 = ~MutexArc::new(*arc);
710             do task::spawn || {
711                 (*arc2).unsafe_access(|mutex| {
712                     (*mutex).access(|one| {
713                         assert!(*one == 1);
714                     })
715                 })
716             };
717         }
718     }
719
720     #[test] #[should_fail]
721     fn test_rw_arc_poison_wr() {
722         let arc = RWArc::new(1);
723         let arc2 = arc.clone();
724         do task::try {
725             arc2.write(|one| {
726                 assert_eq!(*one, 2);
727             })
728         };
729         arc.read(|one| {
730             assert_eq!(*one, 1);
731         })
732     }
733
734     #[test] #[should_fail]
735     fn test_rw_arc_poison_ww() {
736         let arc = RWArc::new(1);
737         let arc2 = arc.clone();
738         do task::try {
739             arc2.write(|one| {
740                 assert_eq!(*one, 2);
741             })
742         };
743         arc.write(|one| {
744             assert_eq!(*one, 1);
745         })
746     }
747     #[test] #[should_fail]
748     fn test_rw_arc_poison_dw() {
749         let arc = RWArc::new(1);
750         let arc2 = arc.clone();
751         do task::try {
752             arc2.write_downgrade(|mut write_mode| {
753                 write_mode.write(|one| {
754                     assert_eq!(*one, 2);
755                 })
756             })
757         };
758         arc.write(|one| {
759             assert_eq!(*one, 1);
760         })
761     }
762     #[test]
763     fn test_rw_arc_no_poison_rr() {
764         let arc = RWArc::new(1);
765         let arc2 = arc.clone();
766         do task::try {
767             arc2.read(|one| {
768                 assert_eq!(*one, 2);
769             })
770         };
771         arc.read(|one| {
772             assert_eq!(*one, 1);
773         })
774     }
775     #[test]
776     fn test_rw_arc_no_poison_rw() {
777         let arc = RWArc::new(1);
778         let arc2 = arc.clone();
779         do task::try {
780             arc2.read(|one| {
781                 assert_eq!(*one, 2);
782             })
783         };
784         arc.write(|one| {
785             assert_eq!(*one, 1);
786         })
787     }
788     #[test]
789     fn test_rw_arc_no_poison_dr() {
790         let arc = RWArc::new(1);
791         let arc2 = arc.clone();
792         do task::try {
793             arc2.write_downgrade(|write_mode| {
794                 let read_mode = arc2.downgrade(write_mode);
795                 read_mode.read(|one| {
796                     assert_eq!(*one, 2);
797                 })
798             })
799         };
800         arc.write(|one| {
801             assert_eq!(*one, 1);
802         })
803     }
804     #[test]
805     fn test_rw_arc() {
806         let arc = RWArc::new(0);
807         let arc2 = arc.clone();
808         let (p, c) = comm::stream();
809
810         do task::spawn {
811             arc2.write(|num| {
812                 10.times(|| {
813                     let tmp = *num;
814                     *num = -1;
815                     task::deschedule();
816                     *num = tmp + 1;
817                 });
818                 c.send(());
819             })
820         }
821
822         // Readers try to catch the writer in the act
823         let mut children = ~[];
824         5.times(|| {
825             let arc3 = arc.clone();
826             let mut builder = task::task();
827             children.push(builder.future_result());
828             do builder.spawn {
829                 arc3.read(|num| {
830                     assert!(*num >= 0);
831                 })
832             }
833         });
834
835         // Wait for children to pass their asserts
836         for r in children.iter() {
837             r.recv();
838         }
839
840         // Wait for writer to finish
841         p.recv();
842         arc.read(|num| {
843             assert_eq!(*num, 10);
844         })
845     }
846     #[test]
847     fn test_rw_downgrade() {
848         // (1) A downgrader gets in write mode and does cond.wait.
849         // (2) A writer gets in write mode, sets state to 42, and does signal.
850         // (3) Downgrader wakes, sets state to 31337.
851         // (4) tells writer and all other readers to contend as it downgrades.
852         // (5) Writer attempts to set state back to 42, while downgraded task
853         //     and all reader tasks assert that it's 31337.
854         let arc = RWArc::new(0);
855
856         // Reader tasks
857         let mut reader_convos = ~[];
858         10.times(|| {
859             let ((rp1, rc1), (rp2, rc2)) = (comm::stream(), comm::stream());
860             reader_convos.push((rc1, rp2));
861             let arcn = arc.clone();
862             do task::spawn {
863                 rp1.recv(); // wait for downgrader to give go-ahead
864                 arcn.read(|state| {
865                     assert_eq!(*state, 31337);
866                     rc2.send(());
867                 })
868             }
869         });
870
871         // Writer task
872         let arc2 = arc.clone();
873         let ((wp1, wc1), (wp2, wc2)) = (comm::stream(), comm::stream());
874         do task::spawn || {
875             wp1.recv();
876             arc2.write_cond(|state, cond| {
877                 assert_eq!(*state, 0);
878                 *state = 42;
879                 cond.signal();
880             });
881             wp1.recv();
882             arc2.write(|state| {
883                 // This shouldn't happen until after the downgrade read
884                 // section, and all other readers, finish.
885                 assert_eq!(*state, 31337);
886                 *state = 42;
887             });
888             wc2.send(());
889         }
890
891         // Downgrader (us)
892         arc.write_downgrade(|mut write_mode| {
893             write_mode.write_cond(|state, cond| {
894                 wc1.send(()); // send to another writer who will wake us up
895                 while *state == 0 {
896                     cond.wait();
897                 }
898                 assert_eq!(*state, 42);
899                 *state = 31337;
900                 // send to other readers
901                 for &(ref rc, _) in reader_convos.iter() {
902                     rc.send(())
903                 }
904             });
905             let read_mode = arc.downgrade(write_mode);
906             read_mode.read(|state| {
907                 // complete handshake with other readers
908                 for &(_, ref rp) in reader_convos.iter() {
909                     rp.recv()
910                 }
911                 wc1.send(()); // tell writer to try again
912                 assert_eq!(*state, 31337);
913             });
914         });
915
916         wp2.recv(); // complete handshake with writer
917     }
918     #[cfg(test)]
919     fn test_rw_write_cond_downgrade_read_race_helper() {
920         // Tests that when a downgrader hands off the "reader cloud" lock
921         // because of a contending reader, a writer can't race to get it
922         // instead, which would result in readers_and_writers. This tests
923         // the sync module rather than this one, but it's here because an
924         // rwarc gives us extra shared state to help check for the race.
925         // If you want to see this test fail, go to sync.rs and replace the
926         // line in RWLock::write_cond() that looks like:
927         //     "blk(&Condvar { order: opt_lock, ..*cond })"
928         // with just "blk(cond)".
929         let x = RWArc::new(true);
930         let (wp, wc) = comm::stream();
931
932         // writer task
933         let xw = x.clone();
934         do task::spawn {
935             xw.write_cond(|state, c| {
936                 wc.send(()); // tell downgrader it's ok to go
937                 c.wait();
938                 // The core of the test is here: the condvar reacquire path
939                 // must involve order_lock, so that it cannot race with a reader
940                 // trying to receive the "reader cloud lock hand-off".
941                 *state = false;
942             })
943         }
944
945         wp.recv(); // wait for writer to get in
946
947         x.write_downgrade(|mut write_mode| {
948             write_mode.write_cond(|state, c| {
949                 assert!(*state);
950                 // make writer contend in the cond-reacquire path
951                 c.signal();
952             });
953             // make a reader task to trigger the "reader cloud lock" handoff
954             let xr = x.clone();
955             let (rp, rc) = comm::stream();
956             do task::spawn {
957                 rc.send(());
958                 xr.read(|_state| { })
959             }
960             rp.recv(); // wait for reader task to exist
961
962             let read_mode = x.downgrade(write_mode);
963             read_mode.read(|state| {
964                 // if writer mistakenly got in, make sure it mutates state
965                 // before we assert on it
966                 5.times(|| task::deschedule());
967                 // make sure writer didn't get in.
968                 assert!(*state);
969             })
970         });
971     }
972     #[test]
973     fn test_rw_write_cond_downgrade_read_race() {
974         // Ideally the above test case would have deschedule statements in it that
975         // helped to expose the race nearly 100% of the time... but adding
976         // deschedules in the intuitively-right locations made it even less likely,
977         // and I wasn't sure why :( . This is a mediocre "next best" option.
978         8.times(|| test_rw_write_cond_downgrade_read_race_helper());
979     }
980 }