]> git.lizzy.rs Git - rust.git/blob - src/libsync/lock.rs
librustc: Remove the fallback to `int` from typechecking.
[rust.git] / src / libsync / lock.rs
1 // Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Wrappers for safe, shared, mutable memory between tasks
12 //!
13 //! The wrappers in this module build on the primitives from `sync::raw` to
14 //! provide safe interfaces around using the primitive locks. These primitives
15 //! implement a technique called "poisoning" where when a task failed with a
16 //! held lock, all future attempts to use the lock will fail.
17 //!
18 //! For example, if two tasks are contending on a mutex and one of them fails
19 //! after grabbing the lock, the second task will immediately fail because the
20 //! lock is now poisoned.
21
22 use core::prelude::*;
23
24 use core::ty::Unsafe;
25 use rustrt::local::Local;
26 use rustrt::task::Task;
27
28 use raw;
29
30 /****************************************************************************
31  * Poisoning helpers
32  ****************************************************************************/
33
34 struct PoisonOnFail<'a> {
35     flag: &'a mut bool,
36     failed: bool,
37 }
38
39 fn failing() -> bool {
40     Local::borrow(None::<Task>).unwinder.unwinding()
41 }
42
43 impl<'a> PoisonOnFail<'a> {
44     fn check(flag: bool, name: &str) {
45         if flag {
46             fail!("Poisoned {} - another task failed inside!", name);
47         }
48     }
49
50     fn new<'a>(flag: &'a mut bool, name: &str) -> PoisonOnFail<'a> {
51         PoisonOnFail::check(*flag, name);
52         PoisonOnFail {
53             flag: flag,
54             failed: failing()
55         }
56     }
57 }
58
59 #[unsafe_destructor]
60 impl<'a> Drop for PoisonOnFail<'a> {
61     fn drop(&mut self) {
62         if !self.failed && failing() {
63             *self.flag = true;
64         }
65     }
66 }
67
68 /****************************************************************************
69  * Condvar
70  ****************************************************************************/
71
72 enum Inner<'a> {
73     InnerMutex(raw::MutexGuard<'a>),
74     InnerRWLock(raw::RWLockWriteGuard<'a>),
75 }
76
77 impl<'b> Inner<'b> {
78     fn cond<'a>(&'a self) -> &'a raw::Condvar<'b> {
79         match *self {
80             InnerMutex(ref m) => &m.cond,
81             InnerRWLock(ref m) => &m.cond,
82         }
83     }
84 }
85
86 /// A condition variable, a mechanism for unlock-and-descheduling and
87 /// signaling, for use with the lock types.
88 pub struct Condvar<'a> {
89     name: &'static str,
90     // n.b. Inner must be after PoisonOnFail because we must set the poison flag
91     //      *inside* the mutex, and struct fields are destroyed top-to-bottom
92     //      (destroy the lock guard last).
93     poison: PoisonOnFail<'a>,
94     inner: Inner<'a>,
95 }
96
97 impl<'a> Condvar<'a> {
98     /// Atomically exit the associated lock and block until a signal is sent.
99     ///
100     /// wait() is equivalent to wait_on(0).
101     ///
102     /// # Failure
103     ///
104     /// A task which is killed while waiting on a condition variable will wake
105     /// up, fail, and unlock the associated lock as it unwinds.
106     #[inline]
107     pub fn wait(&self) { self.wait_on(0) }
108
109     /// Atomically exit the associated lock and block on a specified condvar
110     /// until a signal is sent on that same condvar.
111     ///
112     /// The associated lock must have been initialised with an appropriate
113     /// number of condvars. The condvar_id must be between 0 and num_condvars-1
114     /// or else this call will fail.
115     #[inline]
116     pub fn wait_on(&self, condvar_id: uint) {
117         assert!(!*self.poison.flag);
118         self.inner.cond().wait_on(condvar_id);
119         // This is why we need to wrap sync::condvar.
120         PoisonOnFail::check(*self.poison.flag, self.name);
121     }
122
123     /// Wake up a blocked task. Returns false if there was no blocked task.
124     #[inline]
125     pub fn signal(&self) -> bool { self.signal_on(0) }
126
127     /// Wake up a blocked task on a specified condvar (as
128     /// sync::cond.signal_on). Returns false if there was no blocked task.
129     #[inline]
130     pub fn signal_on(&self, condvar_id: uint) -> bool {
131         assert!(!*self.poison.flag);
132         self.inner.cond().signal_on(condvar_id)
133     }
134
135     /// Wake up all blocked tasks. Returns the number of tasks woken.
136     #[inline]
137     pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
138
139     /// Wake up all blocked tasks on a specified condvar (as
140     /// sync::cond.broadcast_on). Returns the number of tasks woken.
141     #[inline]
142     pub fn broadcast_on(&self, condvar_id: uint) -> uint {
143         assert!(!*self.poison.flag);
144         self.inner.cond().broadcast_on(condvar_id)
145     }
146 }
147
148 /****************************************************************************
149  * Mutex
150  ****************************************************************************/
151
152 /// A wrapper type which provides synchronized access to the underlying data, of
153 /// type `T`. A mutex always provides exclusive access, and concurrent requests
154 /// will block while the mutex is already locked.
155 ///
156 /// # Example
157 ///
158 /// ```
159 /// use sync::{Mutex, Arc};
160 ///
161 /// let mutex = Arc::new(Mutex::new(1));
162 /// let mutex2 = mutex.clone();
163 ///
164 /// spawn(proc() {
165 ///     let mut val = mutex2.lock();
166 ///     *val += 1;
167 ///     val.cond.signal();
168 /// });
169 ///
170 /// let mut value = mutex.lock();
171 /// while *value != 2 {
172 ///     value.cond.wait();
173 /// }
174 /// ```
175 pub struct Mutex<T> {
176     lock: raw::Mutex,
177     failed: Unsafe<bool>,
178     data: Unsafe<T>,
179 }
180
181 /// An guard which is created by locking a mutex. Through this guard the
182 /// underlying data can be accessed.
183 pub struct MutexGuard<'a, T> {
184     // FIXME #12808: strange name to try to avoid interfering with
185     // field accesses of the contained type via Deref
186     _data: &'a mut T,
187     /// Inner condition variable connected to the locked mutex that this guard
188     /// was created from. This can be used for atomic-unlock-and-deschedule.
189     pub cond: Condvar<'a>,
190 }
191
192 impl<T: Send> Mutex<T> {
193     /// Creates a new mutex to protect the user-supplied data.
194     pub fn new(user_data: T) -> Mutex<T> {
195         Mutex::new_with_condvars(user_data, 1)
196     }
197
198     /// Create a new mutex, with a specified number of associated condvars.
199     ///
200     /// This will allow calling wait_on/signal_on/broadcast_on with condvar IDs
201     /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be
202     /// allowed but any operations on the condvar will fail.)
203     pub fn new_with_condvars(user_data: T, num_condvars: uint) -> Mutex<T> {
204         Mutex {
205             lock: raw::Mutex::new_with_condvars(num_condvars),
206             failed: Unsafe::new(false),
207             data: Unsafe::new(user_data),
208         }
209     }
210
211     /// Access the underlying mutable data with mutual exclusion from other
212     /// tasks. The returned value is an RAII guard which will unlock the mutex
213     /// when dropped. All concurrent tasks attempting to lock the mutex will
214     /// block while the returned value is still alive.
215     ///
216     /// # Failure
217     ///
218     /// Failing while inside the Mutex will unlock the Mutex while unwinding, so
219     /// that other tasks won't block forever. It will also poison the Mutex:
220     /// any tasks that subsequently try to access it (including those already
221     /// blocked on the mutex) will also fail immediately.
222     #[inline]
223     pub fn lock<'a>(&'a self) -> MutexGuard<'a, T> {
224         let guard = self.lock.lock();
225
226         // These two accesses are safe because we're guranteed at this point
227         // that we have exclusive access to this mutex. We are indeed able to
228         // promote ourselves from &Mutex to `&mut T`
229         let poison = unsafe { &mut *self.failed.get() };
230         let data = unsafe { &mut *self.data.get() };
231
232         MutexGuard {
233             _data: data,
234             cond: Condvar {
235                 name: "Mutex",
236                 poison: PoisonOnFail::new(poison, "Mutex"),
237                 inner: InnerMutex(guard),
238             },
239         }
240     }
241 }
242
243 impl<'a, T: Send> Deref<T> for MutexGuard<'a, T> {
244     fn deref<'a>(&'a self) -> &'a T { &*self._data }
245 }
246 impl<'a, T: Send> DerefMut<T> for MutexGuard<'a, T> {
247     fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self._data }
248 }
249
250 /****************************************************************************
251  * R/W lock protected lock
252  ****************************************************************************/
253
254 /// A dual-mode reader-writer lock. The data can be accessed mutably or
255 /// immutably, and immutably-accessing tasks may run concurrently.
256 ///
257 /// # Example
258 ///
259 /// ```
260 /// use sync::{RWLock, Arc};
261 ///
262 /// let lock1 = Arc::new(RWLock::new(1i));
263 /// let lock2 = lock1.clone();
264 ///
265 /// spawn(proc() {
266 ///     let mut val = lock2.write();
267 ///     *val = 3;
268 ///     let val = val.downgrade();
269 ///     println!("{}", *val);
270 /// });
271 ///
272 /// let val = lock1.read();
273 /// println!("{}", *val);
274 /// ```
275 pub struct RWLock<T> {
276     lock: raw::RWLock,
277     failed: Unsafe<bool>,
278     data: Unsafe<T>,
279 }
280
281 /// A guard which is created by locking an rwlock in write mode. Through this
282 /// guard the underlying data can be accessed.
283 pub struct RWLockWriteGuard<'a, T> {
284     // FIXME #12808: strange name to try to avoid interfering with
285     // field accesses of the contained type via Deref
286     _data: &'a mut T,
287     /// Inner condition variable that can be used to sleep on the write mode of
288     /// this rwlock.
289     pub cond: Condvar<'a>,
290 }
291
292 /// A guard which is created by locking an rwlock in read mode. Through this
293 /// guard the underlying data can be accessed.
294 pub struct RWLockReadGuard<'a, T> {
295     // FIXME #12808: strange names to try to avoid interfering with
296     // field accesses of the contained type via Deref
297     _data: &'a T,
298     _guard: raw::RWLockReadGuard<'a>,
299 }
300
301 impl<T: Send + Share> RWLock<T> {
302     /// Create a reader/writer lock with the supplied data.
303     pub fn new(user_data: T) -> RWLock<T> {
304         RWLock::new_with_condvars(user_data, 1)
305     }
306
307     /// Create a reader/writer lock with the supplied data and a specified number
308     /// of condvars (as sync::RWLock::new_with_condvars).
309     pub fn new_with_condvars(user_data: T, num_condvars: uint) -> RWLock<T> {
310         RWLock {
311             lock: raw::RWLock::new_with_condvars(num_condvars),
312             failed: Unsafe::new(false),
313             data: Unsafe::new(user_data),
314         }
315     }
316
317     /// Access the underlying data mutably. Locks the rwlock in write mode;
318     /// other readers and writers will block.
319     ///
320     /// # Failure
321     ///
322     /// Failing while inside the lock will unlock the lock while unwinding, so
323     /// that other tasks won't block forever. As Mutex.lock, it will also poison
324     /// the lock, so subsequent readers and writers will both also fail.
325     #[inline]
326     pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a, T> {
327         let guard = self.lock.write();
328
329         // These two accesses are safe because we're guranteed at this point
330         // that we have exclusive access to this rwlock. We are indeed able to
331         // promote ourselves from &RWLock to `&mut T`
332         let poison = unsafe { &mut *self.failed.get() };
333         let data = unsafe { &mut *self.data.get() };
334
335         RWLockWriteGuard {
336             _data: data,
337             cond: Condvar {
338                 name: "RWLock",
339                 poison: PoisonOnFail::new(poison, "RWLock"),
340                 inner: InnerRWLock(guard),
341             },
342         }
343     }
344
345     /// Access the underlying data immutably. May run concurrently with other
346     /// reading tasks.
347     ///
348     /// # Failure
349     ///
350     /// Failing will unlock the lock while unwinding. However, unlike all other
351     /// access modes, this will not poison the lock.
352     pub fn read<'a>(&'a self) -> RWLockReadGuard<'a, T> {
353         let guard = self.lock.read();
354         PoisonOnFail::check(unsafe { *self.failed.get() }, "RWLock");
355         RWLockReadGuard {
356             _guard: guard,
357             _data: unsafe { &*self.data.get() },
358         }
359     }
360 }
361
362 impl<'a, T: Send + Share> RWLockWriteGuard<'a, T> {
363     /// Consumes this write lock token, returning a new read lock token.
364     ///
365     /// This will allow pending readers to come into the lock.
366     pub fn downgrade(self) -> RWLockReadGuard<'a, T> {
367         let RWLockWriteGuard { _data, cond } = self;
368         // convert the data to read-only explicitly
369         let data = &*_data;
370         let guard = match cond.inner {
371             InnerMutex(..) => unreachable!(),
372             InnerRWLock(guard) => guard.downgrade()
373         };
374         RWLockReadGuard { _guard: guard, _data: data }
375     }
376 }
377
378 impl<'a, T: Send + Share> Deref<T> for RWLockReadGuard<'a, T> {
379     fn deref<'a>(&'a self) -> &'a T { self._data }
380 }
381 impl<'a, T: Send + Share> Deref<T> for RWLockWriteGuard<'a, T> {
382     fn deref<'a>(&'a self) -> &'a T { &*self._data }
383 }
384 impl<'a, T: Send + Share> DerefMut<T> for RWLockWriteGuard<'a, T> {
385     fn deref_mut<'a>(&'a mut self) -> &'a mut T { &mut *self._data }
386 }
387
388 /****************************************************************************
389  * Barrier
390  ****************************************************************************/
391
392 /// A barrier enables multiple tasks to synchronize the beginning
393 /// of some computation.
394 ///
395 /// ```rust
396 /// use sync::{Arc, Barrier};
397 ///
398 /// let barrier = Arc::new(Barrier::new(10));
399 /// for _ in range(0u, 10) {
400 ///     let c = barrier.clone();
401 ///     // The same messages will be printed together.
402 ///     // You will NOT see any interleaving.
403 ///     spawn(proc() {
404 ///         println!("before wait");
405 ///         c.wait();
406 ///         println!("after wait");
407 ///     });
408 /// }
409 /// ```
410 pub struct Barrier {
411     lock: Mutex<BarrierState>,
412     num_tasks: uint,
413 }
414
415 // The inner state of a double barrier
416 struct BarrierState {
417     count: uint,
418     generation_id: uint,
419 }
420
421 impl Barrier {
422     /// Create a new barrier that can block a given number of tasks.
423     pub fn new(num_tasks: uint) -> Barrier {
424         Barrier {
425             lock: Mutex::new(BarrierState {
426                 count: 0,
427                 generation_id: 0,
428             }),
429             num_tasks: num_tasks,
430         }
431     }
432
433     /// Block the current task until a certain number of tasks is waiting.
434     pub fn wait(&self) {
435         let mut lock = self.lock.lock();
436         let local_gen = lock.generation_id;
437         lock.count += 1;
438         if lock.count < self.num_tasks {
439             // We need a while loop to guard against spurious wakeups.
440             // http://en.wikipedia.org/wiki/Spurious_wakeup
441             while local_gen == lock.generation_id &&
442                   lock.count < self.num_tasks {
443                 lock.cond.wait();
444             }
445         } else {
446             lock.count = 0;
447             lock.generation_id += 1;
448             lock.cond.broadcast();
449         }
450     }
451 }
452
453 /****************************************************************************
454  * Tests
455  ****************************************************************************/
456
457 #[cfg(test)]
458 mod tests {
459     use std::prelude::*;
460     use std::comm::Empty;
461     use std::task;
462     use std::task::try_future;
463
464     use Arc;
465     use super::{Mutex, Barrier, RWLock};
466
467     #[test]
468     fn test_mutex_arc_condvar() {
469         let arc = Arc::new(Mutex::new(false));
470         let arc2 = arc.clone();
471         let (tx, rx) = channel();
472         task::spawn(proc() {
473             // wait until parent gets in
474             rx.recv();
475             let mut lock = arc2.lock();
476             *lock = true;
477             lock.cond.signal();
478         });
479
480         let lock = arc.lock();
481         tx.send(());
482         assert!(!*lock);
483         while !*lock {
484             lock.cond.wait();
485         }
486     }
487
488     #[test] #[should_fail]
489     fn test_arc_condvar_poison() {
490         let arc = Arc::new(Mutex::new(1));
491         let arc2 = arc.clone();
492         let (tx, rx) = channel();
493
494         spawn(proc() {
495             rx.recv();
496             let lock = arc2.lock();
497             lock.cond.signal();
498             // Parent should fail when it wakes up.
499             fail!();
500         });
501
502         let lock = arc.lock();
503         tx.send(());
504         while *lock == 1 {
505             lock.cond.wait();
506         }
507     }
508
509     #[test] #[should_fail]
510     fn test_mutex_arc_poison() {
511         let arc = Arc::new(Mutex::new(1i));
512         let arc2 = arc.clone();
513         let _ = task::try(proc() {
514             let lock = arc2.lock();
515             assert_eq!(*lock, 2);
516         });
517         let lock = arc.lock();
518         assert_eq!(*lock, 1);
519     }
520
521     #[test]
522     fn test_mutex_arc_nested() {
523         // Tests nested mutexes and access
524         // to underlying data.
525         let arc = Arc::new(Mutex::new(1i));
526         let arc2 = Arc::new(Mutex::new(arc));
527         task::spawn(proc() {
528             let lock = arc2.lock();
529             let lock2 = lock.deref().lock();
530             assert_eq!(*lock2, 1);
531         });
532     }
533
534     #[test]
535     fn test_mutex_arc_access_in_unwind() {
536         let arc = Arc::new(Mutex::new(1i));
537         let arc2 = arc.clone();
538         let _ = task::try::<()>(proc() {
539             struct Unwinder {
540                 i: Arc<Mutex<int>>,
541             }
542             impl Drop for Unwinder {
543                 fn drop(&mut self) {
544                     let mut lock = self.i.lock();
545                     *lock += 1;
546                 }
547             }
548             let _u = Unwinder { i: arc2 };
549             fail!();
550         });
551         let lock = arc.lock();
552         assert_eq!(*lock, 2);
553     }
554
555     #[test] #[should_fail]
556     fn test_rw_arc_poison_wr() {
557         let arc = Arc::new(RWLock::new(1i));
558         let arc2 = arc.clone();
559         let _ = task::try(proc() {
560             let lock = arc2.write();
561             assert_eq!(*lock, 2);
562         });
563         let lock = arc.read();
564         assert_eq!(*lock, 1);
565     }
566     #[test] #[should_fail]
567     fn test_rw_arc_poison_ww() {
568         let arc = Arc::new(RWLock::new(1i));
569         let arc2 = arc.clone();
570         let _ = task::try(proc() {
571             let lock = arc2.write();
572             assert_eq!(*lock, 2);
573         });
574         let lock = arc.write();
575         assert_eq!(*lock, 1);
576     }
577     #[test]
578     fn test_rw_arc_no_poison_rr() {
579         let arc = Arc::new(RWLock::new(1i));
580         let arc2 = arc.clone();
581         let _ = task::try(proc() {
582             let lock = arc2.read();
583             assert_eq!(*lock, 2);
584         });
585         let lock = arc.read();
586         assert_eq!(*lock, 1);
587     }
588     #[test]
589     fn test_rw_arc_no_poison_rw() {
590         let arc = Arc::new(RWLock::new(1i));
591         let arc2 = arc.clone();
592         let _ = task::try(proc() {
593             let lock = arc2.read();
594             assert_eq!(*lock, 2);
595         });
596         let lock = arc.write();
597         assert_eq!(*lock, 1);
598     }
599     #[test]
600     fn test_rw_arc_no_poison_dr() {
601         let arc = Arc::new(RWLock::new(1i));
602         let arc2 = arc.clone();
603         let _ = task::try(proc() {
604             let lock = arc2.write().downgrade();
605             assert_eq!(*lock, 2);
606         });
607         let lock = arc.write();
608         assert_eq!(*lock, 1);
609     }
610
611     #[test]
612     fn test_rw_arc() {
613         let arc = Arc::new(RWLock::new(0i));
614         let arc2 = arc.clone();
615         let (tx, rx) = channel();
616
617         task::spawn(proc() {
618             let mut lock = arc2.write();
619             for _ in range(0u, 10) {
620                 let tmp = *lock;
621                 *lock = -1;
622                 task::deschedule();
623                 *lock = tmp + 1;
624             }
625             tx.send(());
626         });
627
628         // Readers try to catch the writer in the act
629         let mut children = Vec::new();
630         for _ in range(0u, 5) {
631             let arc3 = arc.clone();
632             children.push(try_future(proc() {
633                 let lock = arc3.read();
634                 assert!(*lock >= 0);
635             }));
636         }
637
638         // Wait for children to pass their asserts
639         for r in children.mut_iter() {
640             assert!(r.get_ref().is_ok());
641         }
642
643         // Wait for writer to finish
644         rx.recv();
645         let lock = arc.read();
646         assert_eq!(*lock, 10);
647     }
648
649     #[test]
650     fn test_rw_arc_access_in_unwind() {
651         let arc = Arc::new(RWLock::new(1i));
652         let arc2 = arc.clone();
653         let _ = task::try::<()>(proc() {
654             struct Unwinder {
655                 i: Arc<RWLock<int>>,
656             }
657             impl Drop for Unwinder {
658                 fn drop(&mut self) {
659                     let mut lock = self.i.write();
660                     *lock += 1;
661                 }
662             }
663             let _u = Unwinder { i: arc2 };
664             fail!();
665         });
666         let lock = arc.read();
667         assert_eq!(*lock, 2);
668     }
669
670     #[test]
671     fn test_rw_downgrade() {
672         // (1) A downgrader gets in write mode and does cond.wait.
673         // (2) A writer gets in write mode, sets state to 42, and does signal.
674         // (3) Downgrader wakes, sets state to 31337.
675         // (4) tells writer and all other readers to contend as it downgrades.
676         // (5) Writer attempts to set state back to 42, while downgraded task
677         //     and all reader tasks assert that it's 31337.
678         let arc = Arc::new(RWLock::new(0i));
679
680         // Reader tasks
681         let mut reader_convos = Vec::new();
682         for _ in range(0u, 10) {
683             let ((tx1, rx1), (tx2, rx2)) = (channel(), channel());
684             reader_convos.push((tx1, rx2));
685             let arcn = arc.clone();
686             task::spawn(proc() {
687                 rx1.recv(); // wait for downgrader to give go-ahead
688                 let lock = arcn.read();
689                 assert_eq!(*lock, 31337);
690                 tx2.send(());
691             });
692         }
693
694         // Writer task
695         let arc2 = arc.clone();
696         let ((tx1, rx1), (tx2, rx2)) = (channel(), channel());
697         task::spawn(proc() {
698             rx1.recv();
699             {
700                 let mut lock = arc2.write();
701                 assert_eq!(*lock, 0);
702                 *lock = 42;
703                 lock.cond.signal();
704             }
705             rx1.recv();
706             {
707                 let mut lock = arc2.write();
708                 // This shouldn't happen until after the downgrade read
709                 // section, and all other readers, finish.
710                 assert_eq!(*lock, 31337);
711                 *lock = 42;
712             }
713             tx2.send(());
714         });
715
716         // Downgrader (us)
717         let mut lock = arc.write();
718         tx1.send(()); // send to another writer who will wake us up
719         while *lock == 0 {
720             lock.cond.wait();
721         }
722         assert_eq!(*lock, 42);
723         *lock = 31337;
724         // send to other readers
725         for &(ref mut rc, _) in reader_convos.mut_iter() {
726             rc.send(())
727         }
728         let lock = lock.downgrade();
729         // complete handshake with other readers
730         for &(_, ref mut rp) in reader_convos.mut_iter() {
731             rp.recv()
732         }
733         tx1.send(()); // tell writer to try again
734         assert_eq!(*lock, 31337);
735         drop(lock);
736
737         rx2.recv(); // complete handshake with writer
738     }
739
740     #[cfg(test)]
741     fn test_rw_write_cond_downgrade_read_race_helper() {
742         // Tests that when a downgrader hands off the "reader cloud" lock
743         // because of a contending reader, a writer can't race to get it
744         // instead, which would result in readers_and_writers. This tests
745         // the raw module rather than this one, but it's here because an
746         // rwarc gives us extra shared state to help check for the race.
747         let x = Arc::new(RWLock::new(true));
748         let (tx, rx) = channel();
749
750         // writer task
751         let xw = x.clone();
752         task::spawn(proc() {
753             let mut lock = xw.write();
754             tx.send(()); // tell downgrader it's ok to go
755             lock.cond.wait();
756             // The core of the test is here: the condvar reacquire path
757             // must involve order_lock, so that it cannot race with a reader
758             // trying to receive the "reader cloud lock hand-off".
759             *lock = false;
760         });
761
762         rx.recv(); // wait for writer to get in
763
764         let lock = x.write();
765         assert!(*lock);
766         // make writer contend in the cond-reacquire path
767         lock.cond.signal();
768         // make a reader task to trigger the "reader cloud lock" handoff
769         let xr = x.clone();
770         let (tx, rx) = channel();
771         task::spawn(proc() {
772             tx.send(());
773             drop(xr.read());
774         });
775         rx.recv(); // wait for reader task to exist
776
777         let lock = lock.downgrade();
778         // if writer mistakenly got in, make sure it mutates state
779         // before we assert on it
780         for _ in range(0u, 5) { task::deschedule(); }
781         // make sure writer didn't get in.
782         assert!(*lock);
783     }
784     #[test]
785     fn test_rw_write_cond_downgrade_read_race() {
786         // Ideally the above test case would have deschedule statements in it
787         // that helped to expose the race nearly 100% of the time... but adding
788         // deschedules in the intuitively-right locations made it even less
789         // likely, and I wasn't sure why :( . This is a mediocre "next best"
790         // option.
791         for _ in range(0u, 8) {
792             test_rw_write_cond_downgrade_read_race_helper();
793         }
794     }
795
796     /************************************************************************
797      * Barrier tests
798      ************************************************************************/
799     #[test]
800     fn test_barrier() {
801         let barrier = Arc::new(Barrier::new(10));
802         let (tx, rx) = channel();
803
804         for _ in range(0u, 9) {
805             let c = barrier.clone();
806             let tx = tx.clone();
807             spawn(proc() {
808                 c.wait();
809                 tx.send(true);
810             });
811         }
812
813         // At this point, all spawned tasks should be blocked,
814         // so we shouldn't get anything from the port
815         assert!(match rx.try_recv() {
816             Err(Empty) => true,
817             _ => false,
818         });
819
820         barrier.wait();
821         // Now, the barrier is cleared and we should get data.
822         for _ in range(0u, 9) {
823             rx.recv();
824         }
825     }
826 }