]> git.lizzy.rs Git - rust.git/blob - src/libsync/raw.rs
rollup merge of #18407 : thestinger/arena
[rust.git] / src / libsync / raw.rs
1 // Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Raw concurrency primitives you know and love.
12 //!
13 //! These primitives are not recommended for general use, but are provided for
14 //! flavorful use-cases. It is recommended to use the types at the top of the
15 //! `sync` crate which wrap values directly and provide safer abstractions for
16 //! containing data.
17
18 use core::prelude::*;
19
20 use core::atomic;
21 use core::finally::Finally;
22 use core::kinds::marker;
23 use core::mem;
24 use core::cell::UnsafeCell;
25 use collections::{Vec, MutableSeq};
26
27 use mutex;
28 use comm::{Receiver, Sender, channel};
29
30 /****************************************************************************
31  * Internals
32  ****************************************************************************/
33
34 // Each waiting task receives on one of these.
35 type WaitEnd = Receiver<()>;
36 type SignalEnd = Sender<()>;
37 // A doubly-ended queue of waiting tasks.
38 struct WaitQueue {
39     head: Receiver<SignalEnd>,
40     tail: Sender<SignalEnd>,
41 }
42
43 impl WaitQueue {
44     fn new() -> WaitQueue {
45         let (block_tail, block_head) = channel();
46         WaitQueue { head: block_head, tail: block_tail }
47     }
48
49     // Signals one live task from the queue.
50     fn signal(&self) -> bool {
51         match self.head.try_recv() {
52             Ok(ch) => {
53                 // Send a wakeup signal. If the waiter was killed, its port will
54                 // have closed. Keep trying until we get a live task.
55                 if ch.send_opt(()).is_ok() {
56                     true
57                 } else {
58                     self.signal()
59                 }
60             }
61             _ => false
62         }
63     }
64
65     fn broadcast(&self) -> uint {
66         let mut count = 0;
67         loop {
68             match self.head.try_recv() {
69                 Ok(ch) => {
70                     if ch.send_opt(()).is_ok() {
71                         count += 1;
72                     }
73                 }
74                 _ => break
75             }
76         }
77         count
78     }
79
80     fn wait_end(&self) -> WaitEnd {
81         let (signal_end, wait_end) = channel();
82         self.tail.send(signal_end);
83         wait_end
84     }
85 }
86
87 // The building-block used to make semaphores, mutexes, and rwlocks.
88 struct Sem<Q> {
89     lock: mutex::Mutex,
90     // n.b, we need Sem to be `Sync`, but the WaitQueue type is not send/share
91     //      (for good reason). We have an internal invariant on this semaphore,
92     //      however, that the queue is never accessed outside of a locked
93     //      context.
94     inner: UnsafeCell<SemInner<Q>>
95 }
96
97 struct SemInner<Q> {
98     count: int,
99     waiters: WaitQueue,
100     // Can be either unit or another waitqueue. Some sems shouldn't come with
101     // a condition variable attached, others should.
102     blocked: Q,
103 }
104
105 #[must_use]
106 struct SemGuard<'a, Q:'a> {
107     sem: &'a Sem<Q>,
108 }
109
110 impl<Q: Send> Sem<Q> {
111     fn new(count: int, q: Q) -> Sem<Q> {
112         assert!(count >= 0,
113                 "semaphores cannot be initialized with negative values");
114         Sem {
115             lock: mutex::Mutex::new(),
116             inner: UnsafeCell::new(SemInner {
117                 waiters: WaitQueue::new(),
118                 count: count,
119                 blocked: q,
120             })
121         }
122     }
123
124     unsafe fn with(&self, f: |&mut SemInner<Q>|) {
125         let _g = self.lock.lock();
126         // This &mut is safe because, due to the lock, we are the only one who can touch the data
127         f(&mut *self.inner.get())
128     }
129
130     pub fn acquire(&self) {
131         unsafe {
132             let mut waiter_nobe = None;
133             self.with(|state| {
134                 state.count -= 1;
135                 if state.count < 0 {
136                     // Create waiter nobe, enqueue ourself, and tell
137                     // outer scope we need to block.
138                     waiter_nobe = Some(state.waiters.wait_end());
139                 }
140             });
141             // Uncomment if you wish to test for sem races. Not
142             // valgrind-friendly.
143             /* for _ in range(0u, 1000) { task::deschedule(); } */
144             // Need to wait outside the exclusive.
145             if waiter_nobe.is_some() {
146                 let _ = waiter_nobe.unwrap().recv();
147             }
148         }
149     }
150
151     pub fn release(&self) {
152         unsafe {
153             self.with(|state| {
154                 state.count += 1;
155                 if state.count <= 0 {
156                     state.waiters.signal();
157                 }
158             })
159         }
160     }
161
162     pub fn access<'a>(&'a self) -> SemGuard<'a, Q> {
163         self.acquire();
164         SemGuard { sem: self }
165     }
166 }
167
168 #[unsafe_destructor]
169 impl<'a, Q: Send> Drop for SemGuard<'a, Q> {
170     fn drop(&mut self) {
171         self.sem.release();
172     }
173 }
174
175 impl Sem<Vec<WaitQueue>> {
176     fn new_and_signal(count: int, num_condvars: uint) -> Sem<Vec<WaitQueue>> {
177         let mut queues = Vec::new();
178         for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); }
179         Sem::new(count, queues)
180     }
181
182     // The only other places that condvars get built are rwlock.write_cond()
183     // and rwlock_write_mode.
184     pub fn access_cond<'a>(&'a self) -> SemCondGuard<'a> {
185         SemCondGuard {
186             guard: self.access(),
187             cvar: Condvar { sem: self, order: Nothing, nocopy: marker::NoCopy },
188         }
189     }
190 }
191
192 // FIXME(#3598): Want to use an Option down below, but we need a custom enum
193 // that's not polymorphic to get around the fact that lifetimes are invariant
194 // inside of type parameters.
195 enum ReacquireOrderLock<'a> {
196     Nothing, // c.c
197     Just(&'a Semaphore),
198 }
199
200 /// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
201 pub struct Condvar<'a> {
202     // The 'Sem' object associated with this condvar. This is the one that's
203     // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
204     sem: &'a Sem<Vec<WaitQueue> >,
205     // This is (can be) an extra semaphore which is held around the reacquire
206     // operation on the first one. This is only used in cvars associated with
207     // rwlocks, and is needed to ensure that, when a downgrader is trying to
208     // hand off the access lock (which would be the first field, here), a 2nd
209     // writer waking up from a cvar wait can't race with a reader to steal it,
210     // See the comment in write_cond for more detail.
211     order: ReacquireOrderLock<'a>,
212     // Make sure condvars are non-copyable.
213     nocopy: marker::NoCopy,
214 }
215
216 impl<'a> Condvar<'a> {
217     /// Atomically drop the associated lock, and block until a signal is sent.
218     ///
219     /// # Panics
220     ///
221     /// A task which is killed while waiting on a condition variable will wake
222     /// up, panic, and unlock the associated lock as it unwinds.
223     pub fn wait(&self) { self.wait_on(0) }
224
225     /// As wait(), but can specify which of multiple condition variables to
226     /// wait on. Only a signal_on() or broadcast_on() with the same condvar_id
227     /// will wake this thread.
228     ///
229     /// The associated lock must have been initialised with an appropriate
230     /// number of condvars. The condvar_id must be between 0 and num_condvars-1
231     /// or else this call will panic.
232     ///
233     /// wait() is equivalent to wait_on(0).
234     pub fn wait_on(&self, condvar_id: uint) {
235         let mut wait_end = None;
236         let mut out_of_bounds = None;
237         // Release lock, 'atomically' enqueuing ourselves in so doing.
238         unsafe {
239             self.sem.with(|state| {
240                 if condvar_id < state.blocked.len() {
241                     // Drop the lock.
242                     state.count += 1;
243                     if state.count <= 0 {
244                         state.waiters.signal();
245                     }
246                     // Create waiter nobe, and enqueue ourself to
247                     // be woken up by a signaller.
248                     wait_end = Some(state.blocked[condvar_id].wait_end());
249                 } else {
250                     out_of_bounds = Some(state.blocked.len());
251                 }
252             })
253         }
254
255         // If deschedule checks start getting inserted anywhere, we can be
256         // killed before or after enqueueing.
257         check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()", || {
258             // Unconditionally "block". (Might not actually block if a
259             // signaller already sent -- I mean 'unconditionally' in contrast
260             // with acquire().)
261             (|| {
262                 let _ = wait_end.take().unwrap().recv();
263             }).finally(|| {
264                 // Reacquire the condvar.
265                 match self.order {
266                     Just(lock) => {
267                         let _g = lock.access();
268                         self.sem.acquire();
269                     }
270                     Nothing => self.sem.acquire(),
271                 }
272             })
273         })
274     }
275
276     /// Wake up a blocked task. Returns false if there was no blocked task.
277     pub fn signal(&self) -> bool { self.signal_on(0) }
278
279     /// As signal, but with a specified condvar_id. See wait_on.
280     pub fn signal_on(&self, condvar_id: uint) -> bool {
281         unsafe {
282             let mut out_of_bounds = None;
283             let mut result = false;
284             self.sem.with(|state| {
285                 if condvar_id < state.blocked.len() {
286                     result = state.blocked[condvar_id].signal();
287                 } else {
288                     out_of_bounds = Some(state.blocked.len());
289                 }
290             });
291             check_cvar_bounds(out_of_bounds,
292                               condvar_id,
293                               "cond.signal_on()",
294                               || result)
295         }
296     }
297
298     /// Wake up all blocked tasks. Returns the number of tasks woken.
299     pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
300
301     /// As broadcast, but with a specified condvar_id. See wait_on.
302     pub fn broadcast_on(&self, condvar_id: uint) -> uint {
303         let mut out_of_bounds = None;
304         let mut queue = None;
305         unsafe {
306             self.sem.with(|state| {
307                 if condvar_id < state.blocked.len() {
308                     // To avoid :broadcast_heavy, we make a new waitqueue,
309                     // swap it out with the old one, and broadcast on the
310                     // old one outside of the little-lock.
311                     queue = Some(mem::replace(state.blocked.get_mut(condvar_id),
312                                               WaitQueue::new()));
313                 } else {
314                     out_of_bounds = Some(state.blocked.len());
315                 }
316             });
317             check_cvar_bounds(out_of_bounds,
318                               condvar_id,
319                               "cond.signal_on()",
320                               || {
321                 queue.take().unwrap().broadcast()
322             })
323         }
324     }
325 }
326
327 // Checks whether a condvar ID was out of bounds, and panics if so, or does
328 // something else next on success.
329 #[inline]
330 fn check_cvar_bounds<U>(
331                      out_of_bounds: Option<uint>,
332                      id: uint,
333                      act: &str,
334                      blk: || -> U)
335                      -> U {
336     match out_of_bounds {
337         Some(0) =>
338             panic!("{} with illegal ID {} - this lock has no condvars!", act, id),
339         Some(length) =>
340             panic!("{} with illegal ID {} - ID must be less than {}", act, id, length),
341         None => blk()
342     }
343 }
344
345 #[must_use]
346 struct SemCondGuard<'a> {
347     guard: SemGuard<'a, Vec<WaitQueue>>,
348     cvar: Condvar<'a>,
349 }
350
351 /****************************************************************************
352  * Semaphores
353  ****************************************************************************/
354
355 /// A counting, blocking, bounded-waiting semaphore.
356 pub struct Semaphore {
357     sem: Sem<()>,
358 }
359
360 /// An RAII guard used to represent an acquired resource to a semaphore. When
361 /// dropped, this value will release the resource back to the semaphore.
362 #[must_use]
363 pub struct SemaphoreGuard<'a> {
364     _guard: SemGuard<'a, ()>,
365 }
366
367 impl Semaphore {
368     /// Create a new semaphore with the specified count.
369     ///
370     /// # Panics
371     ///
372     /// This function will panic if `count` is negative.
373     pub fn new(count: int) -> Semaphore {
374         Semaphore { sem: Sem::new(count, ()) }
375     }
376
377     /// Acquire a resource represented by the semaphore. Blocks if necessary
378     /// until resource(s) become available.
379     pub fn acquire(&self) { self.sem.acquire() }
380
381     /// Release a held resource represented by the semaphore. Wakes a blocked
382     /// contending task, if any exist. Won't block the caller.
383     pub fn release(&self) { self.sem.release() }
384
385     /// Acquire a resource of this semaphore, returning an RAII guard which will
386     /// release the resource when dropped.
387     pub fn access<'a>(&'a self) -> SemaphoreGuard<'a> {
388         SemaphoreGuard { _guard: self.sem.access() }
389     }
390 }
391
392 /****************************************************************************
393  * Mutexes
394  ****************************************************************************/
395
396 /// A blocking, bounded-waiting, mutual exclusion lock with an associated
397 /// FIFO condition variable.
398 ///
399 /// # Panics
400 ///
401 /// A task which panicks while holding a mutex will unlock the mutex as it
402 /// unwinds.
403 pub struct Mutex {
404     sem: Sem<Vec<WaitQueue>>,
405 }
406
407 /// An RAII structure which is used to gain access to a mutex's condition
408 /// variable. Additionally, when a value of this type is dropped, the
409 /// corresponding mutex is also unlocked.
410 #[must_use]
411 pub struct MutexGuard<'a> {
412     _guard: SemGuard<'a, Vec<WaitQueue>>,
413     /// Inner condition variable which is connected to the outer mutex, and can
414     /// be used for atomic-unlock-and-deschedule.
415     pub cond: Condvar<'a>,
416 }
417
418 impl Mutex {
419     /// Create a new mutex, with one associated condvar.
420     pub fn new() -> Mutex { Mutex::new_with_condvars(1) }
421
422     /// Create a new mutex, with a specified number of associated condvars. This
423     /// will allow calling wait_on/signal_on/broadcast_on with condvar IDs
424     /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be
425     /// allowed but any operations on the condvar will panic.)
426     pub fn new_with_condvars(num_condvars: uint) -> Mutex {
427         Mutex { sem: Sem::new_and_signal(1, num_condvars) }
428     }
429
430     /// Acquires ownership of this mutex, returning an RAII guard which will
431     /// unlock the mutex when dropped. The associated condition variable can
432     /// also be accessed through the returned guard.
433     pub fn lock<'a>(&'a self) -> MutexGuard<'a> {
434         let SemCondGuard { guard, cvar } = self.sem.access_cond();
435         MutexGuard { _guard: guard, cond: cvar }
436     }
437 }
438
439 /****************************************************************************
440  * Reader-writer locks
441  ****************************************************************************/
442
443 // NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem
444
445 /// A blocking, no-starvation, reader-writer lock with an associated condvar.
446 ///
447 /// # Panics
448 ///
449 /// A task which panics while holding an rwlock will unlock the rwlock as it
450 /// unwinds.
451 pub struct RWLock {
452     order_lock:  Semaphore,
453     access_lock: Sem<Vec<WaitQueue>>,
454
455     // The only way the count flag is ever accessed is with xadd. Since it is
456     // a read-modify-write operation, multiple xadds on different cores will
457     // always be consistent with respect to each other, so a monotonic/relaxed
458     // consistency ordering suffices (i.e., no extra barriers are needed).
459     //
460     // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
461     // acquire/release orderings superfluously. Change these someday.
462     read_count: atomic::AtomicUint,
463 }
464
465 /// An RAII helper which is created by acquiring a read lock on an RWLock. When
466 /// dropped, this will unlock the RWLock.
467 #[must_use]
468 pub struct RWLockReadGuard<'a> {
469     lock: &'a RWLock,
470 }
471
472 /// An RAII helper which is created by acquiring a write lock on an RWLock. When
473 /// dropped, this will unlock the RWLock.
474 ///
475 /// A value of this type can also be consumed to downgrade to a read-only lock.
476 #[must_use]
477 pub struct RWLockWriteGuard<'a> {
478     lock: &'a RWLock,
479     /// Inner condition variable that is connected to the write-mode of the
480     /// outer rwlock.
481     pub cond: Condvar<'a>,
482 }
483
484 impl RWLock {
485     /// Create a new rwlock, with one associated condvar.
486     pub fn new() -> RWLock { RWLock::new_with_condvars(1) }
487
488     /// Create a new rwlock, with a specified number of associated condvars.
489     /// Similar to mutex_with_condvars.
490     pub fn new_with_condvars(num_condvars: uint) -> RWLock {
491         RWLock {
492             order_lock: Semaphore::new(1),
493             access_lock: Sem::new_and_signal(1, num_condvars),
494             read_count: atomic::AtomicUint::new(0),
495         }
496     }
497
498     /// Acquires a read-lock, returning an RAII guard that will unlock the lock
499     /// when dropped. Calls to 'read' from other tasks may run concurrently with
500     /// this one.
501     pub fn read<'a>(&'a self) -> RWLockReadGuard<'a> {
502         let _guard = self.order_lock.access();
503         let old_count = self.read_count.fetch_add(1, atomic::Acquire);
504         if old_count == 0 {
505             self.access_lock.acquire();
506         }
507         RWLockReadGuard { lock: self }
508     }
509
510     /// Acquire a write-lock, returning an RAII guard that will unlock the lock
511     /// when dropped. No calls to 'read' or 'write' from other tasks will run
512     /// concurrently with this one.
513     ///
514     /// You can also downgrade a write to a read by calling the `downgrade`
515     /// method on the returned guard. Additionally, the guard will contain a
516     /// `Condvar` attached to this lock.
517     ///
518     /// # Example
519     ///
520     /// ```rust
521     /// use sync::raw::RWLock;
522     ///
523     /// let lock = RWLock::new();
524     /// let write = lock.write();
525     /// // ... exclusive access ...
526     /// let read = write.downgrade();
527     /// // ... shared access ...
528     /// drop(read);
529     /// ```
530     pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a> {
531         let _g = self.order_lock.access();
532         self.access_lock.acquire();
533
534         // It's important to thread our order lock into the condvar, so that
535         // when a cond.wait() wakes up, it uses it while reacquiring the
536         // access lock. If we permitted a waking-up writer to "cut in line",
537         // there could arise a subtle race when a downgrader attempts to hand
538         // off the reader cloud lock to a waiting reader. This race is tested
539         // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
540         // T1 (writer)              T2 (downgrader)             T3 (reader)
541         // [in cond.wait()]
542         //                          [locks for writing]
543         //                          [holds access_lock]
544         // [is signalled, perhaps by
545         //  downgrader or a 4th thread]
546         // tries to lock access(!)
547         //                                                      lock order_lock
548         //                                                      xadd read_count[0->1]
549         //                                                      tries to lock access
550         //                          [downgrade]
551         //                          xadd read_count[1->2]
552         //                          unlock access
553         // Since T1 contended on the access lock before T3 did, it will steal
554         // the lock handoff. Adding order_lock in the condvar reacquire path
555         // solves this because T1 will hold order_lock while waiting on access,
556         // which will cause T3 to have to wait until T1 finishes its write,
557         // which can't happen until T2 finishes the downgrade-read entirely.
558         // The astute reader will also note that making waking writers use the
559         // order_lock is better for not starving readers.
560         RWLockWriteGuard {
561             lock: self,
562             cond: Condvar {
563                 sem: &self.access_lock,
564                 order: Just(&self.order_lock),
565                 nocopy: marker::NoCopy,
566             }
567         }
568     }
569 }
570
571 impl<'a> RWLockWriteGuard<'a> {
572     /// Consumes this write lock and converts it into a read lock.
573     pub fn downgrade(self) -> RWLockReadGuard<'a> {
574         let lock = self.lock;
575         // Don't run the destructor of the write guard, we're in charge of
576         // things from now on
577         unsafe { mem::forget(self) }
578
579         let old_count = lock.read_count.fetch_add(1, atomic::Release);
580         // If another reader was already blocking, we need to hand-off
581         // the "reader cloud" access lock to them.
582         if old_count != 0 {
583             // Guaranteed not to let another writer in, because
584             // another reader was holding the order_lock. Hence they
585             // must be the one to get the access_lock (because all
586             // access_locks are acquired with order_lock held). See
587             // the comment in write_cond for more justification.
588             lock.access_lock.release();
589         }
590         RWLockReadGuard { lock: lock }
591     }
592 }
593
594 #[unsafe_destructor]
595 impl<'a> Drop for RWLockWriteGuard<'a> {
596     fn drop(&mut self) {
597         self.lock.access_lock.release();
598     }
599 }
600
601 #[unsafe_destructor]
602 impl<'a> Drop for RWLockReadGuard<'a> {
603     fn drop(&mut self) {
604         let old_count = self.lock.read_count.fetch_sub(1, atomic::Release);
605         assert!(old_count > 0);
606         if old_count == 1 {
607             // Note: this release used to be outside of a locked access
608             // to exclusive-protected state. If this code is ever
609             // converted back to such (instead of using atomic ops),
610             // this access MUST NOT go inside the exclusive access.
611             self.lock.access_lock.release();
612         }
613     }
614 }
615
616 /****************************************************************************
617  * Tests
618  ****************************************************************************/
619
620 #[cfg(test)]
621 mod tests {
622     use std::prelude::*;
623
624     use Arc;
625     use super::{Semaphore, Mutex, RWLock, Condvar};
626
627     use std::mem;
628     use std::result;
629     use std::task;
630
631     /************************************************************************
632      * Semaphore tests
633      ************************************************************************/
634     #[test]
635     fn test_sem_acquire_release() {
636         let s = Semaphore::new(1);
637         s.acquire();
638         s.release();
639         s.acquire();
640     }
641     #[test]
642     fn test_sem_basic() {
643         let s = Semaphore::new(1);
644         let _g = s.access();
645     }
646     #[test]
647     #[should_fail]
648     fn test_sem_basic2() {
649         Semaphore::new(-1);
650     }
651     #[test]
652     fn test_sem_as_mutex() {
653         let s = Arc::new(Semaphore::new(1));
654         let s2 = s.clone();
655         task::spawn(proc() {
656             let _g = s2.access();
657             for _ in range(0u, 5) { task::deschedule(); }
658         });
659         let _g = s.access();
660         for _ in range(0u, 5) { task::deschedule(); }
661     }
662     #[test]
663     fn test_sem_as_cvar() {
664         /* Child waits and parent signals */
665         let (tx, rx) = channel();
666         let s = Arc::new(Semaphore::new(0));
667         let s2 = s.clone();
668         task::spawn(proc() {
669             s2.acquire();
670             tx.send(());
671         });
672         for _ in range(0u, 5) { task::deschedule(); }
673         s.release();
674         let _ = rx.recv();
675
676         /* Parent waits and child signals */
677         let (tx, rx) = channel();
678         let s = Arc::new(Semaphore::new(0));
679         let s2 = s.clone();
680         task::spawn(proc() {
681             for _ in range(0u, 5) { task::deschedule(); }
682             s2.release();
683             let _ = rx.recv();
684         });
685         s.acquire();
686         tx.send(());
687     }
688     #[test]
689     fn test_sem_multi_resource() {
690         // Parent and child both get in the critical section at the same
691         // time, and shake hands.
692         let s = Arc::new(Semaphore::new(2));
693         let s2 = s.clone();
694         let (tx1, rx1) = channel();
695         let (tx2, rx2) = channel();
696         task::spawn(proc() {
697             let _g = s2.access();
698             let _ = rx2.recv();
699             tx1.send(());
700         });
701         let _g = s.access();
702         tx2.send(());
703         let _ = rx1.recv();
704     }
705     #[test]
706     fn test_sem_runtime_friendly_blocking() {
707         // Force the runtime to schedule two threads on the same sched_loop.
708         // When one blocks, it should schedule the other one.
709         let s = Arc::new(Semaphore::new(1));
710         let s2 = s.clone();
711         let (tx, rx) = channel();
712         {
713             let _g = s.access();
714             task::spawn(proc() {
715                 tx.send(());
716                 drop(s2.access());
717                 tx.send(());
718             });
719             rx.recv(); // wait for child to come alive
720             for _ in range(0u, 5) { task::deschedule(); } // let the child contend
721         }
722         rx.recv(); // wait for child to be done
723     }
724     /************************************************************************
725      * Mutex tests
726      ************************************************************************/
727     #[test]
728     fn test_mutex_lock() {
729         // Unsafely achieve shared state, and do the textbook
730         // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
731         let (tx, rx) = channel();
732         let m = Arc::new(Mutex::new());
733         let m2 = m.clone();
734         let mut sharedstate = box 0;
735         {
736             let ptr: *mut int = &mut *sharedstate;
737             task::spawn(proc() {
738                 access_shared(ptr, &m2, 10);
739                 tx.send(());
740             });
741         }
742         {
743             access_shared(&mut *sharedstate, &m, 10);
744             let _ = rx.recv();
745
746             assert_eq!(*sharedstate, 20);
747         }
748
749         fn access_shared(sharedstate: *mut int, m: &Arc<Mutex>, n: uint) {
750             for _ in range(0u, n) {
751                 let _g = m.lock();
752                 let oldval = unsafe { *sharedstate };
753                 task::deschedule();
754                 unsafe { *sharedstate = oldval + 1; }
755             }
756         }
757     }
758     #[test]
759     fn test_mutex_cond_wait() {
760         let m = Arc::new(Mutex::new());
761
762         // Child wakes up parent
763         {
764             let lock = m.lock();
765             let m2 = m.clone();
766             task::spawn(proc() {
767                 let lock = m2.lock();
768                 let woken = lock.cond.signal();
769                 assert!(woken);
770             });
771             lock.cond.wait();
772         }
773         // Parent wakes up child
774         let (tx, rx) = channel();
775         let m3 = m.clone();
776         task::spawn(proc() {
777             let lock = m3.lock();
778             tx.send(());
779             lock.cond.wait();
780             tx.send(());
781         });
782         rx.recv(); // Wait until child gets in the mutex
783         {
784             let lock = m.lock();
785             let woken = lock.cond.signal();
786             assert!(woken);
787         }
788         rx.recv(); // Wait until child wakes up
789     }
790
791     fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
792         let m = Arc::new(Mutex::new());
793         let mut rxs = Vec::new();
794
795         for _ in range(0u, num_waiters) {
796             let mi = m.clone();
797             let (tx, rx) = channel();
798             rxs.push(rx);
799             task::spawn(proc() {
800                 let lock = mi.lock();
801                 tx.send(());
802                 lock.cond.wait();
803                 tx.send(());
804             });
805         }
806
807         // wait until all children get in the mutex
808         for rx in rxs.iter_mut() { rx.recv(); }
809         {
810             let lock = m.lock();
811             let num_woken = lock.cond.broadcast();
812             assert_eq!(num_woken, num_waiters);
813         }
814         // wait until all children wake up
815         for rx in rxs.iter_mut() { rx.recv(); }
816     }
817     #[test]
818     fn test_mutex_cond_broadcast() {
819         test_mutex_cond_broadcast_helper(12);
820     }
821     #[test]
822     fn test_mutex_cond_broadcast_none() {
823         test_mutex_cond_broadcast_helper(0);
824     }
825     #[test]
826     fn test_mutex_cond_no_waiter() {
827         let m = Arc::new(Mutex::new());
828         let m2 = m.clone();
829         let _ = task::try(proc() {
830             drop(m.lock());
831         });
832         let lock = m2.lock();
833         assert!(!lock.cond.signal());
834     }
835     #[test]
836     fn test_mutex_killed_simple() {
837         use std::any::Any;
838
839         // Mutex must get automatically unlocked if panicked/killed within.
840         let m = Arc::new(Mutex::new());
841         let m2 = m.clone();
842
843         let result: result::Result<(), Box<Any + Send>> = task::try(proc() {
844             let _lock = m2.lock();
845             panic!();
846         });
847         assert!(result.is_err());
848         // child task must have finished by the time try returns
849         drop(m.lock());
850     }
851     #[test]
852     fn test_mutex_cond_signal_on_0() {
853         // Tests that signal_on(0) is equivalent to signal().
854         let m = Arc::new(Mutex::new());
855         let lock = m.lock();
856         let m2 = m.clone();
857         task::spawn(proc() {
858             let lock = m2.lock();
859             lock.cond.signal_on(0);
860         });
861         lock.cond.wait();
862     }
863     #[test]
864     fn test_mutex_no_condvars() {
865         let result = task::try(proc() {
866             let m = Mutex::new_with_condvars(0);
867             m.lock().cond.wait();
868         });
869         assert!(result.is_err());
870         let result = task::try(proc() {
871             let m = Mutex::new_with_condvars(0);
872             m.lock().cond.signal();
873         });
874         assert!(result.is_err());
875         let result = task::try(proc() {
876             let m = Mutex::new_with_condvars(0);
877             m.lock().cond.broadcast();
878         });
879         assert!(result.is_err());
880     }
881     /************************************************************************
882      * Reader/writer lock tests
883      ************************************************************************/
884     #[cfg(test)]
885     pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }
886     #[cfg(test)]
887     fn lock_rwlock_in_mode(x: &Arc<RWLock>, mode: RWLockMode, blk: ||) {
888         match mode {
889             Read => { let _g = x.read(); blk() }
890             Write => { let _g = x.write(); blk() }
891             Downgrade => { let _g = x.write(); blk() }
892             DowngradeRead => { let _g = x.write().downgrade(); blk() }
893         }
894     }
895     #[cfg(test)]
896     fn test_rwlock_exclusion(x: Arc<RWLock>,
897                              mode1: RWLockMode,
898                              mode2: RWLockMode) {
899         // Test mutual exclusion between readers and writers. Just like the
900         // mutex mutual exclusion test, a ways above.
901         let (tx, rx) = channel();
902         let x2 = x.clone();
903         let mut sharedstate = box 0;
904         {
905             let ptr: *const int = &*sharedstate;
906             task::spawn(proc() {
907                 let sharedstate: &mut int =
908                     unsafe { mem::transmute(ptr) };
909                 access_shared(sharedstate, &x2, mode1, 10);
910                 tx.send(());
911             });
912         }
913         {
914             access_shared(&mut *sharedstate, &x, mode2, 10);
915             let _ = rx.recv();
916
917             assert_eq!(*sharedstate, 20);
918         }
919
920         fn access_shared(sharedstate: &mut int, x: &Arc<RWLock>,
921                          mode: RWLockMode, n: uint) {
922             for _ in range(0u, n) {
923                 lock_rwlock_in_mode(x, mode, || {
924                     let oldval = *sharedstate;
925                     task::deschedule();
926                     *sharedstate = oldval + 1;
927                 })
928             }
929         }
930     }
931     #[test]
932     fn test_rwlock_readers_wont_modify_the_data() {
933         test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Write);
934         test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Read);
935         test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Downgrade);
936         test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Read);
937         test_rwlock_exclusion(Arc::new(RWLock::new()), Write, DowngradeRead);
938         test_rwlock_exclusion(Arc::new(RWLock::new()), DowngradeRead, Write);
939     }
940     #[test]
941     fn test_rwlock_writers_and_writers() {
942         test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Write);
943         test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Downgrade);
944         test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Write);
945         test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Downgrade);
946     }
947     #[cfg(test)]
948     fn test_rwlock_handshake(x: Arc<RWLock>,
949                              mode1: RWLockMode,
950                              mode2: RWLockMode,
951                              make_mode2_go_first: bool) {
952         // Much like sem_multi_resource.
953         let x2 = x.clone();
954         let (tx1, rx1) = channel();
955         let (tx2, rx2) = channel();
956         task::spawn(proc() {
957             if !make_mode2_go_first {
958                 rx2.recv(); // parent sends to us once it locks, or ...
959             }
960             lock_rwlock_in_mode(&x2, mode2, || {
961                 if make_mode2_go_first {
962                     tx1.send(()); // ... we send to it once we lock
963                 }
964                 rx2.recv();
965                 tx1.send(());
966             })
967         });
968         if make_mode2_go_first {
969             rx1.recv(); // child sends to us once it locks, or ...
970         }
971         lock_rwlock_in_mode(&x, mode1, || {
972             if !make_mode2_go_first {
973                 tx2.send(()); // ... we send to it once we lock
974             }
975             tx2.send(());
976             rx1.recv();
977         })
978     }
979     #[test]
980     fn test_rwlock_readers_and_readers() {
981         test_rwlock_handshake(Arc::new(RWLock::new()), Read, Read, false);
982         // The downgrader needs to get in before the reader gets in, otherwise
983         // they cannot end up reading at the same time.
984         test_rwlock_handshake(Arc::new(RWLock::new()), DowngradeRead, Read, false);
985         test_rwlock_handshake(Arc::new(RWLock::new()), Read, DowngradeRead, true);
986         // Two downgrade_reads can never both end up reading at the same time.
987     }
988     #[test]
989     fn test_rwlock_downgrade_unlock() {
990         // Tests that downgrade can unlock the lock in both modes
991         let x = Arc::new(RWLock::new());
992         lock_rwlock_in_mode(&x, Downgrade, || { });
993         test_rwlock_handshake(x, Read, Read, false);
994         let y = Arc::new(RWLock::new());
995         lock_rwlock_in_mode(&y, DowngradeRead, || { });
996         test_rwlock_exclusion(y, Write, Write);
997     }
998     #[test]
999     fn test_rwlock_read_recursive() {
1000         let x = RWLock::new();
1001         let _g1 = x.read();
1002         let _g2 = x.read();
1003     }
1004     #[test]
1005     fn test_rwlock_cond_wait() {
1006         // As test_mutex_cond_wait above.
1007         let x = Arc::new(RWLock::new());
1008
1009         // Child wakes up parent
1010         {
1011             let lock = x.write();
1012             let x2 = x.clone();
1013             task::spawn(proc() {
1014                 let lock = x2.write();
1015                 assert!(lock.cond.signal());
1016             });
1017             lock.cond.wait();
1018         }
1019         // Parent wakes up child
1020         let (tx, rx) = channel();
1021         let x3 = x.clone();
1022         task::spawn(proc() {
1023             let lock = x3.write();
1024             tx.send(());
1025             lock.cond.wait();
1026             tx.send(());
1027         });
1028         rx.recv(); // Wait until child gets in the rwlock
1029         drop(x.read()); // Must be able to get in as a reader
1030         {
1031             let x = x.write();
1032             assert!(x.cond.signal());
1033         }
1034         rx.recv(); // Wait until child wakes up
1035         drop(x.read()); // Just for good measure
1036     }
1037     #[cfg(test)]
1038     fn test_rwlock_cond_broadcast_helper(num_waiters: uint) {
1039         // Much like the mutex broadcast test. Downgrade-enabled.
1040         fn lock_cond(x: &Arc<RWLock>, blk: |c: &Condvar|) {
1041             let lock = x.write();
1042             blk(&lock.cond);
1043         }
1044
1045         let x = Arc::new(RWLock::new());
1046         let mut rxs = Vec::new();
1047
1048         for _ in range(0u, num_waiters) {
1049             let xi = x.clone();
1050             let (tx, rx) = channel();
1051             rxs.push(rx);
1052             task::spawn(proc() {
1053                 lock_cond(&xi, |cond| {
1054                     tx.send(());
1055                     cond.wait();
1056                     tx.send(());
1057                 })
1058             });
1059         }
1060
1061         // wait until all children get in the mutex
1062         for rx in rxs.iter_mut() { let _ = rx.recv(); }
1063         lock_cond(&x, |cond| {
1064             let num_woken = cond.broadcast();
1065             assert_eq!(num_woken, num_waiters);
1066         });
1067         // wait until all children wake up
1068         for rx in rxs.iter_mut() { let _ = rx.recv(); }
1069     }
1070     #[test]
1071     fn test_rwlock_cond_broadcast() {
1072         test_rwlock_cond_broadcast_helper(0);
1073         test_rwlock_cond_broadcast_helper(12);
1074     }
1075     #[cfg(test)]
1076     fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
1077         use std::any::Any;
1078
1079         // Mutex must get automatically unlocked if panicked/killed within.
1080         let x = Arc::new(RWLock::new());
1081         let x2 = x.clone();
1082
1083         let result: result::Result<(), Box<Any + Send>> = task::try(proc() {
1084             lock_rwlock_in_mode(&x2, mode1, || {
1085                 panic!();
1086             })
1087         });
1088         assert!(result.is_err());
1089         // child task must have finished by the time try returns
1090         lock_rwlock_in_mode(&x, mode2, || { })
1091     }
1092     #[test]
1093     fn test_rwlock_reader_killed_writer() {
1094         rwlock_kill_helper(Read, Write);
1095     }
1096     #[test]
1097     fn test_rwlock_writer_killed_reader() {
1098         rwlock_kill_helper(Write, Read);
1099     }
1100     #[test]
1101     fn test_rwlock_reader_killed_reader() {
1102         rwlock_kill_helper(Read, Read);
1103     }
1104     #[test]
1105     fn test_rwlock_writer_killed_writer() {
1106         rwlock_kill_helper(Write, Write);
1107     }
1108     #[test]
1109     fn test_rwlock_kill_downgrader() {
1110         rwlock_kill_helper(Downgrade, Read);
1111         rwlock_kill_helper(Read, Downgrade);
1112         rwlock_kill_helper(Downgrade, Write);
1113         rwlock_kill_helper(Write, Downgrade);
1114         rwlock_kill_helper(DowngradeRead, Read);
1115         rwlock_kill_helper(Read, DowngradeRead);
1116         rwlock_kill_helper(DowngradeRead, Write);
1117         rwlock_kill_helper(Write, DowngradeRead);
1118         rwlock_kill_helper(DowngradeRead, Downgrade);
1119         rwlock_kill_helper(DowngradeRead, Downgrade);
1120         rwlock_kill_helper(Downgrade, DowngradeRead);
1121         rwlock_kill_helper(Downgrade, DowngradeRead);
1122     }
1123 }