// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
/*!
 * The concurrency primitives you know and love.
 *
 * Maybe once we have a "core exports x only to std" mechanism, these can be
 * in std.
 */
use std::borrow;
use std::comm;
use std::comm::SendDeferred;
use std::task;
use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox};
use std::unstable::atomics;
use std::unstable::finally::Finally;
use std::util;
use std::util::NonCopyable;
/****************************************************************************
 * Internals
 ****************************************************************************/
// Each waiting task receives on one of these.
type WaitEnd = comm::PortOne<()>;
type SignalEnd = comm::ChanOne<()>;
// A doubly-ended queue of waiting tasks.
struct WaitQueue { head: comm::Port<SignalEnd>,
                   tail: comm::Chan<SignalEnd> }
impl WaitQueue {
    fn new() -> WaitQueue {
        let (block_head, block_tail) = comm::stream();
        WaitQueue { head: block_head, tail: block_tail }
    }

    // Signals one live task from the queue.
    fn signal(&self) -> bool {
        // The peek is mandatory to make sure recv doesn't block.
        if self.head.peek() {
            // Pop and send a wakeup signal. If the waiter was killed, its
            // port will have closed. Keep trying until we get a live task.
            if self.head.recv().try_send_deferred(()) {
                true
            } else {
                self.signal()
            }
        } else {
            false
        }
    }

    // Signals all waiting tasks; returns how many were actually woken.
    fn broadcast(&self) -> uint {
        let mut count = 0;
        while self.head.peek() {
            if self.head.recv().try_send_deferred(()) {
                count += 1;
            }
        }
        count
    }
}
// The building-block used to make semaphores, mutexes, and rwlocks.
struct SemInner<Q> {
    count: int,
    waiters: WaitQueue,
    // Can be either unit or another waitqueue. Some sems shouldn't come with
    // a condition variable attached, others should.
    blocked: Q
}

struct Sem<Q>(Exclusive<SemInner<Q>>);

impl<Q: Send> Sem<Q> {
    fn new(count: int, q: Q) -> Sem<Q> {
        Sem(Exclusive::new(SemInner {
            count: count, waiters: WaitQueue::new(), blocked: q }))
    }
    pub fn acquire(&self) {
        unsafe {
            let mut waiter_nobe = None;
            do (**self).with |state| {
                state.count -= 1;
                if state.count < 0 {
                    // Create waiter nobe.
                    let (wait_end, signal_end) = comm::oneshot();
                    // Tell outer scope we need to block.
                    waiter_nobe = Some(wait_end);
                    // Enqueue ourself to be woken up by a signaller.
                    state.waiters.tail.send_deferred(signal_end);
                }
            }
            // Uncomment if you wish to test for sem races. Not valgrind-friendly.
            /* do 1000.times { task::yield(); } */
            // Need to wait outside the exclusive.
            if waiter_nobe.is_some() {
                let _ = comm::recv_one(waiter_nobe.unwrap());
            }
        }
    }
    pub fn release(&self) {
        unsafe {
            do (**self).with |state| {
                state.count += 1;
                if state.count <= 0 {
                    state.waiters.signal();
                }
            }
        }
    }
    pub fn access<U>(&self, blk: &fn() -> U) -> U {
        do task::unkillable {
            do (|| {
                self.acquire();
                do task::rekillable { blk() }
            }).finally {
                self.release();
            }
        }
    }
}
impl Sem<~[WaitQueue]> {
    fn new_and_signal(count: int, num_condvars: uint)
                      -> Sem<~[WaitQueue]> {
        let mut queues = ~[];
        do num_condvars.times {
            queues.push(WaitQueue::new());
        }
        Sem::new(count, queues)
    }
}
// FIXME(#3598): Want to use an Option down below, but we need a custom enum
// that's not polymorphic to get around the fact that lifetimes are invariant
// inside of type parameters.
enum ReacquireOrderLock<'self> {
    Nothing,
    Just(&'self Semaphore),
}
/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
pub struct Condvar<'self> {
    // The 'Sem' object associated with this condvar. This is the one that's
    // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
    priv sem: &'self Sem<~[WaitQueue]>,
    // This is (can be) an extra semaphore which is held around the reacquire
    // operation on the first one. This is only used in cvars associated with
    // rwlocks, and is needed to ensure that, when a downgrader is trying to
    // hand off the access lock (which would be the first field, here), a 2nd
    // writer waking up from a cvar wait can't race with a reader to steal it.
    // See the comment in write_cond for more detail.
    priv order: ReacquireOrderLock<'self>,
    // Make sure condvars are non-copyable.
    priv token: util::NonCopyable,
}
impl<'self> Condvar<'self> {
    /**
     * Atomically drop the associated lock, and block until a signal is sent.
     *
     * # Failure
     * A task which is killed (i.e., by linked failure with another task)
     * while waiting on a condition variable will wake up, fail, and unlock
     * the associated lock as it unwinds.
     */
    pub fn wait(&self) { self.wait_on(0) }
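
    // Illustrative usage sketch (not from the original file): one task waits
    // while a second, holding a fresh handle, signals. The names `m2` and
    // `woken` are hypothetical locals, not anything this module exports.
    //
    //     let m = ~Mutex::new();
    //     do m.lock_cond |cond| {
    //         let m2 = ~m.clone();
    //         do task::spawn {
    //             do m2.lock_cond |cond| {
    //                 let woken = cond.signal(); // true if a waiter was woken
    //                 assert!(woken);
    //             }
    //         }
    //         cond.wait(); // unlocks the mutex, blocks, relocks on wakeup
    //     }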
    /**
     * As wait(), but can specify which of multiple condition variables to
     * wait on. Only a signal_on() or broadcast_on() with the same condvar_id
     * will wake this thread.
     *
     * The associated lock must have been initialised with an appropriate
     * number of condvars. The condvar_id must be between 0 and num_condvars-1
     * or else this call will fail.
     *
     * wait() is equivalent to wait_on(0).
     */
    pub fn wait_on(&self, condvar_id: uint) {
        // Create waiter nobe.
        let (wait_end, signal_end) = comm::oneshot();
        let mut wait_end = Some(wait_end);
        let mut signal_end = Some(signal_end);
        let mut out_of_bounds = None;
        do task::unkillable {
            // Release lock, 'atomically' enqueuing ourselves in so doing.
            unsafe {
                do (**self.sem).with |state| {
                    if condvar_id < state.blocked.len() {
                        // Drop the lock.
                        state.count += 1;
                        if state.count <= 0 {
                            state.waiters.signal();
                        }
                        // Enqueue ourself to be woken up by a signaller.
                        let signal_end = signal_end.take_unwrap();
                        state.blocked[condvar_id].tail.send_deferred(signal_end);
                    } else {
                        out_of_bounds = Some(state.blocked.len());
                    }
                }
            }

            // If yield checks start getting inserted anywhere, we can be
            // killed before or after enqueueing. Deciding whether to
            // unkillably reacquire the lock needs to happen atomically
            // wrt enqueuing.
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
                // Unconditionally "block". (Might not actually block if a
                // signaller already sent -- I mean 'unconditionally' in
                // contrast with acquire().)
                do task::rekillable {
                    let _ = comm::recv_one(wait_end.take_unwrap());
                }

                // Reacquire the condvar. Note this is back in the unkillable
                // section; it needs to succeed, instead of itself dying.
                match self.order {
                    Just(lock) => do lock.access {
                        self.sem.acquire();
                    },
                    Nothing => self.sem.acquire()
                }
            }
        }
    }
    /// Wake up a blocked task. Returns false if there was no blocked task.
    pub fn signal(&self) -> bool { self.signal_on(0) }

    /// As signal, but with a specified condvar_id. See wait_on.
    pub fn signal_on(&self, condvar_id: uint) -> bool {
        unsafe {
            let mut out_of_bounds = None;
            let mut result = false;
            do (**self.sem).with |state| {
                if condvar_id < state.blocked.len() {
                    result = state.blocked[condvar_id].signal();
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            }
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
                result
            }
        }
    }
    /// Wake up all blocked tasks. Returns the number of tasks woken.
    pub fn broadcast(&self) -> uint { self.broadcast_on(0) }

    /// As broadcast, but with a specified condvar_id. See wait_on.
    pub fn broadcast_on(&self, condvar_id: uint) -> uint {
        let mut out_of_bounds = None;
        let mut queue = None;
        unsafe {
            do (**self.sem).with |state| {
                if condvar_id < state.blocked.len() {
                    // To avoid :broadcast_heavy, we make a new waitqueue,
                    // swap it out with the old one, and broadcast on the
                    // old one outside of the little-lock.
                    queue = Some(util::replace(&mut state.blocked[condvar_id],
                                               WaitQueue::new()));
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            }
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.broadcast_on()") {
                let queue = queue.take_unwrap();
                queue.broadcast()
            }
        }
    }
}
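
// Illustrative broadcast sketch (not from the original file): with several
// tasks blocked in cond.wait() on the same lock, one broadcast wakes them
// all. `num_waiters` and `num_woken` are hypothetical names; the tests far
// below exercise exactly this pattern.
//
//     do m.lock_cond |cond| {
//         let num_woken = cond.broadcast(); // returns the number woken
//         assert_eq!(num_woken, num_waiters);
//     }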
// Checks whether a condvar ID was out of bounds, and fails if so, or does
// something else next on success.
fn check_cvar_bounds<U>(out_of_bounds: Option<uint>, id: uint, act: &str,
                        blk: &fn() -> U) -> U {
    match out_of_bounds {
        Some(0) =>
            fail!("%s with illegal ID %u - this lock has no condvars!", act, id),
        Some(length) =>
            fail!("%s with illegal ID %u - ID must be less than %u", act, id, length),
        None => blk()
    }
}
impl Sem<~[WaitQueue]> {
    // The only other places that condvars get built are rwlock.write_cond()
    // and rwlock_write_mode.
    pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        do self.access {
            blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() })
        }
    }
}
/****************************************************************************
 * Semaphores
 ****************************************************************************/
/// A counting, blocking, bounded-waiting semaphore.
pub struct Semaphore { priv sem: Sem<()> }
impl Clone for Semaphore {
    /// Create a new handle to the semaphore.
    fn clone(&self) -> Semaphore {
        Semaphore { sem: Sem((*self.sem).clone()) }
    }
}
impl Semaphore {
    /// Create a new semaphore with the specified count.
    pub fn new(count: int) -> Semaphore {
        Semaphore { sem: Sem::new(count, ()) }
    }

    /**
     * Acquire a resource represented by the semaphore. Blocks if necessary
     * until resource(s) become available.
     */
    pub fn acquire(&self) { (&self.sem).acquire() }

    /**
     * Release a held resource represented by the semaphore. Wakes a blocked
     * contending task, if any exist. Won't block the caller.
     */
    pub fn release(&self) { (&self.sem).release() }

    /// Run a function with ownership of one of the semaphore's resources.
    pub fn access<U>(&self, blk: &fn() -> U) -> U { (&self.sem).access(blk) }
}
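
// Illustrative sketch (not from the original file): a semaphore created with
// count 2 admits two concurrent accessors, and a third `access` blocks until
// one of them releases. `s` is a hypothetical local name.
//
//     let s = ~Semaphore::new(2);
//     do s.access {
//         // at most two tasks can be in here at once
//     }
//     // Or pair the operations manually:
//     s.acquire();
//     // ... critical region ...
//     s.release();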
/****************************************************************************
 * Mutexes
 ****************************************************************************/
/**
 * A blocking, bounded-waiting, mutual exclusion lock with an associated
 * FIFO condition variable.
 *
 * # Failure
 * A task which fails while holding a mutex will unlock the mutex as it
 * unwinds.
 */
pub struct Mutex { priv sem: Sem<~[WaitQueue]> }
impl Clone for Mutex {
    /// Create a new handle to the mutex.
    fn clone(&self) -> Mutex { Mutex { sem: Sem((*self.sem).clone()) } }
}
impl Mutex {
    /// Create a new mutex, with one associated condvar.
    pub fn new() -> Mutex { Mutex::new_with_condvars(1) }

    /**
     * Create a new mutex, with a specified number of associated condvars.
     * This will allow calling wait_on/signal_on/broadcast_on with condvar IDs
     * between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be
     * allowed but any operations on the condvar will fail.)
     */
    pub fn new_with_condvars(num_condvars: uint) -> Mutex {
        Mutex { sem: Sem::new_and_signal(1, num_condvars) }
    }

    /// Run a function with ownership of the mutex.
    pub fn lock<U>(&self, blk: &fn() -> U) -> U {
        (&self.sem).access(blk)
    }

    /// Run a function with ownership of the mutex and a handle to a condvar.
    pub fn lock_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        (&self.sem).access_cond(blk)
    }
}
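
// Illustrative sketch of a mutex with two condvars (see new_with_condvars;
// not from the original file): a waiter on condvar 1 is woken only by a
// matching signal_on(1) or broadcast_on(1). The handle name `m2` is a
// hypothetical local.
//
//     let m = ~Mutex::new_with_condvars(2);
//     let m2 = ~m.clone();
//     // waiting task:    do m.lock_cond  |cond| { cond.wait_on(1); }
//     // signalling task: do m2.lock_cond |cond| { cond.signal_on(1); }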
/****************************************************************************
 * Reader-writer locks
 ****************************************************************************/
// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem
// describes the algorithm used here.

struct RWLockInner {
    // You might ask, "Why don't you need to use an atomic for the mode flag?"
    // This flag affects the behaviour of readers (for plain readers, they
    // assert on it; for downgraders, they use it to decide which mode to
    // unlock for). Consider that the flag is only unset when the very last
    // reader exits; therefore, it can never be unset during a reader/reader
    // (or reader/downgrader) race.
    // By the way, if we didn't care about the assert in the read unlock path,
    // we could instead store the mode flag in write_downgrade's stack frame,
    // and have the downgrade tokens store a borrowed pointer to it.
    read_mode: bool,
    // The only way the count flag is ever accessed is with xadd. Since it is
    // a read-modify-write operation, multiple xadds on different cores will
    // always be consistent with respect to each other, so a monotonic/relaxed
    // consistency ordering suffices (i.e., no extra barriers are needed).
    // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
    // acquire/release orderings superfluously. Change these someday.
    read_count: atomics::AtomicUint,
}
/**
 * A blocking, no-starvation, reader-writer lock with an associated condvar.
 *
 * # Failure
 * A task which fails while holding an rwlock will unlock the rwlock as it
 * unwinds.
 */
pub struct RWLock {
    priv order_lock:  Semaphore,
    priv access_lock: Sem<~[WaitQueue]>,
    priv state:       UnsafeAtomicRcBox<RWLockInner>,
}
impl RWLock {
    /// Create a new rwlock, with one associated condvar.
    pub fn new() -> RWLock { RWLock::new_with_condvars(1) }

    /**
     * Create a new rwlock, with a specified number of associated condvars.
     * Similar to mutex_with_condvars.
     */
    pub fn new_with_condvars(num_condvars: uint) -> RWLock {
        let state = UnsafeAtomicRcBox::new(RWLockInner {
            read_mode:  false,
            read_count: atomics::AtomicUint::new(0),
        });
        RWLock { order_lock:  Semaphore::new(1),
                 access_lock: Sem::new_and_signal(1, num_condvars),
                 state:       state }
    }
    /// Create a new handle to the rwlock.
    pub fn clone(&self) -> RWLock {
        RWLock { order_lock:  (&(self.order_lock)).clone(),
                 access_lock: Sem((*self.access_lock).clone()),
                 state:       self.state.clone() }
    }
    /**
     * Run a function with the rwlock in read mode. Calls to 'read' from other
     * tasks may run concurrently with this one.
     */
    pub fn read<U>(&self, blk: &fn() -> U) -> U {
        unsafe {
            do task::unkillable {
                do (&self.order_lock).access {
                    let state = &mut *self.state.get();
                    let old_count = state.read_count.fetch_add(1, atomics::Acquire);
                    if old_count == 0 {
                        (&self.access_lock).acquire();
                        state.read_mode = true;
                    }
                }
                do (|| {
                    do task::rekillable { blk() }
                }).finally {
                    let state = &mut *self.state.get();
                    assert!(state.read_mode);
                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
                    assert!(old_count > 0);
                    if old_count == 1 {
                        state.read_mode = false;
                        // Note: this release used to be outside of a locked access
                        // to exclusive-protected state. If this code is ever
                        // converted back to such (instead of using atomic ops),
                        // this access MUST NOT go inside the exclusive access.
                        (&self.access_lock).release();
                    }
                }
            }
        }
    }
    /**
     * Run a function with the rwlock in write mode. No calls to 'read' or
     * 'write' from other tasks will run concurrently with this one.
     */
    pub fn write<U>(&self, blk: &fn() -> U) -> U {
        do task::unkillable {
            (&self.order_lock).acquire();
            do (&self.access_lock).access {
                (&self.order_lock).release();
                do task::rekillable {
                    blk()
                }
            }
        }
    }
    /**
     * As write(), but also with a handle to a condvar. Waiting on this
     * condvar will allow readers and writers alike to take the rwlock before
     * the waiting task is signalled. (Note: a writer that waited and then
     * was signalled might reacquire the lock before other waiting writers.)
     */
    pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        // It's important to thread our order lock into the condvar, so that
        // when a cond.wait() wakes up, it uses it while reacquiring the
        // access lock. If we permitted a waking-up writer to "cut in line",
        // there could arise a subtle race when a downgrader attempts to hand
        // off the reader cloud lock to a waiting reader. This race is tested
        // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
        //
        // T1 (writer)              T2 (downgrader)            T3 (reader)
        //                          [locks for writing]
        //                          [holds access_lock]
        // [is signalled, perhaps by
        //  downgrader or a 4th thread]
        // tries to lock access(!)
        //                                                     xadd read_count[0->1]
        //                                                     tries to lock access
        //                          [downgrades]
        //                          xadd read_count[1->2]
        //
        // Since T1 contended on the access lock before T3 did, it will steal
        // the lock handoff. Adding order_lock in the condvar reacquire path
        // solves this because T1 will hold order_lock while waiting on access,
        // which will cause T3 to have to wait until T1 finishes its write,
        // which can't happen until T2 finishes the downgrade-read entirely.
        // The astute reader will also note that making waking writers use the
        // order_lock is better for not starving readers.
        do task::unkillable {
            (&self.order_lock).acquire();
            do (&self.access_lock).access_cond |cond| {
                (&self.order_lock).release();
                do task::rekillable {
                    let opt_lock = Just(&self.order_lock);
                    blk(&Condvar { sem: cond.sem, order: opt_lock,
                                   token: NonCopyable::new() })
                }
            }
        }
    }
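
    // Illustrative caller-side sketch of write_cond (mirrors the mutex
    // pattern; `ready` is a hypothetical predicate on shared state, not
    // anything defined in this file):
    //
    //     do lock.write_cond |cond| {
    //         while !ready { cond.wait(); } // others may lock while we block
    //         // ... exclusive access, now that we were signalled ...
    //     }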
    /**
     * As write(), but with the ability to atomically 'downgrade' the lock;
     * i.e., to become a reader without letting other writers get the lock in
     * the meantime (such as unlocking and then re-locking as a reader would
     * do). The block takes a "write mode token" argument, which can be
     * transformed into a "read mode token" by calling downgrade(). Example:
     *
     * # Example
     *
     * ~~~ {.rust}
     * do lock.write_downgrade |mut write_token| {
     *     do write_token.write_cond |condvar| {
     *         ... exclusive access ...
     *     }
     *     let read_token = lock.downgrade(write_token);
     *     do read_token.read {
     *         ... shared access ...
     *     }
     * }
     * ~~~
     */
    pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U {
        // Implementation slightly different from the slicker 'write's above.
        // The exit path is conditional on whether the caller downgrades.
        do task::unkillable {
            (&self.order_lock).acquire();
            (&self.access_lock).acquire();
            (&self.order_lock).release();
            do (|| {
                do task::rekillable {
                    blk(RWLockWriteMode { lock: self, token: NonCopyable::new() })
                }
            }).finally {
                let writer_or_last_reader;
                // Check if we're releasing from read mode or from write mode.
                let state = unsafe { &mut *self.state.get() };
                if state.read_mode {
                    // Releasing from read mode.
                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
                    assert!(old_count > 0);
                    // Check if other readers remain.
                    if old_count == 1 {
                        // Case 1: Writer downgraded & was the last reader
                        writer_or_last_reader = true;
                        state.read_mode = false;
                    } else {
                        // Case 2: Writer downgraded & was not the last reader
                        writer_or_last_reader = false;
                    }
                } else {
                    // Case 3: Writer did not downgrade
                    writer_or_last_reader = true;
                }
                if writer_or_last_reader {
                    // Nobody left inside; release the "reader cloud" lock.
                    (&self.access_lock).release();
                }
            }
        }
    }
    /// To be called inside of the write_downgrade block.
    pub fn downgrade<'a>(&self, token: RWLockWriteMode<'a>)
                         -> RWLockReadMode<'a> {
        if !borrow::ref_eq(self, token.lock) {
            fail!("Can't downgrade() with a different rwlock's write_mode!");
        }
        unsafe {
            do task::unkillable {
                let state = &mut *self.state.get();
                assert!(!state.read_mode);
                state.read_mode = true;
                // If a reader attempts to enter at this point, both the
                // downgrader and reader will set the mode flag. This is fine.
                let old_count = state.read_count.fetch_add(1, atomics::Release);
                // If another reader was already blocking, we need to hand-off
                // the "reader cloud" access lock to them.
                if old_count != 0 {
                    // Guaranteed not to let another writer in, because
                    // another reader was holding the order_lock. Hence they
                    // must be the one to get the access_lock (because all
                    // access_locks are acquired with order_lock held). See
                    // the comment in write_cond for more justification.
                    (&self.access_lock).release();
                }
            }
        }
        RWLockReadMode { lock: token.lock, token: NonCopyable::new() }
    }
}
672 /// The "write permission" token used for rwlock.write_downgrade().
673 pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable }
675 /// The "read permission" token used for rwlock.write_downgrade().
676 pub struct RWLockReadMode<'self> { priv lock: &'self RWLock,
677 priv token: NonCopyable }
impl<'self> RWLockWriteMode<'self> {
    /// Access the pre-downgrade rwlock in write mode.
    pub fn write<U>(&self, blk: &fn() -> U) -> U { blk() }
    /// Access the pre-downgrade rwlock in write mode with a condvar.
    pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        // Need to make the condvar use the order lock when reacquiring the
        // access lock. See comment in RWLock::write_cond for why.
        blk(&Condvar { sem: &self.lock.access_lock,
                       order: Just(&self.lock.order_lock),
                       token: NonCopyable::new() })
    }
}
impl<'self> RWLockReadMode<'self> {
    /// Access the post-downgrade rwlock in read mode.
    pub fn read<U>(&self, blk: &fn() -> U) -> U { blk() }
}
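
// Illustrative end-to-end sketch (not from the original file): reads may
// overlap each other, writes are exclusive, and write_downgrade/downgrade
// moves from exclusive to shared mode without letting another writer in.
// `lock` is a hypothetical local.
//
//     let lock = ~RWLock::new();
//     do lock.read { /* may run concurrently with other readers */ }
//     do lock.write { /* no other reader or writer may run */ }
//     do lock.write_downgrade |write_token| {
//         do write_token.write { /* still exclusive */ }
//         let read_token = lock.downgrade(write_token);
//         do read_token.read { /* now shared; no writer snuck in */ }
//     }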
/****************************************************************************
 * Tests
 ****************************************************************************/

#[cfg(test)]
mod tests {

    use sync::*;

    use std::cast;
    use std::cell::Cell;
    use std::comm;
    use std::result;
    use std::task;

    /************************************************************************
     * Semaphore tests
     ************************************************************************/
    #[test]
    fn test_sem_acquire_release() {
        let s = ~Semaphore::new(1);
        s.acquire();
        s.release();
        s.acquire();
    }
    #[test]
    fn test_sem_basic() {
        let s = ~Semaphore::new(1);
        do s.access { }
    }
    #[test]
    fn test_sem_as_mutex() {
        let s = ~Semaphore::new(1);
        let s2 = ~s.clone();
        do task::spawn || {
            do s2.access {
                do 5.times { task::yield(); }
            }
        }
        do s.access {
            do 5.times { task::yield(); }
        }
    }
    #[test]
    fn test_sem_as_cvar() {
        /* Child waits and parent signals */
        let (p,c) = comm::stream();
        let s = ~Semaphore::new(0);
        let s2 = ~s.clone();
        do task::spawn || {
            s2.acquire();
            c.send(());
        }
        do 5.times { task::yield(); }
        s.release();
        let _ = p.recv();

        /* Parent waits and child signals */
        let (p,c) = comm::stream();
        let s = ~Semaphore::new(0);
        let s2 = ~s.clone();
        do task::spawn || {
            do 5.times { task::yield(); }
            s2.release();
            let _ = p.recv();
        }
        s.acquire();
        c.send(());
    }
    #[test]
    fn test_sem_multi_resource() {
        // Parent and child both get in the critical section at the same
        // time, and shake hands.
        let s = ~Semaphore::new(2);
        let s2 = ~s.clone();
        let (p1,c1) = comm::stream();
        let (p2,c2) = comm::stream();
        do task::spawn || {
            do s2.access {
                let _ = p2.recv();
                c1.send(());
            }
        }
        do s.access {
            c2.send(());
            let _ = p1.recv();
        }
    }
    #[test]
    fn test_sem_runtime_friendly_blocking() {
        // Force the runtime to schedule two threads on the same sched_loop.
        // When one blocks, it should schedule the other one.
        do task::spawn_sched(task::SingleThreaded) {
            let s = ~Semaphore::new(1);
            let s2 = ~s.clone();
            let (p,c) = comm::stream();
            let child_data = Cell::new((s2, c));
            do s.access {
                let (s2, c) = child_data.take();
                do task::spawn || {
                    c.send(());
                    do s2.access { }
                    c.send(());
                }
                let _ = p.recv(); // wait for child to come alive
                do 5.times { task::yield(); } // let the child contend
            }
            let _ = p.recv(); // wait for child to be done
        }
    }
    /************************************************************************
     * Mutex tests
     ************************************************************************/
    #[test]
    fn test_mutex_lock() {
        // Unsafely achieve shared state, and do the textbook
        // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
        let (p,c) = comm::stream();
        let m = ~Mutex::new();
        let m2 = ~m.clone();
        let mut sharedstate = ~0;
        {
            let ptr: *int = &*sharedstate;
            do task::spawn || {
                let sharedstate: &mut int =
                    unsafe { cast::transmute(ptr) };
                access_shared(sharedstate, m2, 10);
                c.send(());
            }
        }
        {
            access_shared(sharedstate, m, 10);
            let _ = p.recv();

            assert_eq!(*sharedstate, 20);
        }

        fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) {
            do n.times {
                do m.lock {
                    let oldval = *sharedstate;
                    task::yield();
                    *sharedstate = oldval + 1;
                }
            }
        }
    }
    #[test]
    fn test_mutex_cond_wait() {
        let m = ~Mutex::new();

        // Child wakes up parent
        do m.lock_cond |cond| {
            let m2 = ~m.clone();
            do task::spawn || {
                do m2.lock_cond |cond| {
                    let woken = cond.signal();
                    assert!(woken);
                }
            }
            cond.wait();
        }
        // Parent wakes up child
        let (port,chan) = comm::stream();
        let m3 = ~m.clone();
        do task::spawn || {
            do m3.lock_cond |cond| {
                chan.send(());
                cond.wait();
                chan.send(());
            }
        }
        let _ = port.recv(); // Wait until child gets in the mutex
        do m.lock_cond |cond| {
            let woken = cond.signal();
            assert!(woken);
        }
        let _ = port.recv(); // Wait until child wakes up
    }
    #[cfg(test)]
    fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
        let m = ~Mutex::new();
        let mut ports = ~[];

        do num_waiters.times {
            let mi = ~m.clone();
            let (port, chan) = comm::stream();
            ports.push(port);
            do task::spawn || {
                do mi.lock_cond |cond| {
                    chan.send(());
                    cond.wait();
                    chan.send(());
                }
            }
        }

        // wait until all children get in the mutex
        for port in ports.iter() { let _ = port.recv(); }
        do m.lock_cond |cond| {
            let num_woken = cond.broadcast();
            assert_eq!(num_woken, num_waiters);
        }
        // wait until all children wake up
        for port in ports.iter() { let _ = port.recv(); }
    }
    #[test]
    fn test_mutex_cond_broadcast() {
        test_mutex_cond_broadcast_helper(12);
    }
    #[test]
    fn test_mutex_cond_broadcast_none() {
        test_mutex_cond_broadcast_helper(0);
    }
    #[test]
    fn test_mutex_cond_no_waiter() {
        let m = ~Mutex::new();
        let m2 = ~m.clone();
        do task::try || {
            do m.lock_cond |_x| { }
        };
        do m2.lock_cond |cond| {
            assert!(!cond.signal());
        }
    }
    #[test] #[ignore(cfg(windows))]
    fn test_mutex_killed_simple() {
        // Mutex must get automatically unlocked if failed/killed within.
        let m = ~Mutex::new();
        let m2 = ~m.clone();

        let result: result::Result<(),()> = do task::try || {
            do m2.lock {
                fail!();
            }
        };
        assert!(result.is_err());
        // child task must have finished by the time try returns
        do m.lock { }
    }
    #[ignore(reason = "linked failure")]
    #[test] #[ignore(cfg(windows))]
    fn test_mutex_killed_cond() {
        // Getting killed during cond wait must not corrupt the mutex while
        // unwinding (e.g. double unlock).
        let m = ~Mutex::new();
        let m2 = ~m.clone();

        let result: result::Result<(),()> = do task::try || {
            let (p,c) = comm::stream();
            do task::spawn || { // linked
                let _ = p.recv(); // wait for sibling to get in the mutex
                task::yield();
                fail!();
            }
            do m2.lock_cond |cond| {
                c.send(()); // tell sibling go ahead
                cond.wait(); // block forever
            }
        };
        assert!(result.is_err());
        // child task must have finished by the time try returns
        do m.lock_cond |cond| {
            let woken = cond.signal();
            assert!(!woken);
        }
    }
    #[ignore(reason = "linked failure")]
    #[test] #[ignore(cfg(windows))]
    fn test_mutex_killed_broadcast() {
        use std::unstable::finally::Finally;

        let m = ~Mutex::new();
        let m2 = ~m.clone();
        let (p,c) = comm::stream();

        let result: result::Result<(),()> = do task::try || {
            let mut sibling_convos = ~[];
            do 2.times {
                let (p,c) = comm::stream();
                let c = Cell::new(c);
                sibling_convos.push(p);
                let mi = ~m2.clone();
                // spawn sibling task
                do task::spawn { // linked
                    do mi.lock_cond |cond| {
                        let c = c.take();
                        c.send(()); // tell sibling to go ahead
                        do (|| {
                            cond.wait(); // block forever
                        }).finally {
                            error!("task unwinding and sending");
                            c.send(());
                            error!("task unwinding and done sending");
                        }
                    }
                }
            }
            for p in sibling_convos.iter() {
                let _ = p.recv(); // wait for sibling to get in the mutex
            }
            do m2.lock { }
            c.send(sibling_convos); // let parent wait on all children
            fail!();
        };
        assert!(result.is_err());
        // child task must have finished by the time try returns
        let r = p.recv();
        for p in r.iter() { p.recv(); } // wait on all its siblings
        do m.lock_cond |cond| {
            let woken = cond.broadcast();
            assert_eq!(woken, 0);
        }
    }
    #[test]
    fn test_mutex_cond_signal_on_0() {
        // Tests that signal_on(0) is equivalent to signal().
        let m = ~Mutex::new();
        do m.lock_cond |cond| {
            let m2 = ~m.clone();
            do task::spawn || {
                do m2.lock_cond |cond| {
                    cond.signal_on(0);
                }
            }
            cond.wait();
        }
    }
    #[test] #[ignore(cfg(windows))]
    fn test_mutex_different_conds() {
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(2);
            let m2 = ~m.clone();
            let (p,c) = comm::stream();
            do task::spawn || {
                do m2.lock_cond |cond| {
                    c.send(());
                    cond.wait_on(1);
                }
            }
            let _ = p.recv();
            do m.lock_cond |cond| {
                if !cond.signal_on(0) {
                    fail!(); // success; punt sibling awake.
                }
            }
        };
        assert!(result.is_err());
    }
    #[test] #[ignore(cfg(windows))]
    fn test_mutex_no_condvars() {
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(0);
            do m.lock_cond |cond| { cond.wait(); }
        };
        assert!(result.is_err());
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(0);
            do m.lock_cond |cond| { cond.signal(); }
        };
        assert!(result.is_err());
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(0);
            do m.lock_cond |cond| { cond.broadcast(); }
        };
        assert!(result.is_err());
    }
    /************************************************************************
     * Reader/writer lock tests
     ************************************************************************/
    pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }

    fn lock_rwlock_in_mode(x: &RWLock, mode: RWLockMode, blk: &fn()) {
        match mode {
            Read => x.read(blk),
            Write => x.write(blk),
            Downgrade =>
                do x.write_downgrade |mode| {
                    do mode.write { blk() };
                },
            DowngradeRead =>
                do x.write_downgrade |mode| {
                    let mode = x.downgrade(mode);
                    do mode.read { blk() };
                },
        }
    }
    fn test_rwlock_exclusion(x: ~RWLock,
                             mode1: RWLockMode,
                             mode2: RWLockMode) {
        // Test mutual exclusion between readers and writers. Just like the
        // mutex mutual exclusion test, a ways above.
        let (p,c) = comm::stream();
        let x2 = (*x).clone();
        let mut sharedstate = ~0;
        {
            let ptr: *int = &*sharedstate;
            do task::spawn || {
                let sharedstate: &mut int =
                    unsafe { cast::transmute(ptr) };
                access_shared(sharedstate, &x2, mode1, 10);
                c.send(());
            }
        }
        {
            access_shared(sharedstate, x, mode2, 10);
            let _ = p.recv();

            assert_eq!(*sharedstate, 20);
        }

        fn access_shared(sharedstate: &mut int, x: &RWLock, mode: RWLockMode,
                         n: uint) {
            do n.times {
                do lock_rwlock_in_mode(x, mode) {
                    let oldval = *sharedstate;
                    task::yield();
                    *sharedstate = oldval + 1;
                }
            }
        }
    }
    #[test]
    fn test_rwlock_readers_wont_modify_the_data() {
        test_rwlock_exclusion(~RWLock::new(), Read, Write);
        test_rwlock_exclusion(~RWLock::new(), Write, Read);
        test_rwlock_exclusion(~RWLock::new(), Read, Downgrade);
        test_rwlock_exclusion(~RWLock::new(), Downgrade, Read);
    }
    #[test]
    fn test_rwlock_writers_and_writers() {
        test_rwlock_exclusion(~RWLock::new(), Write, Write);
        test_rwlock_exclusion(~RWLock::new(), Write, Downgrade);
        test_rwlock_exclusion(~RWLock::new(), Downgrade, Write);
        test_rwlock_exclusion(~RWLock::new(), Downgrade, Downgrade);
    }
    fn test_rwlock_handshake(x: ~RWLock,
                             mode1: RWLockMode,
                             mode2: RWLockMode,
                             make_mode2_go_first: bool) {
        // Much like sem_multi_resource.
        let x2 = (*x).clone();
        let (p1,c1) = comm::stream();
        let (p2,c2) = comm::stream();
        do task::spawn || {
            if !make_mode2_go_first {
                let _ = p2.recv(); // parent sends to us once it locks, or ...
            }
            do lock_rwlock_in_mode(&x2, mode2) {
                if make_mode2_go_first {
                    c1.send(()); // ... we send to it once we lock
                }
                c1.send(());
                let _ = p2.recv();
            }
        }
        if make_mode2_go_first {
            let _ = p1.recv(); // child sends to us once it locks, or ...
        }
        do lock_rwlock_in_mode(x, mode1) {
            if !make_mode2_go_first {
                c2.send(()); // ... we send to it once we lock
            }
            c2.send(());
            let _ = p1.recv();
        }
    }
    #[test]
    fn test_rwlock_readers_and_readers() {
        test_rwlock_handshake(~RWLock::new(), Read, Read, false);
        // The downgrader needs to get in before the reader gets in, otherwise
        // they cannot end up reading at the same time.
        test_rwlock_handshake(~RWLock::new(), DowngradeRead, Read, false);
        test_rwlock_handshake(~RWLock::new(), Read, DowngradeRead, true);
        // Two downgrade_reads can never both end up reading at the same time.
    }
    #[test]
    fn test_rwlock_downgrade_unlock() {
        // Tests that downgrade can unlock the lock in both modes
        let x = ~RWLock::new();
        do lock_rwlock_in_mode(x, Downgrade) { }
        test_rwlock_handshake(x, Read, Read, false);
        let y = ~RWLock::new();
        do lock_rwlock_in_mode(y, DowngradeRead) { }
        test_rwlock_exclusion(y, Write, Write);
    }
    #[test]
    fn test_rwlock_read_recursive() {
        let x = ~RWLock::new();
        do x.read { do x.read { } }
    }
    #[test]
    fn test_rwlock_cond_wait() {
        // As test_mutex_cond_wait above.
        let x = ~RWLock::new();

        // Child wakes up parent
        do x.write_cond |cond| {
            let x2 = (*x).clone();
            do task::spawn || {
                do x2.write_cond |cond| {
                    let woken = cond.signal();
                    assert!(woken);
                }
            }
            cond.wait();
        }
        // Parent wakes up child
        let (port,chan) = comm::stream();
        let x3 = (*x).clone();
        do task::spawn || {
            do x3.write_cond |cond| {
                chan.send(());
                cond.wait();
                chan.send(());
            }
        }
        let _ = port.recv(); // Wait until child gets in the rwlock
        do x.read { } // Must be able to get in as a reader in the meantime
        do x.write_cond |cond| { // Or as another writer
            let woken = cond.signal();
            assert!(woken);
        }
        let _ = port.recv(); // Wait until child wakes up
        do x.read { } // Just for good measure
    }
    fn test_rwlock_cond_broadcast_helper(num_waiters: uint,
                                         dg1: bool,
                                         dg2: bool) {
        // Much like the mutex broadcast test. Downgrade-enabled.
        fn lock_cond(x: &RWLock, downgrade: bool, blk: &fn(c: &Condvar)) {
            if downgrade {
                do x.write_downgrade |mode| {
                    do mode.write_cond |c| { blk(c) }
                }
            } else {
                do x.write_cond |c| { blk(c) }
            }
        }
        let x = ~RWLock::new();
        let mut ports = ~[];

        do num_waiters.times {
            let xi = (*x).clone();
            let (port, chan) = comm::stream();
            ports.push(port);
            do task::spawn || {
                do lock_cond(&xi, dg1) |cond| {
                    chan.send(());
                    cond.wait();
                    chan.send(());
                }
            }
        }

        // wait until all children get in the mutex
        for port in ports.iter() { let _ = port.recv(); }
        do lock_cond(x, dg2) |cond| {
            let num_woken = cond.broadcast();
            assert_eq!(num_woken, num_waiters);
        }
        // wait until all children wake up
        for port in ports.iter() { let _ = port.recv(); }
    }
    #[test]
    fn test_rwlock_cond_broadcast() {
        test_rwlock_cond_broadcast_helper(0, true, true);
        test_rwlock_cond_broadcast_helper(0, true, false);
        test_rwlock_cond_broadcast_helper(0, false, true);
        test_rwlock_cond_broadcast_helper(0, false, false);
        test_rwlock_cond_broadcast_helper(12, true, true);
        test_rwlock_cond_broadcast_helper(12, true, false);
        test_rwlock_cond_broadcast_helper(12, false, true);
        test_rwlock_cond_broadcast_helper(12, false, false);
    }
    #[cfg(test)] #[ignore(cfg(windows))]
    fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
        // Rwlock must get automatically unlocked if failed/killed within.
        let x = ~RWLock::new();
        let x2 = (*x).clone();

        let result: result::Result<(),()> = do task::try || {
            do lock_rwlock_in_mode(&x2, mode1) {
                fail!();
            }
        };
        assert!(result.is_err());
        // child task must have finished by the time try returns
        do lock_rwlock_in_mode(x, mode2) { }
    }
    #[test] #[ignore(cfg(windows))]
    fn test_rwlock_reader_killed_writer() {
        rwlock_kill_helper(Read, Write);
    }
    #[test] #[ignore(cfg(windows))]
    fn test_rwlock_writer_killed_reader() {
        rwlock_kill_helper(Write, Read);
    }
    #[test] #[ignore(cfg(windows))]
    fn test_rwlock_reader_killed_reader() {
        rwlock_kill_helper(Read, Read);
    }
    #[test] #[ignore(cfg(windows))]
    fn test_rwlock_writer_killed_writer() {
        rwlock_kill_helper(Write, Write);
    }
    #[test] #[ignore(cfg(windows))]
    fn test_rwlock_kill_downgrader() {
        rwlock_kill_helper(Downgrade, Read);
        rwlock_kill_helper(Read, Downgrade);
        rwlock_kill_helper(Downgrade, Write);
        rwlock_kill_helper(Write, Downgrade);
        rwlock_kill_helper(DowngradeRead, Read);
        rwlock_kill_helper(Read, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Write);
        rwlock_kill_helper(Write, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(Downgrade, DowngradeRead);
    }
    #[test] #[should_fail] #[ignore(cfg(windows))]
    fn test_rwlock_downgrade_cant_swap() {
        // Tests that you can't downgrade with a different rwlock's token.
        let x = ~RWLock::new();
        let y = ~RWLock::new();
        do x.write_downgrade |xwrite| {
            let mut xopt = Some(xwrite);
            do y.write_downgrade |_ywrite| {
                y.downgrade(xopt.take_unwrap());
                error!("oops, y.downgrade(x) should have failed!");
            }
        }
    }
}