1 // Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 #[allow(missing_doc)];
14 * The concurrency primitives you know and love.
16 * Maybe once we have a "core exports x only to std" mechanism, these can be
22 use std::kinds::marker;
23 use std::mem::replace;
24 use std::sync::arc::UnsafeArc;
25 use std::sync::atomics;
26 use std::unstable::finally::Finally;
30 /****************************************************************************
32 ****************************************************************************/
// Each waiting task receives on one of these.
type WaitEnd = Receiver<()>;
// Each enqueued waiter is represented by the sending half; pushing `()`
// through it wakes that waiter.
type SignalEnd = Sender<()>;
// A doubly-ended queue of waiting tasks.
struct WaitQueue { head: Receiver<SignalEnd>,
                   tail: Sender<SignalEnd> }

    // Creates an empty wait queue backed by one channel: enqueueing sends a
    // SignalEnd into `tail`; waking receives it from `head`.
    fn new() -> WaitQueue {
        let (block_tail, block_head) = channel();
        WaitQueue { head: block_head, tail: block_tail }

    // Signals one live task from the queue.
    fn signal(&self) -> bool {
        match self.head.try_recv() {
            // Send a wakeup signal. If the waiter was killed, its port will
            // have closed. Keep trying until we get a live task.

    // Wakes every task currently enqueued; returns how many were woken.
    fn broadcast(&self) -> uint {
        match self.head.try_recv() {

    // Enqueues the caller: makes a fresh wakeup channel, pushes its
    // SignalEnd onto the queue, and returns the WaitEnd to block on.
    fn wait_end(&self) -> WaitEnd {
        let (signal_end, wait_end) = channel();
        assert!(self.tail.try_send(signal_end));
// The building-block used to make semaphores, mutexes, and rwlocks.

// Can be either unit or another waitqueue. Some sems shouldn't come with
// a condition variable attached, others should.
struct Sem<Q>(UnsafeArc<SemInner<Q>>);

impl<Q:Send> Sem<Q> {
    // Creates a semaphore with `count` permits and payload `q` (condvar
    // blocked-queues, or `()` for a plain semaphore).
    fn new(count: int, q: Q) -> Sem<Q> {
        Sem(UnsafeArc::new(SemInner {
            waiters: WaitQueue::new(),
            lock: mutex::Mutex::new(),

    // Runs `f` on the shared inner state while holding the little lock
    // that guards it. Unsafe: derives a &mut from the UnsafeArc pointer.
    unsafe fn with(&self, f: |&mut SemInner<Q>|) {
        let Sem(ref arc) = *self;
        let state = arc.get();
        let _g = (*state).lock.lock();
        f(cast::transmute(state));

    // Takes one permit, blocking (outside the little lock) if none remain.
    pub fn acquire(&self) {
        let mut waiter_nobe = None;
                // Create waiter nobe, enqueue ourself, and tell
                // outer scope we need to block.
                waiter_nobe = Some(state.waiters.wait_end());
        // Uncomment if you wish to test for sem races. Not valgrind-friendly.
        /* for _ in range(0, 1000) { task::deschedule(); } */
        // Need to wait outside the exclusive.
        if waiter_nobe.is_some() {
            let _ = waiter_nobe.unwrap().recv();

    // Returns one permit; if tasks were blocked, wakes one of them.
    pub fn release(&self) {
            if state.count <= 0 {
                state.waiters.signal();

    // Runs `blk` with the semaphore held.
    pub fn access<U>(&self, blk: || -> U) -> U {
impl Sem<~[WaitQueue]> {
    // Builds a semaphore carrying one blocked-queue per condvar, so the
    // associated Condvar can wait_on/signal_on ids 0..num_condvars.
    fn new_and_signal(count: int, num_condvars: uint)
        -> Sem<~[WaitQueue]> {
        let mut queues = ~[];
        for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); }
        Sem::new(count, queues)
// FIXME(#3598): Want to use an Option down below, but we need a custom enum
// that's not polymorphic to get around the fact that lifetimes are invariant
// inside of type parameters.
// (Matched as Just(lock) / Nothing in Condvar::wait_on's reacquire path.)
enum ReacquireOrderLock<'a> {

/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
pub struct Condvar<'a> {
    // The 'Sem' object associated with this condvar. This is the one that's
    // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
    priv sem: &'a Sem<~[WaitQueue]>,
    // This is (can be) an extra semaphore which is held around the reacquire
    // operation on the first one. This is only used in cvars associated with
    // rwlocks, and is needed to ensure that, when a downgrader is trying to
    // hand off the access lock (which would be the first field, here), a 2nd
    // writer waking up from a cvar wait can't race with a reader to steal it,
    // See the comment in write_cond for more detail.
    priv order: ReacquireOrderLock<'a>,
    // Make sure condvars are non-copyable.
    priv nopod: marker::NoPod,
impl<'a> Condvar<'a> {
    /**
     * Atomically drop the associated lock, and block until a signal is sent.
     *
     * A task which is killed (i.e., by linked failure with another task)
     * while waiting on a condition variable will wake up, fail, and unlock
     * the associated lock as it unwinds.
     */
    pub fn wait(&self) { self.wait_on(0) }

    /**
     * As wait(), but can specify which of multiple condition variables to
     * wait on. Only a signal_on() or broadcast_on() with the same condvar_id
     * will wake this thread.
     *
     * The associated lock must have been initialised with an appropriate
     * number of condvars. The condvar_id must be between 0 and num_condvars-1
     * or else this call will fail.
     *
     * wait() is equivalent to wait_on(0).
     */
    pub fn wait_on(&self, condvar_id: uint) {
        let mut wait_end = None;
        let mut out_of_bounds = None;
        // Release lock, 'atomically' enqueuing ourselves in so doing.
        self.sem.with(|state| {
            if condvar_id < state.blocked.len() {
                // Give the semaphore permit back; wake a contender if any
                // were blocked (same shape as Sem::release).
                if state.count <= 0 {
                    state.waiters.signal();
                // Create waiter nobe, and enqueue ourself to
                // be woken up by a signaller.
                wait_end = Some(state.blocked[condvar_id].wait_end());
                // Bad condvar_id: remember the bound for the failure message.
                out_of_bounds = Some(state.blocked.len());
        // If deschedule checks start getting inserted anywhere, we can be
        // killed before or after enqueueing.
        check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()", || {
            // Unconditionally "block". (Might not actually block if a
            // signaller already sent -- I mean 'unconditionally' in contrast
            let _ = wait_end.take_unwrap().recv();
            // Reacquire the condvar. Rwlock condvars carry the order lock
            // here (Just) so reacquisition can't race a downgrade handoff;
            // see RWLock::write_cond.
                Just(lock) => lock.access(|| self.sem.acquire()),
                Nothing => self.sem.acquire(),

    /// Wake up a blocked task. Returns false if there was no blocked task.
    pub fn signal(&self) -> bool { self.signal_on(0) }

    /// As signal, but with a specified condvar_id. See wait_on.
    pub fn signal_on(&self, condvar_id: uint) -> bool {
        let mut out_of_bounds = None;
        let mut result = false;
        self.sem.with(|state| {
            if condvar_id < state.blocked.len() {
                result = state.blocked[condvar_id].signal();
                out_of_bounds = Some(state.blocked.len());
        check_cvar_bounds(out_of_bounds,

    /// Wake up all blocked tasks. Returns the number of tasks woken.
    pub fn broadcast(&self) -> uint { self.broadcast_on(0) }

    /// As broadcast, but with a specified condvar_id. See wait_on.
    pub fn broadcast_on(&self, condvar_id: uint) -> uint {
        let mut out_of_bounds = None;
        let mut queue = None;
        self.sem.with(|state| {
            if condvar_id < state.blocked.len() {
                // To avoid :broadcast_heavy, we make a new waitqueue,
                // swap it out with the old one, and broadcast on the
                // old one outside of the little-lock.
                queue = Some(replace(&mut state.blocked[condvar_id],
                out_of_bounds = Some(state.blocked.len());
        check_cvar_bounds(out_of_bounds,
            // Drain the detached queue without holding the little lock.
            queue.take_unwrap().broadcast()
// Checks whether a condvar ID was out of bounds, and fails if so, or does
// something else next on success.
fn check_cvar_bounds<U>(
    out_of_bounds: Option<uint>,
    match out_of_bounds {
        // A lock built with zero condvars can't service any condvar call.
        fail!("{} with illegal ID {} - this lock has no condvars!", act, id),
        // Otherwise the id must index the lock's blocked-queue list.
        fail!("{} with illegal ID {} - ID must be less than {}", act, id, length),
impl Sem<~[WaitQueue]> {
    // The only other places that condvars get built are rwlock.write_cond()
    // and rwlock_write_mode.
    // Runs `blk` with the semaphore held, handing it a Condvar tied to
    // this semaphore's blocked-queues.
    pub fn access_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
343 /****************************************************************************
345 ****************************************************************************/
/// A counting, blocking, bounded-waiting semaphore.
pub struct Semaphore { priv sem: Sem<()> }

impl Clone for Semaphore {
    /// Create a new handle to the semaphore.
    fn clone(&self) -> Semaphore {
        // Clone the underlying arc so both handles share the same state.
        let Sem(ref lock) = self.sem;
        Semaphore { sem: Sem(lock.clone()) }

    /// Create a new semaphore with the specified count.
    pub fn new(count: int) -> Semaphore {
        Semaphore { sem: Sem::new(count, ()) }

    /**
     * Acquire a resource represented by the semaphore. Blocks if necessary
     * until resource(s) become available.
     */
    pub fn acquire(&self) { (&self.sem).acquire() }

    /**
     * Release a held resource represented by the semaphore. Wakes a blocked
     * contending task, if any exist. Won't block the caller.
     */
    pub fn release(&self) { (&self.sem).release() }

    /// Run a function with ownership of one of the semaphore's resources.
    pub fn access<U>(&self, blk: || -> U) -> U { (&self.sem).access(blk) }
381 /****************************************************************************
383 ****************************************************************************/
/**
 * A blocking, bounded-waiting, mutual exclusion lock with an associated
 * FIFO condition variable.
 *
 * A task which fails while holding a mutex will unlock the mutex as it
 * unwinds.
 */
pub struct Mutex { priv sem: Sem<~[WaitQueue]> }

impl Clone for Mutex {
    /// Create a new handle to the mutex.
    fn clone(&self) -> Mutex {
        // Share the underlying Sem (and its condvar queues) via arc clone.
        let Sem(ref queue) = self.sem;
        Mutex { sem: Sem(queue.clone()) } }

    /// Create a new mutex, with one associated condvar.
    pub fn new() -> Mutex { Mutex::new_with_condvars(1) }

    /**
     * Create a new mutex, with a specified number of associated condvars. This
     * will allow calling wait_on/signal_on/broadcast_on with condvar IDs between
     * 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but
     * any operations on the condvar will fail.)
     */
    pub fn new_with_condvars(num_condvars: uint) -> Mutex {
        // A mutex is just a binary semaphore with condvar queues attached.
        Mutex { sem: Sem::new_and_signal(1, num_condvars) }

    /// Run a function with ownership of the mutex.
    pub fn lock<U>(&self, blk: || -> U) -> U {
        (&self.sem).access(blk)

    /// Run a function with ownership of the mutex and a handle to a condvar.
    pub fn lock_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
        (&self.sem).access_cond(blk)
428 /****************************************************************************
429 * Reader-writer locks
430 ****************************************************************************/
// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem

// You might ask, "Why don't you need to use an atomic for the mode flag?"
// This flag affects the behaviour of readers (for plain readers, they
// assert on it; for downgraders, they use it to decide which mode to
// unlock for). Consider that the flag is only unset when the very last
// reader exits; therefore, it can never be unset during a reader/reader
// (or reader/downgrader) race.
// By the way, if we didn't care about the assert in the read unlock path,
// we could instead store the mode flag in write_downgrade's stack frame,
// and have the downgrade tokens store a reference to it.

// The only way the count flag is ever accessed is with xadd. Since it is
// a read-modify-write operation, multiple xadds on different cores will
// always be consistent with respect to each other, so a monotonic/relaxed
// consistency ordering suffices (i.e., no extra barriers are needed).
// FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
// acquire/release orderings superfluously. Change these someday.
// Number of readers currently inside the lock (see xadd note above).
read_count: atomics::AtomicUint,

/**
 * A blocking, no-starvation, reader-writer lock with an associated condvar.
 *
 * A task which fails while holding an rwlock will unlock the rwlock as it
 * unwinds.
 */
// Taken first by everyone entering, to guarantee FIFO fairness.
priv order_lock: Semaphore,
// Held by the writer, or collectively by the "reader cloud".
priv access_lock: Sem<~[WaitQueue]>,
priv state: UnsafeArc<RWLockInner>,
    /// Create a new rwlock, with one associated condvar.
    pub fn new() -> RWLock { RWLock::new_with_condvars(1) }

    /**
     * Create a new rwlock, with a specified number of associated condvars.
     * Similar to mutex_with_condvars.
     */
    pub fn new_with_condvars(num_condvars: uint) -> RWLock {
        let state = UnsafeArc::new(RWLockInner {
            read_count: atomics::AtomicUint::new(0),
        RWLock { order_lock: Semaphore::new(1),
                 access_lock: Sem::new_and_signal(1, num_condvars),

    /// Create a new handle to the rwlock.
    pub fn clone(&self) -> RWLock {
        // All three pieces of shared state are arc-cloned into the handle.
        let Sem(ref access_lock_queue) = self.access_lock;
        RWLock { order_lock: (&(self.order_lock)).clone(),
                 access_lock: Sem(access_lock_queue.clone()),
                 state: self.state.clone() }
    /**
     * Run a function with the rwlock in read mode. Calls to 'read' from other
     * tasks may run concurrently with this one.
     */
    pub fn read<U>(&self, blk: || -> U) -> U {
        // order_lock is held only around entry, preserving FIFO ordering
        // between readers and writers.
        (&self.order_lock).access(|| {
            let state = &mut *self.state.get();
            let old_count = state.read_count.fetch_add(1, atomics::Acquire);
                (&self.access_lock).acquire();
                state.read_mode = true;
        // Exit path: per the Case 1/2 comments in write_downgrade, the last
        // reader out is the one responsible for releasing the access lock.
            let state = &mut *self.state.get();
            assert!(state.read_mode);
            let old_count = state.read_count.fetch_sub(1, atomics::Release);
            assert!(old_count > 0);
                state.read_mode = false;
                // Note: this release used to be outside of a locked access
                // to exclusive-protected state. If this code is ever
                // converted back to such (instead of using atomic ops),
                // this access MUST NOT go inside the exclusive access.
                (&self.access_lock).release();
    /**
     * Run a function with the rwlock in write mode. No calls to 'read' or
     * 'write' from other tasks will run concurrently with this one.
     */
    pub fn write<U>(&self, blk: || -> U) -> U {
        // Take order_lock first (FIFO fairness), drop it once we hold the
        // access lock so later entrants can queue up behind us.
        (&self.order_lock).acquire();
        (&self.access_lock).access(|| {
            (&self.order_lock).release();

    /**
     * As write(), but also with a handle to a condvar. Waiting on this
     * condvar will allow readers and writers alike to take the rwlock before
     * the waiting task is signalled. (Note: a writer that waited and then
     * was signalled might reacquire the lock before other waiting writers.)
     */
    pub fn write_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
        // It's important to thread our order lock into the condvar, so that
        // when a cond.wait() wakes up, it uses it while reacquiring the
        // access lock. If we permitted a waking-up writer to "cut in line",
        // there could arise a subtle race when a downgrader attempts to hand
        // off the reader cloud lock to a waiting reader. This race is tested
        // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
        // T1 (writer)              T2 (downgrader)             T3 (reader)
        //                          [locks for writing]
        //                          [holds access_lock]
        // [is signalled, perhaps by
        //  downgrader or a 4th thread]
        // tries to lock access(!)
        //                                                      xadd read_count[0->1]
        //                                                      tries to lock access
        //                          xadd read_count[1->2]
        // Since T1 contended on the access lock before T3 did, it will steal
        // the lock handoff. Adding order_lock in the condvar reacquire path
        // solves this because T1 will hold order_lock while waiting on access,
        // which will cause T3 to have to wait until T1 finishes its write,
        // which can't happen until T2 finishes the downgrade-read entirely.
        // The astute reader will also note that making waking writers use the
        // order_lock is better for not starving readers.
        (&self.order_lock).acquire();
        (&self.access_lock).access_cond(|cond| {
            (&self.order_lock).release();
            // Hand the caller a condvar whose reacquire path goes through
            // our order_lock (Just), per the comment above.
            let opt_lock = Just(&self.order_lock);
            blk(&Condvar { sem: cond.sem, order: opt_lock,
                           nopod: marker::NoPod })
    /**
     * As write(), but with the ability to atomically 'downgrade' the lock;
     * i.e., to become a reader without letting other writers get the lock in
     * the meantime (such as unlocking and then re-locking as a reader would
     * do). The block takes a "write mode token" argument, which can be
     * transformed into a "read mode token" by calling downgrade(). Example:
     *
     * let lock = RWLock::new();
     * lock.write_downgrade(|mut write_token| {
     *     write_token.write_cond(|condvar| {
     *         // ... exclusive access ...
     *     let read_token = lock.downgrade(write_token);
     *     read_token.read(|| {
     *         // ... shared access ...
     */
    pub fn write_downgrade<U>(&self, blk: |v: RWLockWriteMode| -> U) -> U {
        // Implementation slightly different from the slicker 'write's above.
        // The exit path is conditional on whether the caller downgrades.
        (&self.order_lock).acquire();
        (&self.access_lock).acquire();
        (&self.order_lock).release();
            blk(RWLockWriteMode { lock: self, nopod: marker::NoPod })
            // Unlock path: runs whether blk finished normally or failed.
            let writer_or_last_reader;
            // Check if we're releasing from read mode or from write mode.
            let state = unsafe { &mut *self.state.get() };
                // Releasing from read mode.
                let old_count = state.read_count.fetch_sub(1, atomics::Release);
                assert!(old_count > 0);
                // Check if other readers remain.
                    // Case 1: Writer downgraded & was the last reader
                    writer_or_last_reader = true;
                    state.read_mode = false;
                    // Case 2: Writer downgraded & was not the last reader
                    writer_or_last_reader = false;
                // Case 3: Writer did not downgrade
                writer_or_last_reader = true;
            if writer_or_last_reader {
                // Nobody left inside; release the "reader cloud" lock.
                (&self.access_lock).release();
    /// To be called inside of the write_downgrade block.
    /// Consumes the write token (which must belong to this same rwlock)
    /// and atomically converts the holder into a reader.
    pub fn downgrade<'a>(&self, token: RWLockWriteMode<'a>)
                         -> RWLockReadMode<'a> {
        // Pointer-identity check: a token from a different rwlock would
        // corrupt both locks' state.
        if !((self as *RWLock) == (token.lock as *RWLock)) {
            fail!("Can't downgrade() with a different rwlock's write_mode!");
            let state = &mut *self.state.get();
            assert!(!state.read_mode);
            state.read_mode = true;
            // If a reader attempts to enter at this point, both the
            // downgrader and reader will set the mode flag. This is fine.
            let old_count = state.read_count.fetch_add(1, atomics::Release);
            // If another reader was already blocking, we need to hand-off
            // the "reader cloud" access lock to them.
                // Guaranteed not to let another writer in, because
                // another reader was holding the order_lock. Hence they
                // must be the one to get the access_lock (because all
                // access_locks are acquired with order_lock held). See
                // the comment in write_cond for more justification.
                (&self.access_lock).release();
        RWLockReadMode { lock: token.lock, nopod: marker::NoPod }
/// The "write permission" token used for rwlock.write_downgrade().
pub struct RWLockWriteMode<'a> { priv lock: &'a RWLock, priv nopod: marker::NoPod }
/// The "read permission" token used for rwlock.write_downgrade().
pub struct RWLockReadMode<'a> { priv lock: &'a RWLock,
                                priv nopod: marker::NoPod }

impl<'a> RWLockWriteMode<'a> {
    /// Access the pre-downgrade rwlock in write mode.
    // Holding the token already proves exclusive access, so `blk` runs
    // directly with no further locking.
    pub fn write<U>(&self, blk: || -> U) -> U { blk() }
    /// Access the pre-downgrade rwlock in write mode with a condvar.
    pub fn write_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
        // Need to make the condvar use the order lock when reacquiring the
        // access lock. See comment in RWLock::write_cond for why.
        blk(&Condvar { sem: &self.lock.access_lock,
                       order: Just(&self.lock.order_lock),
                       nopod: marker::NoPod })

impl<'a> RWLockReadMode<'a> {
    /// Access the post-downgrade rwlock in read mode.
    pub fn read<U>(&self, blk: || -> U) -> U { blk() }
/// A barrier enables multiple tasks to synchronize the beginning
/// of some computation.
///
/// use sync::Barrier;
///
/// let barrier = Barrier::new(10);
/// for _ in range(0, 10) {
///     let c = barrier.clone();
///     // The same messages will be printed together.
///     // You will NOT see any interleaving.
///         println!("before wait");
///         println!("after wait");
// Shared state behind a mutex-with-condvar; cloning a Barrier clones this arc.
priv arc: MutexArc<BarrierState>,
// Number of tasks that must arrive before any may proceed.
priv num_tasks: uint,

// The inner state of a double barrier
struct BarrierState {

    /// Create a new barrier that can block a given number of tasks.
    pub fn new(num_tasks: uint) -> Barrier {
        arc: MutexArc::new(BarrierState {
        num_tasks: num_tasks,

    /// Block the current task until a certain number of tasks is waiting.
        self.arc.access_cond(|state, cond| {
            // generation_id distinguishes successive uses of the barrier, so
            // a waiter from a previous round can't be confused with this one.
            let local_gen = state.generation_id;
            if state.count < self.num_tasks {
                // We need a while loop to guard against spurious wakeups.
                // http://en.wikipedia.org/wiki/Spurious_wakeup
                while local_gen == state.generation_id && state.count < self.num_tasks {
                // Last task to arrive: start a new generation, releasing all waiters.
                state.generation_id += 1;
756 /****************************************************************************
758 ****************************************************************************/
    use sync::{Semaphore, Mutex, RWLock, Barrier, Condvar};
    use std::comm::Empty;

    /************************************************************************
     * Semaphore tests
     ************************************************************************/
    fn test_sem_acquire_release() {
        let s = Semaphore::new(1);
    fn test_sem_basic() {
        let s = Semaphore::new(1);
    fn test_sem_as_mutex() {
        // Child and parent each take the binary sem around their critical work.
        let s = Semaphore::new(1);
            for _ in range(0, 5) { task::deschedule(); }
        for _ in range(0, 5) { task::deschedule(); }
    fn test_sem_as_cvar() {
        /* Child waits and parent signals */
        let (tx, rx) = channel();
        let s = Semaphore::new(0);
            for _ in range(0, 5) { task::deschedule(); }
        /* Parent waits and child signals */
        let (tx, rx) = channel();
        let s = Semaphore::new(0);
            for _ in range(0, 5) { task::deschedule(); }
    fn test_sem_multi_resource() {
        // Parent and child both get in the critical section at the same
        // time, and shake hands.
        let s = Semaphore::new(2);
        let (tx1, rx1) = channel();
        let (tx2, rx2) = channel();
    fn test_sem_runtime_friendly_blocking() {
        // Force the runtime to schedule two threads on the same sched_loop.
        // When one blocks, it should schedule the other one.
        let s = Semaphore::new(1);
        let (tx, rx) = channel();
        // Option dance: move the sem handle and sender into the child task.
        let mut child_data = Some((s2, tx));
            let (s2, tx) = child_data.take_unwrap();
        let _ = rx.recv(); // wait for child to come alive
        for _ in range(0, 5) { task::deschedule(); } // let the child contend
        let _ = rx.recv(); // wait for child to be done
    /************************************************************************
     * Mutex tests
     ************************************************************************/
    fn test_mutex_lock() {
        // Unsafely achieve shared state, and do the textbook
        // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
        let (tx, rx) = channel();
        let m = Mutex::new();
        let mut sharedstate = ~0;
        let ptr: *int = &*sharedstate;
            let sharedstate: &mut int =
                unsafe { cast::transmute(ptr) };
            access_shared(sharedstate, &m2, 10);
        access_shared(sharedstate, &m, 10);
        // 10 increments from each task; the mutex makes them all observed.
        assert_eq!(*sharedstate, 20);

        fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) {
            for _ in range(0, n) {
                let oldval = *sharedstate;
                *sharedstate = oldval + 1;
    fn test_mutex_cond_wait() {
        let m = Mutex::new();
        // Child wakes up parent
            m2.lock_cond(|cond| {
                let woken = cond.signal();
        // Parent wakes up child
        let (tx, rx) = channel();
            m3.lock_cond(|cond| {
        let _ = rx.recv(); // Wait until child gets in the mutex
            let woken = cond.signal();
        let _ = rx.recv(); // Wait until child wakes up
    fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
        let m = Mutex::new();
        for _ in range(0, num_waiters) {
            let (tx, rx) = channel();
                mi.lock_cond(|cond| {
        // wait until all children get in the mutex
        for rx in rxs.mut_iter() { let _ = rx.recv(); }
            let num_woken = cond.broadcast();
            assert_eq!(num_woken, num_waiters);
        // wait until all children wake up
        for rx in rxs.mut_iter() { let _ = rx.recv(); }
    fn test_mutex_cond_broadcast() {
        test_mutex_cond_broadcast_helper(12);
    fn test_mutex_cond_broadcast_none() {
        test_mutex_cond_broadcast_helper(0);
    fn test_mutex_cond_no_waiter() {
        let m = Mutex::new();
        let _ = task::try(proc() {
            m.lock_cond(|_x| { })
        m2.lock_cond(|cond| {
            // No one is waiting, so signal must report false.
            assert!(!cond.signal());
    fn test_mutex_killed_simple() {
        // Mutex must get automatically unlocked if failed/killed within.
        let m = Mutex::new();
        let result: result::Result<(), ~Any> = task::try(proc() {
        assert!(result.is_err());
        // child task must have finished by the time try returns
    fn test_mutex_cond_signal_on_0() {
        // Tests that signal_on(0) is equivalent to signal().
        let m = Mutex::new();
        task::spawn(proc() {
            m2.lock_cond(|cond| {
    fn test_mutex_no_condvars() {
        // With zero condvars, every condvar operation must fail the task.
        let result = task::try(proc() {
            let m = Mutex::new_with_condvars(0);
            m.lock_cond(|cond| { cond.wait(); })
        assert!(result.is_err());
        let result = task::try(proc() {
            let m = Mutex::new_with_condvars(0);
            m.lock_cond(|cond| { cond.signal(); })
        assert!(result.is_err());
        let result = task::try(proc() {
            let m = Mutex::new_with_condvars(0);
            m.lock_cond(|cond| { cond.broadcast(); })
        assert!(result.is_err());
    /************************************************************************
     * Reader/writer lock tests
     ************************************************************************/
    pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }

    // Locks `x` in the requested mode for the duration of `blk`.
    fn lock_rwlock_in_mode(x: &RWLock, mode: RWLockMode, blk: ||) {
            Read => x.read(blk),
            Write => x.write(blk),
                x.write_downgrade(|mode| {
                    mode.write(|| { blk() });
                x.write_downgrade(|mode| {
                    let mode = x.downgrade(mode);
                    mode.read(|| { blk() });
    fn test_rwlock_exclusion(x: &RWLock,
                             mode2: RWLockMode) {
        // Test mutual exclusion between readers and writers. Just like the
        // mutex mutual exclusion test, a ways above.
        let (tx, rx) = channel();
        let mut sharedstate = ~0;
        let ptr: *int = &*sharedstate;
        task::spawn(proc() {
            let sharedstate: &mut int =
                unsafe { cast::transmute(ptr) };
            access_shared(sharedstate, &x2, mode1, 10);
        access_shared(sharedstate, x, mode2, 10);
        assert_eq!(*sharedstate, 20);

        fn access_shared(sharedstate: &mut int, x: &RWLock, mode: RWLockMode,
            for _ in range(0, n) {
                lock_rwlock_in_mode(x, mode, || {
                    let oldval = *sharedstate;
                    *sharedstate = oldval + 1;
    fn test_rwlock_readers_wont_modify_the_data() {
        test_rwlock_exclusion(&RWLock::new(), Read, Write);
        test_rwlock_exclusion(&RWLock::new(), Write, Read);
        test_rwlock_exclusion(&RWLock::new(), Read, Downgrade);
        test_rwlock_exclusion(&RWLock::new(), Downgrade, Read);
    fn test_rwlock_writers_and_writers() {
        test_rwlock_exclusion(&RWLock::new(), Write, Write);
        test_rwlock_exclusion(&RWLock::new(), Write, Downgrade);
        test_rwlock_exclusion(&RWLock::new(), Downgrade, Write);
        test_rwlock_exclusion(&RWLock::new(), Downgrade, Downgrade);
    fn test_rwlock_handshake(x: &RWLock,
                             make_mode2_go_first: bool) {
        // Much like sem_multi_resource.
        let (tx1, rx1) = channel();
        let (tx2, rx2) = channel();
        task::spawn(proc() {
            if !make_mode2_go_first {
                let _ = rx2.recv(); // parent sends to us once it locks, or ...
            lock_rwlock_in_mode(&x2, mode2, || {
                if make_mode2_go_first {
                    tx1.send(()); // ... we send to it once we lock
        if make_mode2_go_first {
            let _ = rx1.recv(); // child sends to us once it locks, or ...
        lock_rwlock_in_mode(x, mode1, || {
            if !make_mode2_go_first {
                tx2.send(()); // ... we send to it once we lock
    fn test_rwlock_readers_and_readers() {
        test_rwlock_handshake(&RWLock::new(), Read, Read, false);
        // The downgrader needs to get in before the reader gets in, otherwise
        // they cannot end up reading at the same time.
        test_rwlock_handshake(&RWLock::new(), DowngradeRead, Read, false);
        test_rwlock_handshake(&RWLock::new(), Read, DowngradeRead, true);
        // Two downgrade_reads can never both end up reading at the same time.
    fn test_rwlock_downgrade_unlock() {
        // Tests that downgrade can unlock the lock in both modes
        let x = RWLock::new();
        lock_rwlock_in_mode(&x, Downgrade, || { });
        test_rwlock_handshake(&x, Read, Read, false);
        let y = RWLock::new();
        lock_rwlock_in_mode(&y, DowngradeRead, || { });
        test_rwlock_exclusion(&y, Write, Write);
    fn test_rwlock_read_recursive() {
        // Nested reads on the same lock must not deadlock.
        let x = RWLock::new();
        x.read(|| { x.read(|| { }) })
    fn test_rwlock_cond_wait() {
        // As test_mutex_cond_wait above.
        let x = RWLock::new();
        // Child wakes up parent
        x.write_cond(|cond| {
            task::spawn(proc() {
                x2.write_cond(|cond| {
                    let woken = cond.signal();
        // Parent wakes up child
        let (tx, rx) = channel();
        task::spawn(proc() {
            x3.write_cond(|cond| {
        let _ = rx.recv(); // Wait until child gets in the rwlock
        x.read(|| { }); // Must be able to get in as a reader in the meantime
        x.write_cond(|cond| { // Or as another writer
            let woken = cond.signal();
        let _ = rx.recv(); // Wait until child wakes up
        x.read(|| { }); // Just for good measure
    fn test_rwlock_cond_broadcast_helper(num_waiters: uint,
        // Much like the mutex broadcast test. Downgrade-enabled.
        fn lock_cond(x: &RWLock, downgrade: bool, blk: |c: &Condvar|) {
                x.write_downgrade(|mode| {
                    mode.write_cond(|c| { blk(c) });
                x.write_cond(|c| { blk(c) });
        let x = RWLock::new();
        for _ in range(0, num_waiters) {
            let (tx, rx) = channel();
            task::spawn(proc() {
                lock_cond(&xi, dg1, |cond| {
        // wait until all children get in the mutex
        for rx in rxs.mut_iter() { let _ = rx.recv(); }
        lock_cond(&x, dg2, |cond| {
            let num_woken = cond.broadcast();
            assert_eq!(num_woken, num_waiters);
        // wait until all children wake up
        for rx in rxs.mut_iter() { let _ = rx.recv(); }
    fn test_rwlock_cond_broadcast() {
        // All downgrade/non-downgrade pairings, with and without waiters.
        test_rwlock_cond_broadcast_helper(0, true, true);
        test_rwlock_cond_broadcast_helper(0, true, false);
        test_rwlock_cond_broadcast_helper(0, false, true);
        test_rwlock_cond_broadcast_helper(0, false, false);
        test_rwlock_cond_broadcast_helper(12, true, true);
        test_rwlock_cond_broadcast_helper(12, true, false);
        test_rwlock_cond_broadcast_helper(12, false, true);
        test_rwlock_cond_broadcast_helper(12, false, false);
    fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
        // Mutex must get automatically unlocked if failed/killed within.
        let x = RWLock::new();
        let result: result::Result<(), ~Any> = task::try(proc() {
            lock_rwlock_in_mode(&x2, mode1, || {
        assert!(result.is_err());
        // child task must have finished by the time try returns
        lock_rwlock_in_mode(&x, mode2, || { })
    fn test_rwlock_reader_killed_writer() {
        rwlock_kill_helper(Read, Write);
    fn test_rwlock_writer_killed_reader() {
        rwlock_kill_helper(Write, Read);
    fn test_rwlock_reader_killed_reader() {
        rwlock_kill_helper(Read, Read);
    fn test_rwlock_writer_killed_writer() {
        rwlock_kill_helper(Write, Write);
    fn test_rwlock_kill_downgrader() {
        rwlock_kill_helper(Downgrade, Read);
        rwlock_kill_helper(Read, Downgrade);
        rwlock_kill_helper(Downgrade, Write);
        rwlock_kill_helper(Write, Downgrade);
        rwlock_kill_helper(DowngradeRead, Read);
        rwlock_kill_helper(Read, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Write);
        rwlock_kill_helper(Write, DowngradeRead);
        // NOTE(review): each of the two pairs below appears twice — looks
        // like deliberate repetition for stress, but worth confirming.
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(Downgrade, DowngradeRead);
        rwlock_kill_helper(Downgrade, DowngradeRead);
    #[test] #[should_fail]
    fn test_rwlock_downgrade_cant_swap() {
        // Tests that you can't downgrade with a different rwlock's token.
        let x = RWLock::new();
        let y = RWLock::new();
        x.write_downgrade(|xwrite| {
            let mut xopt = Some(xwrite);
            y.write_downgrade(|_ywrite| {
                y.downgrade(xopt.take_unwrap());
                error!("oops, y.downgrade(x) should have failed!");
    /************************************************************************
     * Barrier tests
     ************************************************************************/
        let barrier = Barrier::new(10);
        let (tx, rx) = channel();
        // Spawn 9 tasks; together with this task they make the 10th arrival.
        for _ in range(0, 9) {
            let c = barrier.clone();
            let tx = tx.clone();
        // At this point, all spawned tasks should be blocked,
        // so we shouldn't get anything from the port
        assert!(match rx.try_recv() {
        // Now, the barrier is cleared and we should get data.
        for _ in range(0, 9) {