// Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

/**
 * The concurrency primitives you know and love.
 *
 * Maybe once we have a "core exports x only to std" mechanism, these can be
 * in std.
 */

use std::borrow;
use std::comm;
use std::comm::SendDeferred;
use std::task;
use std::unstable::sync::{Exclusive, UnsafeAtomicRcBox};
use std::unstable::atomics;
use std::unstable::finally::Finally;
use std::util;
use std::util::NonCopyable;

/****************************************************************************
 * Internals
 ****************************************************************************/

// Each waiting task receives on one of these.
#[doc(hidden)]
type WaitEnd = comm::PortOne<()>;
#[doc(hidden)]
type SignalEnd = comm::ChanOne<()>;
// A FIFO queue of waiting tasks: waiters enqueue on the tail, signallers pop from the head.
#[doc(hidden)]
struct WaitQueue { head: comm::Port<SignalEnd>,
                   tail: comm::Chan<SignalEnd> }

impl WaitQueue {
    fn new() -> WaitQueue {
        let (block_head, block_tail) = comm::stream();
        WaitQueue { head: block_head, tail: block_tail }
    }

    // Signals one live task from the queue.
    fn signal(&self) -> bool {
        // The peek is mandatory to make sure recv doesn't block.
        if self.head.peek() {
            // Pop and send a wakeup signal. If the waiter was killed, its port
            // will have closed. Keep trying until we get a live task.
            if self.head.recv().try_send_deferred(()) {
                true
            } else {
                self.signal()
            }
        } else {
            false
        }
    }

    fn broadcast(&self) -> uint {
        let mut count = 0;
        while self.head.peek() {
            if self.head.recv().try_send_deferred(()) {
                count += 1;
            }
        }
        count
    }
}

// The building-block used to make semaphores, mutexes, and rwlocks.
#[doc(hidden)]
struct SemInner<Q> {
    count: int,
    waiters:   WaitQueue,
    // Can be either unit or another waitqueue. Some sems shouldn't come with
    // a condition variable attached, others should.
    blocked:   Q
}

#[doc(hidden)]
struct Sem<Q>(Exclusive<SemInner<Q>>);

#[doc(hidden)]
impl<Q:Send> Sem<Q> {
    fn new(count: int, q: Q) -> Sem<Q> {
        Sem(Exclusive::new(SemInner {
            count: count, waiters: WaitQueue::new(), blocked: q }))
    }

    pub fn acquire(&self) {
        unsafe {
            let mut waiter_nobe = None;
            do (**self).with |state| {
                state.count -= 1;
                if state.count < 0 {
                    // Create waiter nobe.
                    let (wait_end, signal_end) = comm::oneshot();
                    // Tell outer scope we need to block.
                    waiter_nobe = Some(wait_end);
                    // Enqueue ourself.
                    state.waiters.tail.send_deferred(signal_end);
                }
            }
            // Uncomment if you wish to test for sem races. Not valgrind-friendly.
            /* do 1000.times { task::yield(); } */
            // Need to wait outside the exclusive.
            if waiter_nobe.is_some() {
                let _ = comm::recv_one(waiter_nobe.unwrap());
            }
        }
    }

    pub fn release(&self) {
        unsafe {
            do (**self).with |state| {
                state.count += 1;
                if state.count <= 0 {
                    state.waiters.signal();
                }
            }
        }
    }

    pub fn access<U>(&self, blk: &fn() -> U) -> U {
        do task::unkillable {
            do (|| {
                self.acquire();
                unsafe {
                    do task::rekillable { blk() }
                }
            }).finally {
                self.release();
            }
        }
    }
}

#[doc(hidden)]
impl Sem<~[WaitQueue]> {
    fn new_and_signal(count: int, num_condvars: uint)
        -> Sem<~[WaitQueue]> {
        let mut queues = ~[];
        do num_condvars.times {
            queues.push(WaitQueue::new());
        }
        Sem::new(count, queues)
    }
}

// FIXME(#3598): Want to use an Option down below, but we need a custom enum
// that's not polymorphic to get around the fact that lifetimes are invariant
// inside of type parameters.
enum ReacquireOrderLock<'self> {
    Nothing, // c.c
    Just(&'self Semaphore),
}

/// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
pub struct Condvar<'self> {
    // The 'Sem' object associated with this condvar. This is the one that's
    // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
    priv sem: &'self Sem<~[WaitQueue]>,
    // This is (can be) an extra semaphore which is held around the reacquire
    // operation on the first one. This is only used in cvars associated with
    // rwlocks, and is needed to ensure that, when a downgrader is trying to
    // hand off the access lock (which would be the first field, here), a 2nd
    // writer waking up from a cvar wait can't race with a reader to steal it.
    // See the comment in write_cond for more detail.
    priv order: ReacquireOrderLock<'self>,
    // Make sure condvars are non-copyable.
    priv token: util::NonCopyable,
}

impl<'self> Condvar<'self> {
    /**
     * Atomically drop the associated lock, and block until a signal is sent.
     *
     * # Failure
     * A task which is killed (i.e., by linked failure with another task)
     * while waiting on a condition variable will wake up, fail, and unlock
     * the associated lock as it unwinds.
     */
    pub fn wait(&self) { self.wait_on(0) }

    /**
     * As wait(), but can specify which of multiple condition variables to
     * wait on. Only a signal_on() or broadcast_on() with the same condvar_id
     * will wake this thread.
     *
     * The associated lock must have been initialised with an appropriate
     * number of condvars. The condvar_id must be between 0 and num_condvars-1
     * or else this call will fail.
     *
     * wait() is equivalent to wait_on(0).
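     *
     * # Example
     *
     * A sketch of waiting on a secondary condvar, modelled on
     * test_mutex_different_conds in this module's tests; the stream
     * handshake just sequences the two tasks, and all names are
     * illustrative.
     *
     * ~~~ {.rust}
     * let (p, c) = comm::stream();
     * let m = Mutex::new_with_condvars(2);
     * let m2 = m.clone();
     * do task::spawn {
     *     do m2.lock_cond |cond| {
     *         c.send(());      // tell the other task we hold the lock
     *         cond.wait_on(1); // woken only by signal_on(1)/broadcast_on(1)
     *     }
     * }
     * let _ = p.recv(); // wait for the child to get into the mutex
     * do m.lock_cond |cond| {
     *     cond.signal_on(1);
     * }
     * ~~~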
     */
    pub fn wait_on(&self, condvar_id: uint) {
        // Create waiter nobe.
        let (wait_end, signal_end) = comm::oneshot();
        let mut wait_end   = Some(wait_end);
        let mut signal_end = Some(signal_end);
        let mut out_of_bounds = None;
        do task::unkillable {
            // Release lock, 'atomically' enqueuing ourselves in so doing.
            unsafe {
                do (**self.sem).with |state| {
                    if condvar_id < state.blocked.len() {
                        // Drop the lock.
                        state.count += 1;
                        if state.count <= 0 {
                            state.waiters.signal();
                        }
                        // Enqueue ourself to be woken up by a signaller.
                        let signal_end = signal_end.take_unwrap();
                        state.blocked[condvar_id].tail.send_deferred(signal_end);
                    } else {
                        out_of_bounds = Some(state.blocked.len());
                    }
                }
            }

            // If yield checks start getting inserted anywhere, we can be
            // killed before or after enqueueing. Deciding whether to
            // unkillably reacquire the lock needs to happen atomically
            // wrt enqueuing.
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
                // Unconditionally "block". (Might not actually block if a
                // signaller already sent -- I mean 'unconditionally' in contrast
                // with acquire().)
                do (|| {
                    unsafe {
                        do task::rekillable {
                            let _ = comm::recv_one(wait_end.take_unwrap());
                        }
                    }
                }).finally {
                    // Reacquire the condvar. Note this is back in the unkillable
                    // section; it needs to succeed, instead of itself dying.
                    match self.order {
                        Just(lock) => do lock.access {
                            self.sem.acquire();
                        },
                        Nothing => {
                            self.sem.acquire();
                        },
                    }
                }
            }
        }
    }

    /// Wake up a blocked task. Returns false if there was no blocked task.
    pub fn signal(&self) -> bool { self.signal_on(0) }

    /// As signal, but with a specified condvar_id. See wait_on.
    pub fn signal_on(&self, condvar_id: uint) -> bool {
        unsafe {
            let mut out_of_bounds = None;
            let mut result = false;
            do (**self.sem).with |state| {
                if condvar_id < state.blocked.len() {
                    result = state.blocked[condvar_id].signal();
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            }
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.signal_on()") {
                result
            }
        }
    }

    /// Wake up all blocked tasks. Returns the number of tasks woken.
    pub fn broadcast(&self) -> uint { self.broadcast_on(0) }

    /// As broadcast, but with a specified condvar_id. See wait_on.
    pub fn broadcast_on(&self, condvar_id: uint) -> uint {
        let mut out_of_bounds = None;
        let mut queue = None;
        unsafe {
            do (**self.sem).with |state| {
                if condvar_id < state.blocked.len() {
                    // To avoid :broadcast_heavy, we make a new waitqueue,
                    // swap it out with the old one, and broadcast on the
                    // old one outside of the little-lock.
                    queue = Some(util::replace(&mut state.blocked[condvar_id],
                                               WaitQueue::new()));
                } else {
                    out_of_bounds = Some(state.blocked.len());
                }
            }
            do check_cvar_bounds(out_of_bounds, condvar_id, "cond.broadcast_on()") {
                let queue = queue.take_unwrap();
                queue.broadcast()
            }
        }
    }
}

// Checks whether a condvar ID was out of bounds, and fails if so, or does
// something else next on success.
#[inline]
#[doc(hidden)]
fn check_cvar_bounds<U>(out_of_bounds: Option<uint>, id: uint, act: &str,
                        blk: &fn() -> U) -> U {
    match out_of_bounds {
        Some(0) =>
            fail!("%s with illegal ID %u - this lock has no condvars!", act, id),
        Some(length) =>
            fail!("%s with illegal ID %u - ID must be less than %u", act, id, length),
        None => blk()
    }
}

#[doc(hidden)]
impl Sem<~[WaitQueue]> {
    // The only other places that condvars get built are rwlock.write_cond()
    // and rwlock_write_mode.
    pub fn access_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        do self.access {
            blk(&Condvar { sem: self, order: Nothing, token: NonCopyable::new() })
        }
    }
}

/****************************************************************************
 * Semaphores
 ****************************************************************************/

/// A counting, blocking, bounded-waiting semaphore.
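///
/// # Example
///
/// A minimal usage sketch, in the style of this module's tests; the
/// initial count of 2 is illustrative.
///
/// ~~~ {.rust}
/// let s = Semaphore::new(2);
/// // Up to two tasks may run the block at once.
/// do s.access {
///     // ... critical region ...
/// }
/// ~~~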
pub struct Semaphore { priv sem: Sem<()> }

impl Clone for Semaphore {
    /// Create a new handle to the semaphore.
    fn clone(&self) -> Semaphore {
        Semaphore { sem: Sem((*self.sem).clone()) }
    }
}

impl Semaphore {
    /// Create a new semaphore with the specified count.
    pub fn new(count: int) -> Semaphore {
        Semaphore { sem: Sem::new(count, ()) }
    }

    /**
     * Acquire a resource represented by the semaphore. Blocks if necessary
     * until resource(s) become available.
     */
    pub fn acquire(&self) { (&self.sem).acquire() }

    /**
     * Release a held resource represented by the semaphore. Wakes a blocked
     * contending task, if any exist. Won't block the caller.
     */
    pub fn release(&self) { (&self.sem).release() }

    /// Run a function with ownership of one of the semaphore's resources.
    pub fn access<U>(&self, blk: &fn() -> U) -> U { (&self.sem).access(blk) }
}

/****************************************************************************
 * Mutexes
 ****************************************************************************/

/**
 * A blocking, bounded-waiting, mutual exclusion lock with an associated
 * FIFO condition variable.
 *
 * # Failure
 * A task which fails while holding a mutex will unlock the mutex as it
 * unwinds.
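 *
 * # Example
 *
 * A minimal sketch of lock_cond, adapted from test_mutex_cond_wait in this
 * module's tests. The parent holds the lock while spawning, so the child's
 * lock_cond blocks until the parent's wait releases it; names are
 * illustrative.
 *
 * ~~~ {.rust}
 * let m = Mutex::new();
 * do m.lock_cond |cond| {
 *     let m2 = m.clone();
 *     do task::spawn {
 *         do m2.lock_cond |cond| {
 *             cond.signal();
 *         }
 *     }
 *     cond.wait();
 * }
 * ~~~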
 */
pub struct Mutex { priv sem: Sem<~[WaitQueue]> }

impl Clone for Mutex {
    /// Create a new handle to the mutex.
    fn clone(&self) -> Mutex { Mutex { sem: Sem((*self.sem).clone()) } }
}

impl Mutex {
    /// Create a new mutex, with one associated condvar.
    pub fn new() -> Mutex { Mutex::new_with_condvars(1) }

    /**
    * Create a new mutex, with a specified number of associated condvars. This
    * will allow calling wait_on/signal_on/broadcast_on with condvar IDs between
    * 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but
    * any operations on the condvar will fail.)
    */
    pub fn new_with_condvars(num_condvars: uint) -> Mutex {
        Mutex { sem: Sem::new_and_signal(1, num_condvars) }
    }

    /// Run a function with ownership of the mutex.
    pub fn lock<U>(&self, blk: &fn() -> U) -> U {
        (&self.sem).access(blk)
    }

    /// Run a function with ownership of the mutex and a handle to a condvar.
    pub fn lock_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        (&self.sem).access_cond(blk)
    }
}

/****************************************************************************
 * Reader-writer locks
 ****************************************************************************/

// NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem

#[doc(hidden)]
struct RWLockInner {
    // You might ask, "Why don't you need to use an atomic for the mode flag?"
    // This flag affects the behaviour of readers (for plain readers, they
    // assert on it; for downgraders, they use it to decide which mode to
    // unlock for). Consider that the flag is only unset when the very last
    // reader exits; therefore, it can never be unset during a reader/reader
    // (or reader/downgrader) race.
    // By the way, if we didn't care about the assert in the read unlock path,
    // we could instead store the mode flag in write_downgrade's stack frame,
    // and have the downgrade tokens store a borrowed pointer to it.
    read_mode:  bool,
    // The only way the count flag is ever accessed is with xadd. Since it is
    // a read-modify-write operation, multiple xadds on different cores will
    // always be consistent with respect to each other, so a monotonic/relaxed
    // consistency ordering suffices (i.e., no extra barriers are needed).
    // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
    // acquire/release orderings superfluously. Change these someday.
    read_count: atomics::AtomicUint,
}

/**
 * A blocking, no-starvation, reader-writer lock with an associated condvar.
 *
 * # Failure
 * A task which fails while holding an rwlock will unlock the rwlock as it
 * unwinds.
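 *
 * # Example
 *
 * A minimal sketch: read blocks may run concurrently with each other, while
 * write blocks are exclusive.
 *
 * ~~~ {.rust}
 * let lock = RWLock::new();
 * do lock.read {
 *     // shared access; other readers may be in here at the same time
 * }
 * do lock.write {
 *     // exclusive access
 * }
 * ~~~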
 */
pub struct RWLock {
    priv order_lock:  Semaphore,
    priv access_lock: Sem<~[WaitQueue]>,
    priv state:       UnsafeAtomicRcBox<RWLockInner>,
}

impl RWLock {
    /// Create a new rwlock, with one associated condvar.
    pub fn new() -> RWLock { RWLock::new_with_condvars(1) }

    /**
    * Create a new rwlock, with a specified number of associated condvars.
    * Similar to Mutex::new_with_condvars.
    */
    pub fn new_with_condvars(num_condvars: uint) -> RWLock {
        let state = UnsafeAtomicRcBox::new(RWLockInner {
            read_mode:  false,
            read_count: atomics::AtomicUint::new(0),
        });
        RWLock { order_lock:  Semaphore::new(1),
                 access_lock: Sem::new_and_signal(1, num_condvars),
                 state:       state, }
    }

    /// Create a new handle to the rwlock.
    pub fn clone(&self) -> RWLock {
        RWLock { order_lock:  (&(self.order_lock)).clone(),
                 access_lock: Sem((*self.access_lock).clone()),
                 state:       self.state.clone() }
    }

    /**
     * Run a function with the rwlock in read mode. Calls to 'read' from other
     * tasks may run concurrently with this one.
     */
    pub fn read<U>(&self, blk: &fn() -> U) -> U {
        unsafe {
            do task::unkillable {
                do (&self.order_lock).access {
                    let state = &mut *self.state.get();
                    let old_count = state.read_count.fetch_add(1, atomics::Acquire);
                    if old_count == 0 {
                        (&self.access_lock).acquire();
                        state.read_mode = true;
                    }
                }
                do (|| {
                    do task::rekillable { blk() }
                }).finally {
                    let state = &mut *self.state.get();
                    assert!(state.read_mode);
                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
                    assert!(old_count > 0);
                    if old_count == 1 {
                        state.read_mode = false;
                        // Note: this release used to be outside of a locked access
                        // to exclusive-protected state. If this code is ever
                        // converted back to such (instead of using atomic ops),
                        // this access MUST NOT go inside the exclusive access.
                        (&self.access_lock).release();
                    }
                }
            }
        }
    }

    /**
     * Run a function with the rwlock in write mode. No calls to 'read' or
     * 'write' from other tasks will run concurrently with this one.
     */
    pub fn write<U>(&self, blk: &fn() -> U) -> U {
        unsafe {
            do task::unkillable {
                (&self.order_lock).acquire();
                do (&self.access_lock).access {
                    (&self.order_lock).release();
                    do task::rekillable {
                        blk()
                    }
                }
            }
        }
    }

    /**
     * As write(), but also with a handle to a condvar. Waiting on this
     * condvar will allow readers and writers alike to take the rwlock before
     * the waiting task is signalled. (Note: a writer that waited and then
     * was signalled might reacquire the lock before other waiting writers.)
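     *
     * A sketch, in the style of test_rwlock_cond_wait below; the parent
     * holds the write lock while spawning, so the child blocks until the
     * parent's wait releases it.
     *
     * ~~~ {.rust}
     * let x = RWLock::new();
     * do x.write_cond |cond| {
     *     let x2 = x.clone();
     *     do task::spawn {
     *         do x2.write_cond |cond| {
     *             cond.signal();
     *         }
     *     }
     *     cond.wait();
     * }
     * ~~~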
     */
    pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        // It's important to thread our order lock into the condvar, so that
        // when a cond.wait() wakes up, it uses it while reacquiring the
        // access lock. If we permitted a waking-up writer to "cut in line",
        // there could arise a subtle race when a downgrader attempts to hand
        // off the reader cloud lock to a waiting reader. This race is tested
        // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
        // T1 (writer)              T2 (downgrader)             T3 (reader)
        // [in cond.wait()]
        //                          [locks for writing]
        //                          [holds access_lock]
        // [is signalled, perhaps by
        //  downgrader or a 4th thread]
        // tries to lock access(!)
        //                                                      lock order_lock
        //                                                      xadd read_count[0->1]
        //                                                      tries to lock access
        //                          [downgrade]
        //                          xadd read_count[1->2]
        //                          unlock access
        // Since T1 contended on the access lock before T3 did, it will steal
        // the lock handoff. Adding order_lock in the condvar reacquire path
        // solves this because T1 will hold order_lock while waiting on access,
        // which will cause T3 to have to wait until T1 finishes its write,
        // which can't happen until T2 finishes the downgrade-read entirely.
        // The astute reader will also note that making waking writers use the
        // order_lock is better for not starving readers.
        unsafe {
            do task::unkillable {
                (&self.order_lock).acquire();
                do (&self.access_lock).access_cond |cond| {
                    (&self.order_lock).release();
                    do task::rekillable {
                        let opt_lock = Just(&self.order_lock);
                        blk(&Condvar { sem: cond.sem, order: opt_lock,
                                       token: NonCopyable::new() })
                    }
                }
            }
        }
    }

    /**
     * As write(), but with the ability to atomically 'downgrade' the lock;
     * i.e., to become a reader without letting other writers get the lock in
     * the meantime (such as unlocking and then re-locking as a reader would
     * do). The block takes a "write mode token" argument, which can be
     * transformed into a "read mode token" by calling downgrade().
     *
     * # Example
     *
     * ~~~ {.rust}
     * do lock.write_downgrade |mut write_token| {
     *     do write_token.write_cond |condvar| {
     *         ... exclusive access ...
     *     }
     *     let read_token = lock.downgrade(write_token);
     *     do read_token.read {
     *         ... shared access ...
     *     }
     * }
     * ~~~
     */
    pub fn write_downgrade<U>(&self, blk: &fn(v: RWLockWriteMode) -> U) -> U {
        // Implementation slightly different from the slicker 'write's above.
        // The exit path is conditional on whether the caller downgrades.
        do task::unkillable {
            (&self.order_lock).acquire();
            (&self.access_lock).acquire();
            (&self.order_lock).release();
            do (|| {
                unsafe {
                    do task::rekillable {
                        blk(RWLockWriteMode { lock: self, token: NonCopyable::new() })
                    }
                }
            }).finally {
                let writer_or_last_reader;
                // Check if we're releasing from read mode or from write mode.
                let state = unsafe { &mut *self.state.get() };
                if state.read_mode {
                    // Releasing from read mode.
                    let old_count = state.read_count.fetch_sub(1, atomics::Release);
                    assert!(old_count > 0);
                    // Check if other readers remain.
                    if old_count == 1 {
                        // Case 1: Writer downgraded & was the last reader
                        writer_or_last_reader = true;
                        state.read_mode = false;
                    } else {
                        // Case 2: Writer downgraded & was not the last reader
                        writer_or_last_reader = false;
                    }
                } else {
                    // Case 3: Writer did not downgrade
                    writer_or_last_reader = true;
                }
                if writer_or_last_reader {
                    // Nobody left inside; release the "reader cloud" lock.
                    (&self.access_lock).release();
                }
            }
        }
    }

    /// To be called inside of the write_downgrade block.
    pub fn downgrade<'a>(&self, token: RWLockWriteMode<'a>)
                         -> RWLockReadMode<'a> {
        if !borrow::ref_eq(self, token.lock) {
            fail!("Can't downgrade() with a different rwlock's write_mode!");
        }
        unsafe {
            do task::unkillable {
                let state = &mut *self.state.get();
                assert!(!state.read_mode);
                state.read_mode = true;
                // If a reader attempts to enter at this point, both the
                // downgrader and reader will set the mode flag. This is fine.
                let old_count = state.read_count.fetch_add(1, atomics::Release);
                // If another reader was already blocking, we need to hand-off
                // the "reader cloud" access lock to them.
                if old_count != 0 {
                    // Guaranteed not to let another writer in, because
                    // another reader was holding the order_lock. Hence they
                    // must be the one to get the access_lock (because all
                    // access_locks are acquired with order_lock held). See
                    // the comment in write_cond for more justification.
                    (&self.access_lock).release();
                }
            }
        }
        RWLockReadMode { lock: token.lock, token: NonCopyable::new() }
    }
}

/// The "write permission" token used for rwlock.write_downgrade().
pub struct RWLockWriteMode<'self> { priv lock: &'self RWLock, priv token: NonCopyable }

/// The "read permission" token used for rwlock.write_downgrade().
pub struct RWLockReadMode<'self> { priv lock: &'self RWLock,
                                   priv token: NonCopyable }

impl<'self> RWLockWriteMode<'self> {
    /// Access the pre-downgrade rwlock in write mode.
    pub fn write<U>(&self, blk: &fn() -> U) -> U { blk() }
    /// Access the pre-downgrade rwlock in write mode with a condvar.
    pub fn write_cond<U>(&self, blk: &fn(c: &Condvar) -> U) -> U {
        // Need to make the condvar use the order lock when reacquiring the
        // access lock. See comment in RWLock::write_cond for why.
        blk(&Condvar { sem:        &self.lock.access_lock,
                       order: Just(&self.lock.order_lock),
                       token: NonCopyable::new() })
    }
}

impl<'self> RWLockReadMode<'self> {
    /// Access the post-downgrade rwlock in read mode.
    pub fn read<U>(&self, blk: &fn() -> U) -> U { blk() }
}

/****************************************************************************
 * Tests
 ****************************************************************************/

#[cfg(test)]
mod tests {

    use sync::*;

    use std::cast;
    use std::cell::Cell;
    use std::comm;
    use std::result;
    use std::task;

    /************************************************************************
     * Semaphore tests
     ************************************************************************/
    #[test]
    fn test_sem_acquire_release() {
        let s = ~Semaphore::new(1);
        s.acquire();
        s.release();
        s.acquire();
    }
    #[test]
    fn test_sem_basic() {
        let s = ~Semaphore::new(1);
        do s.access { }
    }
    #[test]
    fn test_sem_as_mutex() {
        let s = ~Semaphore::new(1);
        let s2 = ~s.clone();
        do task::spawn || {
            do s2.access {
                do 5.times { task::yield(); }
            }
        }
        do s.access {
            do 5.times { task::yield(); }
        }
    }
    #[test]
    fn test_sem_as_cvar() {
        /* Child waits and parent signals */
        let (p,c) = comm::stream();
        let s = ~Semaphore::new(0);
        let s2 = ~s.clone();
        do task::spawn || {
            s2.acquire();
            c.send(());
        }
        do 5.times { task::yield(); }
        s.release();
        let _ = p.recv();

        /* Parent waits and child signals */
        let (p,c) = comm::stream();
        let s = ~Semaphore::new(0);
        let s2 = ~s.clone();
        do task::spawn || {
            do 5.times { task::yield(); }
            s2.release();
            let _ = p.recv();
        }
        s.acquire();
        c.send(());
    }
    #[test]
    fn test_sem_multi_resource() {
        // Parent and child both get in the critical section at the same
        // time, and shake hands.
        let s = ~Semaphore::new(2);
        let s2 = ~s.clone();
        let (p1,c1) = comm::stream();
        let (p2,c2) = comm::stream();
        do task::spawn || {
            do s2.access {
                let _ = p2.recv();
                c1.send(());
            }
        }
        do s.access {
            c2.send(());
            let _ = p1.recv();
        }
    }
    #[test]
    fn test_sem_runtime_friendly_blocking() {
        // Force the runtime to schedule two threads on the same sched_loop.
        // When one blocks, it should schedule the other one.
        do task::spawn_sched(task::SingleThreaded) {
            let s = ~Semaphore::new(1);
            let s2 = ~s.clone();
            let (p,c) = comm::stream();
            let child_data = Cell::new((s2, c));
            do s.access {
                let (s2, c) = child_data.take();
                do task::spawn || {
                    c.send(());
                    do s2.access { }
                    c.send(());
                }
                let _ = p.recv(); // wait for child to come alive
                do 5.times { task::yield(); } // let the child contend
            }
            let _ = p.recv(); // wait for child to be done
        }
    }
    /************************************************************************
     * Mutex tests
     ************************************************************************/
    #[test]
    fn test_mutex_lock() {
        // Unsafely achieve shared state, and do the textbook
        // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
        let (p,c) = comm::stream();
        let m = ~Mutex::new();
        let m2 = m.clone();
        let mut sharedstate = ~0;
        {
            let ptr: *int = &*sharedstate;
            do task::spawn || {
                let sharedstate: &mut int =
                    unsafe { cast::transmute(ptr) };
                access_shared(sharedstate, m2, 10);
                c.send(());
            }
        }
        {
            access_shared(sharedstate, m, 10);
            let _ = p.recv();

            assert_eq!(*sharedstate, 20);
        }

        fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) {
            do n.times {
                do m.lock {
                    let oldval = *sharedstate;
                    task::yield();
                    *sharedstate = oldval + 1;
                }
            }
        }
    }
    #[test]
    fn test_mutex_cond_wait() {
        let m = ~Mutex::new();

        // Child wakes up parent
        do m.lock_cond |cond| {
            let m2 = ~m.clone();
            do task::spawn || {
                do m2.lock_cond |cond| {
                    let woken = cond.signal();
                    assert!(woken);
                }
            }
            cond.wait();
        }
        // Parent wakes up child
        let (port,chan) = comm::stream();
        let m3 = ~m.clone();
        do task::spawn || {
            do m3.lock_cond |cond| {
                chan.send(());
                cond.wait();
                chan.send(());
            }
        }
        let _ = port.recv(); // Wait until child gets in the mutex
        do m.lock_cond |cond| {
            let woken = cond.signal();
            assert!(woken);
        }
        let _ = port.recv(); // Wait until child wakes up
    }
    #[cfg(test)]
    fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
        let m = ~Mutex::new();
        let mut ports = ~[];

        do num_waiters.times {
            let mi = ~m.clone();
            let (port, chan) = comm::stream();
            ports.push(port);
            do task::spawn || {
                do mi.lock_cond |cond| {
                    chan.send(());
                    cond.wait();
                    chan.send(());
                }
            }
        }

        // wait until all children get in the mutex
        for port in ports.iter() { let _ = port.recv(); }
        do m.lock_cond |cond| {
            let num_woken = cond.broadcast();
            assert_eq!(num_woken, num_waiters);
        }
        // wait until all children wake up
        for port in ports.iter() { let _ = port.recv(); }
    }
    #[test]
    fn test_mutex_cond_broadcast() {
        test_mutex_cond_broadcast_helper(12);
    }
    #[test]
    fn test_mutex_cond_broadcast_none() {
        test_mutex_cond_broadcast_helper(0);
    }
    #[test]
    fn test_mutex_cond_no_waiter() {
        let m = ~Mutex::new();
        let m2 = ~m.clone();
        do task::try || {
            do m.lock_cond |_x| { }
        };
        do m2.lock_cond |cond| {
            assert!(!cond.signal());
        }
    }
    #[test] #[ignore(cfg(windows))]
    fn test_mutex_killed_simple() {
        // Mutex must get automatically unlocked if failed/killed within.
        let m = ~Mutex::new();
        let m2 = ~m.clone();

        let result: result::Result<(),()> = do task::try || {
            do m2.lock {
                fail!();
            }
        };
        assert!(result.is_err());
        // child task must have finished by the time try returns
        do m.lock { }
    }
    #[ignore(reason = "linked failure")]
    #[test] #[ignore(cfg(windows))]
    fn test_mutex_killed_cond() {
        // Getting killed during cond wait must not corrupt the mutex while
        // unwinding (e.g. double unlock).
        let m = ~Mutex::new();
        let m2 = ~m.clone();

        let result: result::Result<(),()> = do task::try || {
            let (p,c) = comm::stream();
            do task::spawn || { // linked
                let _ = p.recv(); // wait for sibling to get in the mutex
                task::yield();
                fail!();
            }
            do m2.lock_cond |cond| {
                c.send(()); // tell sibling go ahead
                cond.wait(); // block forever
            }
        };
        assert!(result.is_err());
        // child task must have finished by the time try returns
        do m.lock_cond |cond| {
            let woken = cond.signal();
            assert!(!woken);
        }
    }
    #[ignore(reason = "linked failure")]
    #[test] #[ignore(cfg(windows))]
    fn test_mutex_killed_broadcast() {
        use std::unstable::finally::Finally;

        let m = ~Mutex::new();
        let m2 = ~m.clone();
        let (p,c) = comm::stream();

        let result: result::Result<(),()> = do task::try || {
            let mut sibling_convos = ~[];
            do 2.times {
                let (p,c) = comm::stream();
                let c = Cell::new(c);
                sibling_convos.push(p);
                let mi = ~m2.clone();
                // spawn sibling task
                do task::spawn { // linked
                    do mi.lock_cond |cond| {
                        let c = c.take();
                        c.send(()); // tell sibling to go ahead
                        do (|| {
                            cond.wait(); // block forever
                        }).finally {
                            error!("task unwinding and sending");
                            c.send(());
                            error!("task unwinding and done sending");
                        }
                    }
                }
            }
            for p in sibling_convos.iter() {
                let _ = p.recv(); // wait for sibling to get in the mutex
            }
            do m2.lock { }
            c.send(sibling_convos); // let parent wait on all children
            fail!();
        };
        assert!(result.is_err());
        // child task must have finished by the time try returns
        let r = p.recv();
        for p in r.iter() { p.recv(); } // wait on all its siblings
        do m.lock_cond |cond| {
            let woken = cond.broadcast();
            assert_eq!(woken, 0);
        }
    }
    #[test]
    fn test_mutex_cond_signal_on_0() {
        // Tests that signal_on(0) is equivalent to signal().
        let m = ~Mutex::new();
        do m.lock_cond |cond| {
            let m2 = ~m.clone();
            do task::spawn || {
                do m2.lock_cond |cond| {
                    cond.signal_on(0);
                }
            }
            cond.wait();
        }
    }
    #[test] #[ignore(cfg(windows))]
    fn test_mutex_different_conds() {
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(2);
            let m2 = ~m.clone();
            let (p,c) = comm::stream();
            do task::spawn || {
                do m2.lock_cond |cond| {
                    c.send(());
                    cond.wait_on(1);
                }
            }
            let _ = p.recv();
            do m.lock_cond |cond| {
                if !cond.signal_on(0) {
                    fail!(); // success; punt sibling awake.
                }
            }
        };
        assert!(result.is_err());
    }
    #[test] #[ignore(cfg(windows))]
    fn test_mutex_no_condvars() {
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(0);
            do m.lock_cond |cond| { cond.wait(); }
        };
        assert!(result.is_err());
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(0);
            do m.lock_cond |cond| { cond.signal(); }
        };
        assert!(result.is_err());
        let result = do task::try {
            let m = ~Mutex::new_with_condvars(0);
            do m.lock_cond |cond| { cond.broadcast(); }
        };
        assert!(result.is_err());
    }
    /************************************************************************
     * Reader/writer lock tests
     ************************************************************************/
    #[cfg(test)]
    pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }
    #[cfg(test)]
    fn lock_rwlock_in_mode(x: &RWLock, mode: RWLockMode, blk: &fn()) {
        match mode {
            Read => x.read(blk),
            Write => x.write(blk),
            Downgrade =>
                do x.write_downgrade |mode| {
                    do mode.write { blk() };
                },
            DowngradeRead =>
                do x.write_downgrade |mode| {
                    let mode = x.downgrade(mode);
                    do mode.read { blk() };
                },
        }
    }
    #[cfg(test)]
    fn test_rwlock_exclusion(x: ~RWLock,
                             mode1: RWLockMode,
                             mode2: RWLockMode) {
        // Test mutual exclusion between readers and writers. Just like the
        // mutex mutual exclusion test, a ways above.
        let (p,c) = comm::stream();
        let x2 = (*x).clone();
        let mut sharedstate = ~0;
        {
            let ptr: *int = &*sharedstate;
            do task::spawn || {
                let sharedstate: &mut int =
                    unsafe { cast::transmute(ptr) };
                access_shared(sharedstate, &x2, mode1, 10);
                c.send(());
            }
        }
        {
            access_shared(sharedstate, x, mode2, 10);
            let _ = p.recv();

            assert_eq!(*sharedstate, 20);
        }

        fn access_shared(sharedstate: &mut int, x: &RWLock, mode: RWLockMode,
                         n: uint) {
            do n.times {
                do lock_rwlock_in_mode(x, mode) {
                    let oldval = *sharedstate;
                    task::yield();
                    *sharedstate = oldval + 1;
                }
            }
        }
    }
    #[test]
    fn test_rwlock_readers_wont_modify_the_data() {
        test_rwlock_exclusion(~RWLock::new(), Read, Write);
        test_rwlock_exclusion(~RWLock::new(), Write, Read);
        test_rwlock_exclusion(~RWLock::new(), Read, Downgrade);
        test_rwlock_exclusion(~RWLock::new(), Downgrade, Read);
    }
    #[test]
    fn test_rwlock_writers_and_writers() {
        test_rwlock_exclusion(~RWLock::new(), Write, Write);
        test_rwlock_exclusion(~RWLock::new(), Write, Downgrade);
        test_rwlock_exclusion(~RWLock::new(), Downgrade, Write);
        test_rwlock_exclusion(~RWLock::new(), Downgrade, Downgrade);
    }
    #[cfg(test)]
    fn test_rwlock_handshake(x: ~RWLock,
                             mode1: RWLockMode,
                             mode2: RWLockMode,
                             make_mode2_go_first: bool) {
        // Much like sem_multi_resource.
        let x2 = (*x).clone();
        let (p1,c1) = comm::stream();
        let (p2,c2) = comm::stream();
        do task::spawn || {
            if !make_mode2_go_first {
                let _ = p2.recv(); // parent sends to us once it locks, or ...
            }
            do lock_rwlock_in_mode(&x2, mode2) {
                if make_mode2_go_first {
                    c1.send(()); // ... we send to it once we lock
                }
                let _ = p2.recv();
                c1.send(());
            }
        }
        if make_mode2_go_first {
            let _ = p1.recv(); // child sends to us once it locks, or ...
        }
        do lock_rwlock_in_mode(x, mode1) {
            if !make_mode2_go_first {
                c2.send(()); // ... we send to it once we lock
            }
            c2.send(());
            let _ = p1.recv();
        }
    }
    #[test]
    fn test_rwlock_readers_and_readers() {
        test_rwlock_handshake(~RWLock::new(), Read, Read, false);
        // The downgrader needs to get in before the reader gets in, otherwise
        // they cannot end up reading at the same time.
        test_rwlock_handshake(~RWLock::new(), DowngradeRead, Read, false);
        test_rwlock_handshake(~RWLock::new(), Read, DowngradeRead, true);
        // Two downgrade_reads can never both end up reading at the same time.
    }
    #[test]
    fn test_rwlock_downgrade_unlock() {
        // Tests that downgrade can unlock the lock in both modes
        let x = ~RWLock::new();
        do lock_rwlock_in_mode(x, Downgrade) { }
        test_rwlock_handshake(x, Read, Read, false);
        let y = ~RWLock::new();
        do lock_rwlock_in_mode(y, DowngradeRead) { }
        test_rwlock_exclusion(y, Write, Write);
    }
    #[test]
    fn test_rwlock_read_recursive() {
        let x = ~RWLock::new();
        do x.read { do x.read { } }
    }
    #[test]
    fn test_rwlock_cond_wait() {
        // As test_mutex_cond_wait above.
        let x = ~RWLock::new();

        // Child wakes up parent
        do x.write_cond |cond| {
            let x2 = (*x).clone();
            do task::spawn || {
                do x2.write_cond |cond| {
                    let woken = cond.signal();
                    assert!(woken);
                }
            }
            cond.wait();
        }
        // Parent wakes up child
        let (port,chan) = comm::stream();
        let x3 = (*x).clone();
        do task::spawn || {
            do x3.write_cond |cond| {
                chan.send(());
                cond.wait();
                chan.send(());
            }
        }
        let _ = port.recv(); // Wait until child gets in the rwlock
        do x.read { } // Must be able to get in as a reader in the meantime
        do x.write_cond |cond| { // Or as another writer
            let woken = cond.signal();
            assert!(woken);
        }
        let _ = port.recv(); // Wait until child wakes up
        do x.read { } // Just for good measure
    }
    #[cfg(test)]
    fn test_rwlock_cond_broadcast_helper(num_waiters: uint,
                                         dg1: bool,
                                         dg2: bool) {
        // Much like the mutex broadcast test. Downgrade-enabled.
        fn lock_cond(x: &RWLock, downgrade: bool, blk: &fn(c: &Condvar)) {
            if downgrade {
                do x.write_downgrade |mode| {
                    do mode.write_cond |c| { blk(c) }
                }
            } else {
                do x.write_cond |c| { blk(c) }
            }
        }
        let x = ~RWLock::new();
        let mut ports = ~[];

        do num_waiters.times {
            let xi = (*x).clone();
            let (port, chan) = comm::stream();
            ports.push(port);
            do task::spawn || {
                do lock_cond(&xi, dg1) |cond| {
                    chan.send(());
                    cond.wait();
                    chan.send(());
                }
            }
        }

        // wait until all children get in the mutex
        for port in ports.iter() { let _ = port.recv(); }
        do lock_cond(x, dg2) |cond| {
            let num_woken = cond.broadcast();
            assert_eq!(num_woken, num_waiters);
        }
        // wait until all children wake up
        for port in ports.iter() { let _ = port.recv(); }
    }
    #[test]
    fn test_rwlock_cond_broadcast() {
        test_rwlock_cond_broadcast_helper(0, true, true);
        test_rwlock_cond_broadcast_helper(0, true, false);
        test_rwlock_cond_broadcast_helper(0, false, true);
        test_rwlock_cond_broadcast_helper(0, false, false);
        test_rwlock_cond_broadcast_helper(12, true, true);
        test_rwlock_cond_broadcast_helper(12, true, false);
        test_rwlock_cond_broadcast_helper(12, false, true);
        test_rwlock_cond_broadcast_helper(12, false, false);
    }
    #[cfg(test)] #[ignore(cfg(windows))]
    fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
        // Rwlock must get automatically unlocked if failed/killed within.
        let x = ~RWLock::new();
        let x2 = (*x).clone();

        let result: result::Result<(),()> = do task::try || {
            do lock_rwlock_in_mode(&x2, mode1) {
                fail!();
            }
        };
        assert!(result.is_err());
        // child task must have finished by the time try returns
        do lock_rwlock_in_mode(x, mode2) { }
    }
    #[test] #[ignore(cfg(windows))]
    fn test_rwlock_reader_killed_writer() {
        rwlock_kill_helper(Read, Write);
    }
    #[test] #[ignore(cfg(windows))]
    fn test_rwlock_writer_killed_reader() {
        rwlock_kill_helper(Write, Read);
    }
    #[test] #[ignore(cfg(windows))]
    fn test_rwlock_reader_killed_reader() {
        rwlock_kill_helper(Read, Read);
    }
    #[test] #[ignore(cfg(windows))]
    fn test_rwlock_writer_killed_writer() {
        rwlock_kill_helper(Write, Write);
    }
    #[test] #[ignore(cfg(windows))]
    fn test_rwlock_kill_downgrader() {
        rwlock_kill_helper(Downgrade, Read);
        rwlock_kill_helper(Read, Downgrade);
        rwlock_kill_helper(Downgrade, Write);
        rwlock_kill_helper(Write, Downgrade);
        rwlock_kill_helper(DowngradeRead, Read);
        rwlock_kill_helper(Read, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Write);
        rwlock_kill_helper(Write, DowngradeRead);
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(DowngradeRead, Downgrade);
        rwlock_kill_helper(Downgrade, DowngradeRead);
        rwlock_kill_helper(Downgrade, DowngradeRead);
    }
    #[test] #[should_fail] #[ignore(cfg(windows))]
    fn test_rwlock_downgrade_cant_swap() {
        // Tests that you can't downgrade with a different rwlock's token.
        let x = ~RWLock::new();
        let y = ~RWLock::new();
        do x.write_downgrade |xwrite| {
            let mut xopt = Some(xwrite);
            do y.write_downgrade |_ywrite| {
                y.downgrade(xopt.take_unwrap());
                error!("oops, y.downgrade(x) should have failed!");
            }
        }
    }
}