1 // Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! Raw concurrency primitives you know and love.
13 //! These primitives are not recommended for general use, but are provided for
14 //! flavorful use-cases. It is recommended to use the types at the top of the
15 //! `sync` crate which wrap values directly and provide safer abstractions for
21 use core::finally::Finally;
22 use core::kinds::marker;
24 use core::cell::UnsafeCell;
25 use collections::{Vec, MutableSeq};
28 use comm::{Receiver, Sender, channel};
30 /****************************************************************************
32 ****************************************************************************/
// A waiter parks itself by blocking on the Receiver half of a oneshot
// channel; a signaller wakes it by sending () on the matching Sender.
34 // Each waiting task receives on one of these.
35 type WaitEnd = Receiver<()>;
36 type SignalEnd = Sender<()>;
37 // A doubly-ended queue of waiting tasks.
// NOTE(review): the `struct WaitQueue {` header line is elided from this
// chunk; the two fields below are its body. The queue is itself implemented
// as a channel of SignalEnds: enqueue = send on `tail`, dequeue = recv on
// `head`, which gives FIFO wakeup order.
39 head: Receiver<SignalEnd>,
40 tail: Sender<SignalEnd>,
// Constructs an empty wait queue backed by a fresh channel pair.
44 fn new() -> WaitQueue {
45 let (block_tail, block_head) = channel();
46 WaitQueue { head: block_head, tail: block_tail }
49 // Signals one live task from the queue.
// Returns whether a waiter was actually woken (several lines of this body,
// including the match arms and fallthrough, are elided from this chunk).
50 fn signal(&self) -> bool {
51 match self.head.try_recv() {
53 // Send a wakeup signal. If the waiter was killed, its port will
54 // have closed. Keep trying until we get a live task.
55 if ch.send_opt(()).is_ok() {
// Wakes every currently-enqueued waiter, returning how many were live.
// NOTE(review): loop structure and counter update are elided here; the
// visible skeleton mirrors signal() but drains the queue.
65 fn broadcast(&self) -> uint {
68 match self.head.try_recv() {
70 if ch.send_opt(()).is_ok() {
// Enqueues the calling task and hands back the WaitEnd it should block on.
// The recv on the returned WaitEnd happens outside any lock.
80 fn wait_end(&self) -> WaitEnd {
81 let (signal_end, wait_end) = channel();
82 self.tail.send(signal_end);
87 // The building-block used to make semaphores, mutexes, and rwlocks.
90 // n.b, we need Sem to be `Sync`, but the WaitQueue type is not send/share
91 // (for good reason). We have an internal invariant on this semaphore,
92 // however, that the queue is never accessed outside of a locked
// NOTE(review): the `struct Sem<Q> {` header and its `lock` field are elided
// from this chunk; `inner` holds the count/waiters state, guarded by that
// lock — hence the UnsafeCell is sound per the invariant above.
94 inner: UnsafeCell<SemInner<Q>>
100 // Can be either unit or another waitqueue. Some sems shouldn't come with
101 // a condition variable attached, others should.
// RAII guard returned by Sem::access; releases the semaphore on drop.
// (Its `sem` field and closing brace are elided from this chunk.)
106 struct SemGuard<'a, Q:'a> {
110 impl<Q: Send> Sem<Q> {
// Builds a semaphore with `count` permits and auxiliary state `q`.
// Panics (via the elided assert whose message is on the next line) if
// `count` is negative.
111 fn new(count: int, q: Q) -> Sem<Q> {
113 "semaphores cannot be initialized with negative values");
115 lock: mutex::Mutex::new(),
116 inner: UnsafeCell::new(SemInner {
117 waiters: WaitQueue::new(),
// Runs `f` with exclusive access to the protected SemInner state.
// # Safety
// Sound only because every access goes through `self.lock` — see the
// struct-level invariant comment.
124 unsafe fn with(&self, f: |&mut SemInner<Q>|) {
125 let _g = self.lock.lock();
126 // This &mut is safe because, due to the lock, we are the only one who can touch the data
127 f(&mut *self.inner.get())
// Decrements the count; if no permit was available, enqueues on the wait
// queue (inside the lock) and blocks on the WaitEnd (outside the lock).
130 pub fn acquire(&self) {
132 let mut waiter_nobe = None;
136 // Create waiter nobe, enqueue ourself, and tell
137 // outer scope we need to block.
138 waiter_nobe = Some(state.waiters.wait_end());
141 // Uncomment if you wish to test for sem races. Not
142 // valgrind-friendly.
143 /* for _ in range(0u, 1000) { task::deschedule(); } */
144 // Need to wait outside the exclusive.
145 if waiter_nobe.is_some() {
146 let _ = waiter_nobe.unwrap().recv();
// Increments the count; if tasks were blocked (count was <= 0 before the
// elided increment), wakes one of them.
151 pub fn release(&self) {
155 if state.count <= 0 {
156 state.waiters.signal();
// Acquires the semaphore and returns an RAII guard that releases on drop.
162 pub fn access<'a>(&'a self) -> SemGuard<'a, Q> {
164 SemGuard { sem: self }
169 impl<'a, Q: Send> Drop for SemGuard<'a, Q> {
175 impl Sem<Vec<WaitQueue>> {
// A semaphore whose auxiliary state is one WaitQueue per condvar ID.
176 fn new_and_signal(count: int, num_condvars: uint) -> Sem<Vec<WaitQueue>> {
177 let mut queues = Vec::new();
178 for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); }
179 Sem::new(count, queues)
182 // The only other places that condvars get built are rwlock.write_cond()
183 // and rwlock_write_mode.
// Acquires the semaphore and pairs the guard with a Condvar over it.
184 pub fn access_cond<'a>(&'a self) -> SemCondGuard<'a> {
186 guard: self.access(),
187 cvar: Condvar { sem: self, order: Nothing, nocopy: marker::NoCopy },
192 // FIXME(#3598): Want to use an Option down below, but we need a custom enum
193 // that's not polymorphic to get around the fact that lifetimes are invariant
194 // inside of type parameters.
// Variants (`Nothing`, `Just(&Semaphore)`) are elided from this chunk; the
// uses at Nothing/Just call sites below imply that shape.
195 enum ReacquireOrderLock<'a> {
200 /// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
201 pub struct Condvar<'a> {
202 // The 'Sem' object associated with this condvar. This is the one that's
203 // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
204 sem: &'a Sem<Vec<WaitQueue> >,
205 // This is (can be) an extra semaphore which is held around the reacquire
206 // operation on the first one. This is only used in cvars associated with
207 // rwlocks, and is needed to ensure that, when a downgrader is trying to
208 // hand off the access lock (which would be the first field, here), a 2nd
209 // writer waking up from a cvar wait can't race with a reader to steal it,
210 // See the comment in write_cond for more detail.
211 order: ReacquireOrderLock<'a>,
212 // Make sure condvars are non-copyable.
213 nocopy: marker::NoCopy,
216 impl<'a> Condvar<'a> {
217 /// Atomically drop the associated lock, and block until a signal is sent.
221 /// A task which is killed while waiting on a condition variable will wake
222 /// up, panic, and unlock the associated lock as it unwinds.
223 pub fn wait(&self) { self.wait_on(0) }
225 /// As wait(), but can specify which of multiple condition variables to
226 /// wait on. Only a signal_on() or broadcast_on() with the same condvar_id
227 /// will wake this thread.
229 /// The associated lock must have been initialised with an appropriate
230 /// number of condvars. The condvar_id must be between 0 and num_condvars-1
231 /// or else this call will panic.
233 /// wait() is equivalent to wait_on(0).
234 pub fn wait_on(&self, condvar_id: uint) {
235 let mut wait_end = None;
236 let mut out_of_bounds = None;
237 // Release lock, 'atomically' enqueuing ourselves in so doing.
239 self.sem.with(|state| {
240 if condvar_id < state.blocked.len() {
// Releasing the lock: the count bump itself is elided here; if someone
// was blocked on the lock, hand it to them before we sleep.
243 if state.count <= 0 {
244 state.waiters.signal();
246 // Create waiter nobe, and enqueue ourself to
247 // be woken up by a signaller.
248 wait_end = Some(state.blocked[condvar_id].wait_end());
250 out_of_bounds = Some(state.blocked.len());
255 // If deschedule checks start getting inserted anywhere, we can be
256 // killed before or after enqueueing.
257 check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()", || {
258 // Unconditionally "block". (Might not actually block if a
259 // signaller already sent -- I mean 'unconditionally' in contrast
262 let _ = wait_end.take().unwrap().recv();
264 // Reacquire the condvar.
// NOTE(review): the `Just(lock)` match arm wrapping these two lines is
// elided; the order lock is held around reacquiring the access sem.
267 let _g = lock.access();
270 Nothing => self.sem.acquire(),
276 /// Wake up a blocked task. Returns false if there was no blocked task.
277 pub fn signal(&self) -> bool { self.signal_on(0) }
279 /// As signal, but with a specified condvar_id. See wait_on.
280 pub fn signal_on(&self, condvar_id: uint) -> bool {
282 let mut out_of_bounds = None;
283 let mut result = false;
284 self.sem.with(|state| {
285 if condvar_id < state.blocked.len() {
286 result = state.blocked[condvar_id].signal();
288 out_of_bounds = Some(state.blocked.len());
291 check_cvar_bounds(out_of_bounds,
298 /// Wake up all blocked tasks. Returns the number of tasks woken.
299 pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
301 /// As broadcast, but with a specified condvar_id. See wait_on.
302 pub fn broadcast_on(&self, condvar_id: uint) -> uint {
303 let mut out_of_bounds = None;
304 let mut queue = None;
306 self.sem.with(|state| {
307 if condvar_id < state.blocked.len() {
308 // To avoid :broadcast_heavy, we make a new waitqueue,
309 // swap it out with the old one, and broadcast on the
310 // old one outside of the little-lock.
311 queue = Some(mem::replace(state.blocked.get_mut(condvar_id),
314 out_of_bounds = Some(state.blocked.len());
317 check_cvar_bounds(out_of_bounds,
// Broadcast happens outside the lock, on the swapped-out queue.
321 queue.take().unwrap().broadcast()
327 // Checks whether a condvar ID was out of bounds, and panics if so, or does
328 // something else next on success.
// Generic over the success continuation's return type U; the `act`/`id`/
// `blk` parameters and the success arm are elided from this chunk.
330 fn check_cvar_bounds<U>(
331 out_of_bounds: Option<uint>,
336 match out_of_bounds {
// A lock built with zero condvars rejects every ID with this message.
338 panic!("{} with illegal ID {} - this lock has no condvars!", act, id),
340 panic!("{} with illegal ID {} - ID must be less than {}", act, id, length),
// Bundles the RAII access guard with the Condvar built over the same Sem;
// returned by Sem::access_cond. (The `cvar` field is elided from this chunk.)
346 struct SemCondGuard<'a> {
347 guard: SemGuard<'a, Vec<WaitQueue>>,
351 /****************************************************************************
353 ****************************************************************************/
355 /// A counting, blocking, bounded-waiting semaphore.
// Thin public wrapper over Sem<()> — no condvar queues attached.
// (The `sem` field and closing brace are elided from this chunk.)
356 pub struct Semaphore {
360 /// An RAII guard used to represent an acquired resource to a semaphore. When
361 /// dropped, this value will release the resource back to the semaphore.
363 pub struct SemaphoreGuard<'a> {
364 _guard: SemGuard<'a, ()>,
// NOTE(review): the `impl Semaphore {` header line is elided from this chunk.
368 /// Create a new semaphore with the specified count.
372 /// This function will panic if `count` is negative.
373 pub fn new(count: int) -> Semaphore {
374 Semaphore { sem: Sem::new(count, ()) }
377 /// Acquire a resource represented by the semaphore. Blocks if necessary
378 /// until resource(s) become available.
379 pub fn acquire(&self) { self.sem.acquire() }
381 /// Release a held resource represented by the semaphore. Wakes a blocked
382 /// contending task, if any exist. Won't block the caller.
383 pub fn release(&self) { self.sem.release() }
385 /// Acquire a resource of this semaphore, returning an RAII guard which will
386 /// release the resource when dropped.
387 pub fn access<'a>(&'a self) -> SemaphoreGuard<'a> {
388 SemaphoreGuard { _guard: self.sem.access() }
392 /****************************************************************************
394 ****************************************************************************/
396 /// A blocking, bounded-waiting, mutual exclusion lock with an associated
397 /// FIFO condition variable.
401 /// A task which panicks while holding a mutex will unlock the mutex as it
// Implemented as a binary semaphore (count 1) carrying condvar wait queues.
// (The `pub struct Mutex {` header line is elided from this chunk.)
404 sem: Sem<Vec<WaitQueue>>,
407 /// An RAII structure which is used to gain access to a mutex's condition
408 /// variable. Additionally, when a value of this type is dropped, the
409 /// corresponding mutex is also unlocked.
411 pub struct MutexGuard<'a> {
412 _guard: SemGuard<'a, Vec<WaitQueue>>,
413 /// Inner condition variable which is connected to the outer mutex, and can
414 /// be used for atomic-unlock-and-deschedule.
415 pub cond: Condvar<'a>,
// NOTE(review): the `impl Mutex {` header line is elided from this chunk.
419 /// Create a new mutex, with one associated condvar.
420 pub fn new() -> Mutex { Mutex::new_with_condvars(1) }
422 /// Create a new mutex, with a specified number of associated condvars. This
423 /// will allow calling wait_on/signal_on/broadcast_on with condvar IDs
424 /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be
425 /// allowed but any operations on the condvar will panic.)
426 pub fn new_with_condvars(num_condvars: uint) -> Mutex {
427 Mutex { sem: Sem::new_and_signal(1, num_condvars) }
430 /// Acquires ownership of this mutex, returning an RAII guard which will
431 /// unlock the mutex when dropped. The associated condition variable can
432 /// also be accessed through the returned guard.
433 pub fn lock<'a>(&'a self) -> MutexGuard<'a> {
434 let SemCondGuard { guard, cvar } = self.sem.access_cond();
435 MutexGuard { _guard: guard, cond: cvar }
439 /****************************************************************************
440 * Reader-writer locks
441 ****************************************************************************/
443 // NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem
445 /// A blocking, no-starvation, reader-writer lock with an associated condvar.
449 /// A task which panics while holding an rwlock will unlock the rwlock as it
// `order_lock` serializes lock *attempts* (no starvation); `access_lock`
// is the actual exclusion. (The `pub struct RWLock {` header is elided.)
452 order_lock: Semaphore,
453 access_lock: Sem<Vec<WaitQueue>>,
455 // The only way the count flag is ever accessed is with xadd. Since it is
456 // a read-modify-write operation, multiple xadds on different cores will
457 // always be consistent with respect to each other, so a monotonic/relaxed
458 // consistency ordering suffices (i.e., no extra barriers are needed).
460 // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
461 // acquire/release orderings superfluously. Change these someday.
462 read_count: atomic::AtomicUint,
465 /// An RAII helper which is created by acquiring a read lock on an RWLock. When
466 /// dropped, this will unlock the RWLock.
// (The guard's `lock` field and closing brace are elided from this chunk.)
468 pub struct RWLockReadGuard<'a> {
472 /// An RAII helper which is created by acquiring a write lock on an RWLock. When
473 /// dropped, this will unlock the RWLock.
475 /// A value of this type can also be consumed to downgrade to a read-only lock.
477 pub struct RWLockWriteGuard<'a> {
479 /// Inner condition variable that is connected to the write-mode of the
481 pub cond: Condvar<'a>,
// NOTE(review): the `impl RWLock {` header line is elided from this chunk.
485 /// Create a new rwlock, with one associated condvar.
486 pub fn new() -> RWLock { RWLock::new_with_condvars(1) }
488 /// Create a new rwlock, with a specified number of associated condvars.
489 /// Similar to mutex_with_condvars.
490 pub fn new_with_condvars(num_condvars: uint) -> RWLock {
492 order_lock: Semaphore::new(1),
493 access_lock: Sem::new_and_signal(1, num_condvars),
494 read_count: atomic::AtomicUint::new(0),
498 /// Acquires a read-lock, returning an RAII guard that will unlock the lock
499 /// when dropped. Calls to 'read' from other tasks may run concurrently with
501 pub fn read<'a>(&'a self) -> RWLockReadGuard<'a> {
502 let _guard = self.order_lock.access();
503 let old_count = self.read_count.fetch_add(1, atomic::Acquire);
// Only the *first* reader (old_count == 0, check elided here) takes the
// access_lock; later readers just bump the count.
505 self.access_lock.acquire();
507 RWLockReadGuard { lock: self }
510 /// Acquire a write-lock, returning an RAII guard that will unlock the lock
511 /// when dropped. No calls to 'read' or 'write' from other tasks will run
512 /// concurrently with this one.
514 /// You can also downgrade a write to a read by calling the `downgrade`
515 /// method on the returned guard. Additionally, the guard will contain a
516 /// `Condvar` attached to this lock.
521 /// use sync::raw::RWLock;
523 /// let lock = RWLock::new();
524 /// let write = lock.write();
525 /// // ... exclusive access ...
526 /// let read = write.downgrade();
527 /// // ... shared access ...
530 pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a> {
531 let _g = self.order_lock.access();
532 self.access_lock.acquire();
534 // It's important to thread our order lock into the condvar, so that
535 // when a cond.wait() wakes up, it uses it while reacquiring the
536 // access lock. If we permitted a waking-up writer to "cut in line",
537 // there could arise a subtle race when a downgrader attempts to hand
538 // off the reader cloud lock to a waiting reader. This race is tested
539 // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
540 // T1 (writer) T2 (downgrader) T3 (reader)
542 // [locks for writing]
543 // [holds access_lock]
544 // [is signalled, perhaps by
545 // downgrader or a 4th thread]
546 // tries to lock access(!)
548 // xadd read_count[0->1]
549 // tries to lock access
551 // xadd read_count[1->2]
553 // Since T1 contended on the access lock before T3 did, it will steal
554 // the lock handoff. Adding order_lock in the condvar reacquire path
555 // solves this because T1 will hold order_lock while waiting on access,
556 // which will cause T3 to have to wait until T1 finishes its write,
557 // which can't happen until T2 finishes the downgrade-read entirely.
558 // The astute reader will also note that making waking writers use the
559 // order_lock is better for not starving readers.
// Hence the guard's Condvar uses Just(order_lock), not Nothing.
563 sem: &self.access_lock,
564 order: Just(&self.order_lock),
565 nocopy: marker::NoCopy,
571 impl<'a> RWLockWriteGuard<'a> {
572 /// Consumes this write lock and converts it into a read lock.
// Atomic with respect to other writers: we never release access_lock
// outright, we either keep it (first reader) or hand it to a waiter.
573 pub fn downgrade(self) -> RWLockReadGuard<'a> {
574 let lock = self.lock;
575 // Don't run the destructor of the write guard, we're in charge of
576 // things from now on
577 unsafe { mem::forget(self) }
579 let old_count = lock.read_count.fetch_add(1, atomic::Release);
580 // If another reader was already blocking, we need to hand-off
581 // the "reader cloud" access lock to them.
// NOTE(review): the `if old_count != 0 {` condition is elided here.
583 // Guaranteed not to let another writer in, because
584 // another reader was holding the order_lock. Hence they
585 // must be the one to get the access_lock (because all
586 // access_locks are acquired with order_lock held). See
587 // the comment in write_cond for more justification.
588 lock.access_lock.release();
590 RWLockReadGuard { lock: lock }
// Dropping a write guard simply releases exclusive access.
595 impl<'a> Drop for RWLockWriteGuard<'a> {
597 self.lock.access_lock.release();
// Dropping a read guard decrements the reader count; the *last* reader
// (old_count == 1; the check is elided from this chunk) releases access.
602 impl<'a> Drop for RWLockReadGuard<'a> {
604 let old_count = self.lock.read_count.fetch_sub(1, atomic::Release);
// A guard existing means at least one reader was registered.
605 assert!(old_count > 0);
607 // Note: this release used to be outside of a locked access
608 // to exclusive-protected state. If this code is ever
609 // converted back to such (instead of using atomic ops),
610 // this access MUST NOT go inside the exclusive access.
611 self.lock.access_lock.release();
616 /****************************************************************************
618 ****************************************************************************/
// Semaphore test suite. NOTE(review): this chunk elides many lines (task
// spawns, closing braces, assertions); bodies below are partial.
625 use super::{Semaphore, Mutex, RWLock, Condvar};
631 /************************************************************************
633 ************************************************************************/
635 fn test_sem_acquire_release() {
636 let s = Semaphore::new(1);
642 fn test_sem_basic() {
643 let s = Semaphore::new(1);
648 fn test_sem_basic2() {
// A binary semaphore shared across tasks behaves as a mutex.
652 fn test_sem_as_mutex() {
653 let s = Arc::new(Semaphore::new(1));
656 let _g = s2.access();
657 for _ in range(0u, 5) { task::deschedule(); }
660 for _ in range(0u, 5) { task::deschedule(); }
// A zero-count semaphore behaves as a one-shot signal in both directions.
663 fn test_sem_as_cvar() {
664 /* Child waits and parent signals */
665 let (tx, rx) = channel();
666 let s = Arc::new(Semaphore::new(0));
672 for _ in range(0u, 5) { task::deschedule(); }
676 /* Parent waits and child signals */
677 let (tx, rx) = channel();
678 let s = Arc::new(Semaphore::new(0));
681 for _ in range(0u, 5) { task::deschedule(); }
689 fn test_sem_multi_resource() {
690 // Parent and child both get in the critical section at the same
691 // time, and shake hands.
692 let s = Arc::new(Semaphore::new(2));
694 let (tx1, rx1) = channel();
695 let (tx2, rx2) = channel();
697 let _g = s2.access();
706 fn test_sem_runtime_friendly_blocking() {
707 // Force the runtime to schedule two threads on the same sched_loop.
708 // When one blocks, it should schedule the other one.
709 let s = Arc::new(Semaphore::new(1));
711 let (tx, rx) = channel();
719 rx.recv(); // wait for child to come alive
720 for _ in range(0u, 5) { task::deschedule(); } // let the child contend
722 rx.recv(); // wait for child to be done
// Mutex test suite. NOTE(review): many lines elided in this chunk.
724 /************************************************************************
726 ************************************************************************/
728 fn test_mutex_lock() {
729 // Unsafely achieve shared state, and do the textbook
730 // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
731 let (tx, rx) = channel();
732 let m = Arc::new(Mutex::new());
734 let mut sharedstate = box 0;
736 let ptr: *mut int = &mut *sharedstate;
738 access_shared(ptr, &m2, 10);
743 access_shared(&mut *sharedstate, &m, 10);
// Both tasks incremented 10 times each under the lock.
746 assert_eq!(*sharedstate, 20);
749 fn access_shared(sharedstate: *mut int, m: &Arc<Mutex>, n: uint) {
750 for _ in range(0u, n) {
752 let oldval = unsafe { *sharedstate };
754 unsafe { *sharedstate = oldval + 1; }
759 fn test_mutex_cond_wait() {
760 let m = Arc::new(Mutex::new());
762 // Child wakes up parent
767 let lock = m2.lock();
768 let woken = lock.cond.signal();
773 // Parent wakes up child
774 let (tx, rx) = channel();
777 let lock = m3.lock();
782 rx.recv(); // Wait until child gets in the mutex
785 let woken = lock.cond.signal();
788 rx.recv(); // Wait until child wakes up
791 fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
792 let m = Arc::new(Mutex::new());
793 let mut rxs = Vec::new();
795 for _ in range(0u, num_waiters) {
797 let (tx, rx) = channel();
800 let lock = mi.lock();
807 // wait until all children get in the mutex
808 for rx in rxs.iter_mut() { rx.recv(); }
811 let num_woken = lock.cond.broadcast();
812 assert_eq!(num_woken, num_waiters);
814 // wait until all children wake up
815 for rx in rxs.iter_mut() { rx.recv(); }
818 fn test_mutex_cond_broadcast() {
819 test_mutex_cond_broadcast_helper(12);
822 fn test_mutex_cond_broadcast_none() {
823 test_mutex_cond_broadcast_helper(0);
826 fn test_mutex_cond_no_waiter() {
827 let m = Arc::new(Mutex::new());
829 let _ = task::try(proc() {
832 let lock = m2.lock();
// signal() must report false when nobody is waiting.
833 assert!(!lock.cond.signal());
836 fn test_mutex_killed_simple() {
839 // Mutex must get automatically unlocked if panicked/killed within.
840 let m = Arc::new(Mutex::new());
843 let result: result::Result<(), Box<Any + Send>> = task::try(proc() {
844 let _lock = m2.lock();
847 assert!(result.is_err());
848 // child task must have finished by the time try returns
852 fn test_mutex_cond_signal_on_0() {
853 // Tests that signal_on(0) is equivalent to signal().
854 let m = Arc::new(Mutex::new());
858 let lock = m2.lock();
859 lock.cond.signal_on(0);
// All three condvar ops must panic when built with zero condvars.
864 fn test_mutex_no_condvars() {
865 let result = task::try(proc() {
866 let m = Mutex::new_with_condvars(0);
867 m.lock().cond.wait();
869 assert!(result.is_err());
870 let result = task::try(proc() {
871 let m = Mutex::new_with_condvars(0);
872 m.lock().cond.signal();
874 assert!(result.is_err());
875 let result = task::try(proc() {
876 let m = Mutex::new_with_condvars(0);
877 m.lock().cond.broadcast();
879 assert!(result.is_err());
881 /************************************************************************
882 * Reader/writer lock tests
883 ************************************************************************/
// Parameterizes the tests over every way of holding the lock.
885 pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }
887 fn lock_rwlock_in_mode(x: &Arc<RWLock>, mode: RWLockMode, blk: ||) {
889 Read => { let _g = x.read(); blk() }
890 Write => { let _g = x.write(); blk() }
891 Downgrade => { let _g = x.write(); blk() }
892 DowngradeRead => { let _g = x.write().downgrade(); blk() }
896 fn test_rwlock_exclusion(x: Arc<RWLock>,
899 // Test mutual exclusion between readers and writers. Just like the
900 // mutex mutual exclusion test, a ways above.
901 let (tx, rx) = channel();
903 let mut sharedstate = box 0;
905 let ptr: *const int = &*sharedstate;
907 let sharedstate: &mut int =
908 unsafe { mem::transmute(ptr) };
909 access_shared(sharedstate, &x2, mode1, 10);
914 access_shared(&mut *sharedstate, &x, mode2, 10);
917 assert_eq!(*sharedstate, 20);
920 fn access_shared(sharedstate: &mut int, x: &Arc<RWLock>,
921 mode: RWLockMode, n: uint) {
922 for _ in range(0u, n) {
923 lock_rwlock_in_mode(x, mode, || {
924 let oldval = *sharedstate;
926 *sharedstate = oldval + 1;
932 fn test_rwlock_readers_wont_modify_the_data() {
933 test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Write);
934 test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Read);
935 test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Downgrade);
936 test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Read);
937 test_rwlock_exclusion(Arc::new(RWLock::new()), Write, DowngradeRead);
938 test_rwlock_exclusion(Arc::new(RWLock::new()), DowngradeRead, Write);
941 fn test_rwlock_writers_and_writers() {
942 test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Write);
943 test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Downgrade);
944 test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Write);
945 test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Downgrade);
948 fn test_rwlock_handshake(x: Arc<RWLock>,
951 make_mode2_go_first: bool) {
952 // Much like sem_multi_resource.
954 let (tx1, rx1) = channel();
955 let (tx2, rx2) = channel();
957 if !make_mode2_go_first {
958 rx2.recv(); // parent sends to us once it locks, or ...
960 lock_rwlock_in_mode(&x2, mode2, || {
961 if make_mode2_go_first {
962 tx1.send(()); // ... we send to it once we lock
968 if make_mode2_go_first {
969 rx1.recv(); // child sends to us once it locks, or ...
971 lock_rwlock_in_mode(&x, mode1, || {
972 if !make_mode2_go_first {
973 tx2.send(()); // ... we send to it once we lock
980 fn test_rwlock_readers_and_readers() {
981 test_rwlock_handshake(Arc::new(RWLock::new()), Read, Read, false);
982 // The downgrader needs to get in before the reader gets in, otherwise
983 // they cannot end up reading at the same time.
984 test_rwlock_handshake(Arc::new(RWLock::new()), DowngradeRead, Read, false);
985 test_rwlock_handshake(Arc::new(RWLock::new()), Read, DowngradeRead, true);
986 // Two downgrade_reads can never both end up reading at the same time.
989 fn test_rwlock_downgrade_unlock() {
990 // Tests that downgrade can unlock the lock in both modes
991 let x = Arc::new(RWLock::new());
992 lock_rwlock_in_mode(&x, Downgrade, || { });
993 test_rwlock_handshake(x, Read, Read, false);
994 let y = Arc::new(RWLock::new());
995 lock_rwlock_in_mode(&y, DowngradeRead, || { });
996 test_rwlock_exclusion(y, Write, Write);
999 fn test_rwlock_read_recursive() {
1000 let x = RWLock::new();
1005 fn test_rwlock_cond_wait() {
1006 // As test_mutex_cond_wait above.
1007 let x = Arc::new(RWLock::new());
1009 // Child wakes up parent
1011 let lock = x.write();
1013 task::spawn(proc() {
1014 let lock = x2.write();
1015 assert!(lock.cond.signal());
1019 // Parent wakes up child
1020 let (tx, rx) = channel();
1022 task::spawn(proc() {
1023 let lock = x3.write();
1028 rx.recv(); // Wait until child gets in the rwlock
1029 drop(x.read()); // Must be able to get in as a reader
1032 assert!(x.cond.signal());
1034 rx.recv(); // Wait until child wakes up
1035 drop(x.read()); // Just for good measure
1038 fn test_rwlock_cond_broadcast_helper(num_waiters: uint) {
1039 // Much like the mutex broadcast test. Downgrade-enabled.
1040 fn lock_cond(x: &Arc<RWLock>, blk: |c: &Condvar|) {
1041 let lock = x.write();
1045 let x = Arc::new(RWLock::new());
1046 let mut rxs = Vec::new();
1048 for _ in range(0u, num_waiters) {
1050 let (tx, rx) = channel();
1052 task::spawn(proc() {
1053 lock_cond(&xi, |cond| {
1061 // wait until all children get in the mutex
1062 for rx in rxs.iter_mut() { let _ = rx.recv(); }
1063 lock_cond(&x, |cond| {
1064 let num_woken = cond.broadcast();
1065 assert_eq!(num_woken, num_waiters);
1067 // wait until all children wake up
1068 for rx in rxs.iter_mut() { let _ = rx.recv(); }
1071 fn test_rwlock_cond_broadcast() {
1072 test_rwlock_cond_broadcast_helper(0);
1073 test_rwlock_cond_broadcast_helper(12);
1076 fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
1079 // Mutex must get automatically unlocked if panicked/killed within.
1080 let x = Arc::new(RWLock::new());
1083 let result: result::Result<(), Box<Any + Send>> = task::try(proc() {
1084 lock_rwlock_in_mode(&x2, mode1, || {
1088 assert!(result.is_err());
1089 // child task must have finished by the time try returns
1090 lock_rwlock_in_mode(&x, mode2, || { })
1093 fn test_rwlock_reader_killed_writer() {
1094 rwlock_kill_helper(Read, Write);
1097 fn test_rwlock_writer_killed_reader() {
1098 rwlock_kill_helper(Write, Read);
1101 fn test_rwlock_reader_killed_reader() {
1102 rwlock_kill_helper(Read, Read);
1105 fn test_rwlock_writer_killed_writer() {
1106 rwlock_kill_helper(Write, Write);
1109 fn test_rwlock_kill_downgrader() {
1110 rwlock_kill_helper(Downgrade, Read);
1111 rwlock_kill_helper(Read, Downgrade);
1112 rwlock_kill_helper(Downgrade, Write);
1113 rwlock_kill_helper(Write, Downgrade);
1114 rwlock_kill_helper(DowngradeRead, Read);
1115 rwlock_kill_helper(Read, DowngradeRead);
1116 rwlock_kill_helper(DowngradeRead, Write);
1117 rwlock_kill_helper(Write, DowngradeRead);
// NOTE(review): the next two pairs repeat the same argument combinations
// verbatim — looks unintentional; confirm against the canonical file before
// deduplicating (harmless either way, just redundant coverage).
1118 rwlock_kill_helper(DowngradeRead, Downgrade);
1119 rwlock_kill_helper(DowngradeRead, Downgrade);
1120 rwlock_kill_helper(Downgrade, DowngradeRead);
1121 rwlock_kill_helper(Downgrade, DowngradeRead);