]> git.lizzy.rs Git - rust.git/blob - src/libsync/raw.rs
Rename all raw pointers as necessary
[rust.git] / src / libsync / raw.rs
1 // Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! Raw concurrency primitives you know and love.
12 //!
13 //! These primitives are not recommended for general use, but are provided for
14 //! flavorful use-cases. It is recommended to use the types at the top of the
15 //! `sync` crate which wrap values directly and provide safer abstractions for
16 //! containing data.
17
18 use core::prelude::*;
19
20 use core::atomics;
21 use core::finally::Finally;
22 use core::kinds::marker;
23 use core::mem;
24 use core::ty::Unsafe;
25 use collections::Vec;
26
27 use mutex;
28 use comm::{Receiver, Sender, channel};
29
30 /****************************************************************************
31  * Internals
32  ****************************************************************************/
33
34 // Each waiting task receives on one of these.
35 type WaitEnd = Receiver<()>;
36 type SignalEnd = Sender<()>;
37 // A doubly-ended queue of waiting tasks.
38 struct WaitQueue {
39     head: Receiver<SignalEnd>,
40     tail: Sender<SignalEnd>,
41 }
42
43 impl WaitQueue {
44     fn new() -> WaitQueue {
45         let (block_tail, block_head) = channel();
46         WaitQueue { head: block_head, tail: block_tail }
47     }
48
49     // Signals one live task from the queue.
50     fn signal(&self) -> bool {
51         match self.head.try_recv() {
52             Ok(ch) => {
53                 // Send a wakeup signal. If the waiter was killed, its port will
54                 // have closed. Keep trying until we get a live task.
55                 if ch.send_opt(()).is_ok() {
56                     true
57                 } else {
58                     self.signal()
59                 }
60             }
61             _ => false
62         }
63     }
64
65     fn broadcast(&self) -> uint {
66         let mut count = 0;
67         loop {
68             match self.head.try_recv() {
69                 Ok(ch) => {
70                     if ch.send_opt(()).is_ok() {
71                         count += 1;
72                     }
73                 }
74                 _ => break
75             }
76         }
77         count
78     }
79
80     fn wait_end(&self) -> WaitEnd {
81         let (signal_end, wait_end) = channel();
82         self.tail.send(signal_end);
83         wait_end
84     }
85 }
86
87 // The building-block used to make semaphores, mutexes, and rwlocks.
88 struct Sem<Q> {
89     lock: mutex::Mutex,
90     // n.b, we need Sem to be `Share`, but the WaitQueue type is not send/share
91     //      (for good reason). We have an internal invariant on this semaphore,
92     //      however, that the queue is never accessed outside of a locked
93     //      context.
94     inner: Unsafe<SemInner<Q>>
95 }
96
97 struct SemInner<Q> {
98     count: int,
99     waiters: WaitQueue,
100     // Can be either unit or another waitqueue. Some sems shouldn't come with
101     // a condition variable attached, others should.
102     blocked: Q,
103 }
104
105 #[must_use]
106 struct SemGuard<'a, Q> {
107     sem: &'a Sem<Q>,
108 }
109
110 impl<Q: Send> Sem<Q> {
111     fn new(count: int, q: Q) -> Sem<Q> {
112         Sem {
113             lock: mutex::Mutex::new(),
114             inner: Unsafe::new(SemInner {
115                 waiters: WaitQueue::new(),
116                 count: count,
117                 blocked: q,
118             })
119         }
120     }
121
122     unsafe fn with(&self, f: |&mut SemInner<Q>|) {
123         let _g = self.lock.lock();
124         // This &mut is safe because, due to the lock, we are the only one who can touch the data
125         f(&mut *self.inner.get())
126     }
127
128     pub fn acquire(&self) {
129         unsafe {
130             let mut waiter_nobe = None;
131             self.with(|state| {
132                 state.count -= 1;
133                 if state.count < 0 {
134                     // Create waiter nobe, enqueue ourself, and tell
135                     // outer scope we need to block.
136                     waiter_nobe = Some(state.waiters.wait_end());
137                 }
138             });
139             // Uncomment if you wish to test for sem races. Not
140             // valgrind-friendly.
141             /* for _ in range(0u, 1000) { task::deschedule(); } */
142             // Need to wait outside the exclusive.
143             if waiter_nobe.is_some() {
144                 let _ = waiter_nobe.unwrap().recv();
145             }
146         }
147     }
148
149     pub fn release(&self) {
150         unsafe {
151             self.with(|state| {
152                 state.count += 1;
153                 if state.count <= 0 {
154                     state.waiters.signal();
155                 }
156             })
157         }
158     }
159
160     pub fn access<'a>(&'a self) -> SemGuard<'a, Q> {
161         self.acquire();
162         SemGuard { sem: self }
163     }
164 }
165
166 #[unsafe_destructor]
167 impl<'a, Q: Send> Drop for SemGuard<'a, Q> {
168     fn drop(&mut self) {
169         self.sem.release();
170     }
171 }
172
173 impl Sem<Vec<WaitQueue>> {
174     fn new_and_signal(count: int, num_condvars: uint) -> Sem<Vec<WaitQueue>> {
175         let mut queues = Vec::new();
176         for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); }
177         Sem::new(count, queues)
178     }
179
180     // The only other places that condvars get built are rwlock.write_cond()
181     // and rwlock_write_mode.
182     pub fn access_cond<'a>(&'a self) -> SemCondGuard<'a> {
183         SemCondGuard {
184             guard: self.access(),
185             cvar: Condvar { sem: self, order: Nothing, nocopy: marker::NoCopy },
186         }
187     }
188 }
189
190 // FIXME(#3598): Want to use an Option down below, but we need a custom enum
191 // that's not polymorphic to get around the fact that lifetimes are invariant
192 // inside of type parameters.
193 enum ReacquireOrderLock<'a> {
194     Nothing, // c.c
195     Just(&'a Semaphore),
196 }
197
198 /// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
199 pub struct Condvar<'a> {
200     // The 'Sem' object associated with this condvar. This is the one that's
201     // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
202     sem: &'a Sem<Vec<WaitQueue> >,
203     // This is (can be) an extra semaphore which is held around the reacquire
204     // operation on the first one. This is only used in cvars associated with
205     // rwlocks, and is needed to ensure that, when a downgrader is trying to
206     // hand off the access lock (which would be the first field, here), a 2nd
207     // writer waking up from a cvar wait can't race with a reader to steal it,
208     // See the comment in write_cond for more detail.
209     order: ReacquireOrderLock<'a>,
210     // Make sure condvars are non-copyable.
211     nocopy: marker::NoCopy,
212 }
213
214 impl<'a> Condvar<'a> {
215     /// Atomically drop the associated lock, and block until a signal is sent.
216     ///
217     /// # Failure
218     ///
219     /// A task which is killed while waiting on a condition variable will wake
220     /// up, fail, and unlock the associated lock as it unwinds.
221     pub fn wait(&self) { self.wait_on(0) }
222
223     /// As wait(), but can specify which of multiple condition variables to
224     /// wait on. Only a signal_on() or broadcast_on() with the same condvar_id
225     /// will wake this thread.
226     ///
227     /// The associated lock must have been initialised with an appropriate
228     /// number of condvars. The condvar_id must be between 0 and num_condvars-1
229     /// or else this call will fail.
230     ///
231     /// wait() is equivalent to wait_on(0).
232     pub fn wait_on(&self, condvar_id: uint) {
233         let mut wait_end = None;
234         let mut out_of_bounds = None;
235         // Release lock, 'atomically' enqueuing ourselves in so doing.
236         unsafe {
237             self.sem.with(|state| {
238                 if condvar_id < state.blocked.len() {
239                     // Drop the lock.
240                     state.count += 1;
241                     if state.count <= 0 {
242                         state.waiters.signal();
243                     }
244                     // Create waiter nobe, and enqueue ourself to
245                     // be woken up by a signaller.
246                     wait_end = Some(state.blocked.get(condvar_id).wait_end());
247                 } else {
248                     out_of_bounds = Some(state.blocked.len());
249                 }
250             })
251         }
252
253         // If deschedule checks start getting inserted anywhere, we can be
254         // killed before or after enqueueing.
255         check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()", || {
256             // Unconditionally "block". (Might not actually block if a
257             // signaller already sent -- I mean 'unconditionally' in contrast
258             // with acquire().)
259             (|| {
260                 let _ = wait_end.take_unwrap().recv();
261             }).finally(|| {
262                 // Reacquire the condvar.
263                 match self.order {
264                     Just(lock) => {
265                         let _g = lock.access();
266                         self.sem.acquire();
267                     }
268                     Nothing => self.sem.acquire(),
269                 }
270             })
271         })
272     }
273
274     /// Wake up a blocked task. Returns false if there was no blocked task.
275     pub fn signal(&self) -> bool { self.signal_on(0) }
276
277     /// As signal, but with a specified condvar_id. See wait_on.
278     pub fn signal_on(&self, condvar_id: uint) -> bool {
279         unsafe {
280             let mut out_of_bounds = None;
281             let mut result = false;
282             self.sem.with(|state| {
283                 if condvar_id < state.blocked.len() {
284                     result = state.blocked.get(condvar_id).signal();
285                 } else {
286                     out_of_bounds = Some(state.blocked.len());
287                 }
288             });
289             check_cvar_bounds(out_of_bounds,
290                               condvar_id,
291                               "cond.signal_on()",
292                               || result)
293         }
294     }
295
296     /// Wake up all blocked tasks. Returns the number of tasks woken.
297     pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
298
299     /// As broadcast, but with a specified condvar_id. See wait_on.
300     pub fn broadcast_on(&self, condvar_id: uint) -> uint {
301         let mut out_of_bounds = None;
302         let mut queue = None;
303         unsafe {
304             self.sem.with(|state| {
305                 if condvar_id < state.blocked.len() {
306                     // To avoid :broadcast_heavy, we make a new waitqueue,
307                     // swap it out with the old one, and broadcast on the
308                     // old one outside of the little-lock.
309                     queue = Some(mem::replace(state.blocked.get_mut(condvar_id),
310                                               WaitQueue::new()));
311                 } else {
312                     out_of_bounds = Some(state.blocked.len());
313                 }
314             });
315             check_cvar_bounds(out_of_bounds,
316                               condvar_id,
317                               "cond.signal_on()",
318                               || {
319                 queue.take_unwrap().broadcast()
320             })
321         }
322     }
323 }
324
325 // Checks whether a condvar ID was out of bounds, and fails if so, or does
326 // something else next on success.
327 #[inline]
328 fn check_cvar_bounds<U>(
329                      out_of_bounds: Option<uint>,
330                      id: uint,
331                      act: &str,
332                      blk: || -> U)
333                      -> U {
334     match out_of_bounds {
335         Some(0) =>
336             fail!("{} with illegal ID {} - this lock has no condvars!", act, id),
337         Some(length) =>
338             fail!("{} with illegal ID {} - ID must be less than {}", act, id, length),
339         None => blk()
340     }
341 }
342
343 #[must_use]
344 struct SemCondGuard<'a> {
345     guard: SemGuard<'a, Vec<WaitQueue>>,
346     cvar: Condvar<'a>,
347 }
348
349 /****************************************************************************
350  * Semaphores
351  ****************************************************************************/
352
353 /// A counting, blocking, bounded-waiting semaphore.
354 pub struct Semaphore {
355     sem: Sem<()>,
356 }
357
358 /// An RAII guard used to represent an acquired resource to a semaphore. When
359 /// dropped, this value will release the resource back to the semaphore.
360 #[must_use]
361 pub struct SemaphoreGuard<'a> {
362     _guard: SemGuard<'a, ()>,
363 }
364
365 impl Semaphore {
366     /// Create a new semaphore with the specified count.
367     pub fn new(count: int) -> Semaphore {
368         Semaphore { sem: Sem::new(count, ()) }
369     }
370
371     /// Acquire a resource represented by the semaphore. Blocks if necessary
372     /// until resource(s) become available.
373     pub fn acquire(&self) { self.sem.acquire() }
374
375     /// Release a held resource represented by the semaphore. Wakes a blocked
376     /// contending task, if any exist. Won't block the caller.
377     pub fn release(&self) { self.sem.release() }
378
379     /// Acquire a resource of this semaphore, returning an RAII guard which will
380     /// release the resource when dropped.
381     pub fn access<'a>(&'a self) -> SemaphoreGuard<'a> {
382         SemaphoreGuard { _guard: self.sem.access() }
383     }
384 }
385
386 /****************************************************************************
387  * Mutexes
388  ****************************************************************************/
389
390 /// A blocking, bounded-waiting, mutual exclusion lock with an associated
391 /// FIFO condition variable.
392 ///
393 /// # Failure
394 /// A task which fails while holding a mutex will unlock the mutex as it
395 /// unwinds.
396 pub struct Mutex {
397     sem: Sem<Vec<WaitQueue>>,
398 }
399
400 /// An RAII structure which is used to gain access to a mutex's condition
401 /// variable. Additionally, when a value of this type is dropped, the
402 /// corresponding mutex is also unlocked.
403 #[must_use]
404 pub struct MutexGuard<'a> {
405     _guard: SemGuard<'a, Vec<WaitQueue>>,
406     /// Inner condition variable which is connected to the outer mutex, and can
407     /// be used for atomic-unlock-and-deschedule.
408     pub cond: Condvar<'a>,
409 }
410
411 impl Mutex {
412     /// Create a new mutex, with one associated condvar.
413     pub fn new() -> Mutex { Mutex::new_with_condvars(1) }
414
415     /// Create a new mutex, with a specified number of associated condvars. This
416     /// will allow calling wait_on/signal_on/broadcast_on with condvar IDs
417     /// between 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be
418     /// allowed but any operations on the condvar will fail.)
419     pub fn new_with_condvars(num_condvars: uint) -> Mutex {
420         Mutex { sem: Sem::new_and_signal(1, num_condvars) }
421     }
422
423     /// Acquires ownership of this mutex, returning an RAII guard which will
424     /// unlock the mutex when dropped. The associated condition variable can
425     /// also be accessed through the returned guard.
426     pub fn lock<'a>(&'a self) -> MutexGuard<'a> {
427         let SemCondGuard { guard, cvar } = self.sem.access_cond();
428         MutexGuard { _guard: guard, cond: cvar }
429     }
430 }
431
432 /****************************************************************************
433  * Reader-writer locks
434  ****************************************************************************/
435
436 // NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem
437
438 /// A blocking, no-starvation, reader-writer lock with an associated condvar.
439 ///
440 /// # Failure
441 ///
442 /// A task which fails while holding an rwlock will unlock the rwlock as it
443 /// unwinds.
444 pub struct RWLock {
445     order_lock:  Semaphore,
446     access_lock: Sem<Vec<WaitQueue>>,
447
448     // The only way the count flag is ever accessed is with xadd. Since it is
449     // a read-modify-write operation, multiple xadds on different cores will
450     // always be consistent with respect to each other, so a monotonic/relaxed
451     // consistency ordering suffices (i.e., no extra barriers are needed).
452     //
453     // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
454     // acquire/release orderings superfluously. Change these someday.
455     read_count: atomics::AtomicUint,
456 }
457
458 /// An RAII helper which is created by acquiring a read lock on an RWLock. When
459 /// dropped, this will unlock the RWLock.
460 #[must_use]
461 pub struct RWLockReadGuard<'a> {
462     lock: &'a RWLock,
463 }
464
465 /// An RAII helper which is created by acquiring a write lock on an RWLock. When
466 /// dropped, this will unlock the RWLock.
467 ///
468 /// A value of this type can also be consumed to downgrade to a read-only lock.
469 #[must_use]
470 pub struct RWLockWriteGuard<'a> {
471     lock: &'a RWLock,
472     /// Inner condition variable that is connected to the write-mode of the
473     /// outer rwlock.
474     pub cond: Condvar<'a>,
475 }
476
477 impl RWLock {
478     /// Create a new rwlock, with one associated condvar.
479     pub fn new() -> RWLock { RWLock::new_with_condvars(1) }
480
481     /// Create a new rwlock, with a specified number of associated condvars.
482     /// Similar to mutex_with_condvars.
483     pub fn new_with_condvars(num_condvars: uint) -> RWLock {
484         RWLock {
485             order_lock: Semaphore::new(1),
486             access_lock: Sem::new_and_signal(1, num_condvars),
487             read_count: atomics::AtomicUint::new(0),
488         }
489     }
490
491     /// Acquires a read-lock, returning an RAII guard that will unlock the lock
492     /// when dropped. Calls to 'read' from other tasks may run concurrently with
493     /// this one.
494     pub fn read<'a>(&'a self) -> RWLockReadGuard<'a> {
495         let _guard = self.order_lock.access();
496         let old_count = self.read_count.fetch_add(1, atomics::Acquire);
497         if old_count == 0 {
498             self.access_lock.acquire();
499         }
500         RWLockReadGuard { lock: self }
501     }
502
503     /// Acquire a write-lock, returning an RAII guard that will unlock the lock
504     /// when dropped. No calls to 'read' or 'write' from other tasks will run
505     /// concurrently with this one.
506     ///
507     /// You can also downgrade a write to a read by calling the `downgrade`
508     /// method on the returned guard. Additionally, the guard will contain a
509     /// `Condvar` attached to this lock.
510     ///
511     /// # Example
512     ///
513     /// ```rust
514     /// use sync::raw::RWLock;
515     ///
516     /// let lock = RWLock::new();
517     /// let write = lock.write();
518     /// // ... exclusive access ...
519     /// let read = write.downgrade();
520     /// // ... shared access ...
521     /// drop(read);
522     /// ```
523     pub fn write<'a>(&'a self) -> RWLockWriteGuard<'a> {
524         let _g = self.order_lock.access();
525         self.access_lock.acquire();
526
527         // It's important to thread our order lock into the condvar, so that
528         // when a cond.wait() wakes up, it uses it while reacquiring the
529         // access lock. If we permitted a waking-up writer to "cut in line",
530         // there could arise a subtle race when a downgrader attempts to hand
531         // off the reader cloud lock to a waiting reader. This race is tested
532         // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
533         // T1 (writer)              T2 (downgrader)             T3 (reader)
534         // [in cond.wait()]
535         //                          [locks for writing]
536         //                          [holds access_lock]
537         // [is signalled, perhaps by
538         //  downgrader or a 4th thread]
539         // tries to lock access(!)
540         //                                                      lock order_lock
541         //                                                      xadd read_count[0->1]
542         //                                                      tries to lock access
543         //                          [downgrade]
544         //                          xadd read_count[1->2]
545         //                          unlock access
546         // Since T1 contended on the access lock before T3 did, it will steal
547         // the lock handoff. Adding order_lock in the condvar reacquire path
548         // solves this because T1 will hold order_lock while waiting on access,
549         // which will cause T3 to have to wait until T1 finishes its write,
550         // which can't happen until T2 finishes the downgrade-read entirely.
551         // The astute reader will also note that making waking writers use the
552         // order_lock is better for not starving readers.
553         RWLockWriteGuard {
554             lock: self,
555             cond: Condvar {
556                 sem: &self.access_lock,
557                 order: Just(&self.order_lock),
558                 nocopy: marker::NoCopy,
559             }
560         }
561     }
562 }
563
564 impl<'a> RWLockWriteGuard<'a> {
565     /// Consumes this write lock and converts it into a read lock.
566     pub fn downgrade(self) -> RWLockReadGuard<'a> {
567         let lock = self.lock;
568         // Don't run the destructor of the write guard, we're in charge of
569         // things from now on
570         unsafe { mem::forget(self) }
571
572         let old_count = lock.read_count.fetch_add(1, atomics::Release);
573         // If another reader was already blocking, we need to hand-off
574         // the "reader cloud" access lock to them.
575         if old_count != 0 {
576             // Guaranteed not to let another writer in, because
577             // another reader was holding the order_lock. Hence they
578             // must be the one to get the access_lock (because all
579             // access_locks are acquired with order_lock held). See
580             // the comment in write_cond for more justification.
581             lock.access_lock.release();
582         }
583         RWLockReadGuard { lock: lock }
584     }
585 }
586
587 #[unsafe_destructor]
588 impl<'a> Drop for RWLockWriteGuard<'a> {
589     fn drop(&mut self) {
590         self.lock.access_lock.release();
591     }
592 }
593
594 #[unsafe_destructor]
595 impl<'a> Drop for RWLockReadGuard<'a> {
596     fn drop(&mut self) {
597         let old_count = self.lock.read_count.fetch_sub(1, atomics::Release);
598         assert!(old_count > 0);
599         if old_count == 1 {
600             // Note: this release used to be outside of a locked access
601             // to exclusive-protected state. If this code is ever
602             // converted back to such (instead of using atomic ops),
603             // this access MUST NOT go inside the exclusive access.
604             self.lock.access_lock.release();
605         }
606     }
607 }
608
609 /****************************************************************************
610  * Tests
611  ****************************************************************************/
612
613 #[cfg(test)]
614 mod tests {
615     use std::prelude::*;
616
617     use Arc;
618     use super::{Semaphore, Mutex, RWLock, Condvar};
619
620     use std::mem;
621     use std::result;
622     use std::task;
623
624     /************************************************************************
625      * Semaphore tests
626      ************************************************************************/
627     #[test]
628     fn test_sem_acquire_release() {
629         let s = Semaphore::new(1);
630         s.acquire();
631         s.release();
632         s.acquire();
633     }
634     #[test]
635     fn test_sem_basic() {
636         let s = Semaphore::new(1);
637         let _g = s.access();
638     }
639     #[test]
640     fn test_sem_as_mutex() {
641         let s = Arc::new(Semaphore::new(1));
642         let s2 = s.clone();
643         task::spawn(proc() {
644             let _g = s2.access();
645             for _ in range(0u, 5) { task::deschedule(); }
646         });
647         let _g = s.access();
648         for _ in range(0u, 5) { task::deschedule(); }
649     }
650     #[test]
651     fn test_sem_as_cvar() {
652         /* Child waits and parent signals */
653         let (tx, rx) = channel();
654         let s = Arc::new(Semaphore::new(0));
655         let s2 = s.clone();
656         task::spawn(proc() {
657             s2.acquire();
658             tx.send(());
659         });
660         for _ in range(0u, 5) { task::deschedule(); }
661         s.release();
662         let _ = rx.recv();
663
664         /* Parent waits and child signals */
665         let (tx, rx) = channel();
666         let s = Arc::new(Semaphore::new(0));
667         let s2 = s.clone();
668         task::spawn(proc() {
669             for _ in range(0u, 5) { task::deschedule(); }
670             s2.release();
671             let _ = rx.recv();
672         });
673         s.acquire();
674         tx.send(());
675     }
676     #[test]
677     fn test_sem_multi_resource() {
678         // Parent and child both get in the critical section at the same
679         // time, and shake hands.
680         let s = Arc::new(Semaphore::new(2));
681         let s2 = s.clone();
682         let (tx1, rx1) = channel();
683         let (tx2, rx2) = channel();
684         task::spawn(proc() {
685             let _g = s2.access();
686             let _ = rx2.recv();
687             tx1.send(());
688         });
689         let _g = s.access();
690         tx2.send(());
691         let _ = rx1.recv();
692     }
693     #[test]
694     fn test_sem_runtime_friendly_blocking() {
695         // Force the runtime to schedule two threads on the same sched_loop.
696         // When one blocks, it should schedule the other one.
697         let s = Arc::new(Semaphore::new(1));
698         let s2 = s.clone();
699         let (tx, rx) = channel();
700         {
701             let _g = s.access();
702             task::spawn(proc() {
703                 tx.send(());
704                 drop(s2.access());
705                 tx.send(());
706             });
707             rx.recv(); // wait for child to come alive
708             for _ in range(0u, 5) { task::deschedule(); } // let the child contend
709         }
710         rx.recv(); // wait for child to be done
711     }
712     /************************************************************************
713      * Mutex tests
714      ************************************************************************/
715     #[test]
716     fn test_mutex_lock() {
717         // Unsafely achieve shared state, and do the textbook
718         // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
719         let (tx, rx) = channel();
720         let m = Arc::new(Mutex::new());
721         let m2 = m.clone();
722         let mut sharedstate = box 0;
723         {
724             let ptr: *mut int = &mut *sharedstate;
725             task::spawn(proc() {
726                 access_shared(ptr, &m2, 10);
727                 tx.send(());
728             });
729         }
730         {
731             access_shared(&mut *sharedstate, &m, 10);
732             let _ = rx.recv();
733
734             assert_eq!(*sharedstate, 20);
735         }
736
737         fn access_shared(sharedstate: *mut int, m: &Arc<Mutex>, n: uint) {
738             for _ in range(0u, n) {
739                 let _g = m.lock();
740                 let oldval = unsafe { *sharedstate };
741                 task::deschedule();
742                 unsafe { *sharedstate = oldval + 1; }
743             }
744         }
745     }
746     #[test]
747     fn test_mutex_cond_wait() {
748         let m = Arc::new(Mutex::new());
749
750         // Child wakes up parent
751         {
752             let lock = m.lock();
753             let m2 = m.clone();
754             task::spawn(proc() {
755                 let lock = m2.lock();
756                 let woken = lock.cond.signal();
757                 assert!(woken);
758             });
759             lock.cond.wait();
760         }
761         // Parent wakes up child
762         let (tx, rx) = channel();
763         let m3 = m.clone();
764         task::spawn(proc() {
765             let lock = m3.lock();
766             tx.send(());
767             lock.cond.wait();
768             tx.send(());
769         });
770         rx.recv(); // Wait until child gets in the mutex
771         {
772             let lock = m.lock();
773             let woken = lock.cond.signal();
774             assert!(woken);
775         }
776         rx.recv(); // Wait until child wakes up
777     }
778
779     fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
780         let m = Arc::new(Mutex::new());
781         let mut rxs = Vec::new();
782
783         for _ in range(0u, num_waiters) {
784             let mi = m.clone();
785             let (tx, rx) = channel();
786             rxs.push(rx);
787             task::spawn(proc() {
788                 let lock = mi.lock();
789                 tx.send(());
790                 lock.cond.wait();
791                 tx.send(());
792             });
793         }
794
795         // wait until all children get in the mutex
796         for rx in rxs.mut_iter() { rx.recv(); }
797         {
798             let lock = m.lock();
799             let num_woken = lock.cond.broadcast();
800             assert_eq!(num_woken, num_waiters);
801         }
802         // wait until all children wake up
803         for rx in rxs.mut_iter() { rx.recv(); }
804     }
805     #[test]
806     fn test_mutex_cond_broadcast() {
807         test_mutex_cond_broadcast_helper(12);
808     }
809     #[test]
810     fn test_mutex_cond_broadcast_none() {
811         test_mutex_cond_broadcast_helper(0);
812     }
813     #[test]
814     fn test_mutex_cond_no_waiter() {
815         let m = Arc::new(Mutex::new());
816         let m2 = m.clone();
817         let _ = task::try(proc() {
818             drop(m.lock());
819         });
820         let lock = m2.lock();
821         assert!(!lock.cond.signal());
822     }
823     #[test]
824     fn test_mutex_killed_simple() {
825         use std::any::Any;
826
827         // Mutex must get automatically unlocked if failed/killed within.
828         let m = Arc::new(Mutex::new());
829         let m2 = m.clone();
830
831         let result: result::Result<(), Box<Any + Send>> = task::try(proc() {
832             let _lock = m2.lock();
833             fail!();
834         });
835         assert!(result.is_err());
836         // child task must have finished by the time try returns
837         drop(m.lock());
838     }
839     #[test]
840     fn test_mutex_cond_signal_on_0() {
841         // Tests that signal_on(0) is equivalent to signal().
842         let m = Arc::new(Mutex::new());
843         let lock = m.lock();
844         let m2 = m.clone();
845         task::spawn(proc() {
846             let lock = m2.lock();
847             lock.cond.signal_on(0);
848         });
849         lock.cond.wait();
850     }
851     #[test]
852     fn test_mutex_no_condvars() {
853         let result = task::try(proc() {
854             let m = Mutex::new_with_condvars(0);
855             m.lock().cond.wait();
856         });
857         assert!(result.is_err());
858         let result = task::try(proc() {
859             let m = Mutex::new_with_condvars(0);
860             m.lock().cond.signal();
861         });
862         assert!(result.is_err());
863         let result = task::try(proc() {
864             let m = Mutex::new_with_condvars(0);
865             m.lock().cond.broadcast();
866         });
867         assert!(result.is_err());
868     }
869     /************************************************************************
870      * Reader/writer lock tests
871      ************************************************************************/
872     #[cfg(test)]
873     pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }
874     #[cfg(test)]
875     fn lock_rwlock_in_mode(x: &Arc<RWLock>, mode: RWLockMode, blk: ||) {
876         match mode {
877             Read => { let _g = x.read(); blk() }
878             Write => { let _g = x.write(); blk() }
879             Downgrade => { let _g = x.write(); blk() }
880             DowngradeRead => { let _g = x.write().downgrade(); blk() }
881         }
882     }
883     #[cfg(test)]
884     fn test_rwlock_exclusion(x: Arc<RWLock>,
885                              mode1: RWLockMode,
886                              mode2: RWLockMode) {
887         // Test mutual exclusion between readers and writers. Just like the
888         // mutex mutual exclusion test, a ways above.
889         let (tx, rx) = channel();
890         let x2 = x.clone();
891         let mut sharedstate = box 0;
892         {
893             let ptr: *const int = &*sharedstate;
894             task::spawn(proc() {
895                 let sharedstate: &mut int =
896                     unsafe { mem::transmute(ptr) };
897                 access_shared(sharedstate, &x2, mode1, 10);
898                 tx.send(());
899             });
900         }
901         {
902             access_shared(sharedstate, &x, mode2, 10);
903             let _ = rx.recv();
904
905             assert_eq!(*sharedstate, 20);
906         }
907
908         fn access_shared(sharedstate: &mut int, x: &Arc<RWLock>,
909                          mode: RWLockMode, n: uint) {
910             for _ in range(0u, n) {
911                 lock_rwlock_in_mode(x, mode, || {
912                     let oldval = *sharedstate;
913                     task::deschedule();
914                     *sharedstate = oldval + 1;
915                 })
916             }
917         }
918     }
919     #[test]
920     fn test_rwlock_readers_wont_modify_the_data() {
921         test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Write);
922         test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Read);
923         test_rwlock_exclusion(Arc::new(RWLock::new()), Read, Downgrade);
924         test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Read);
925         test_rwlock_exclusion(Arc::new(RWLock::new()), Write, DowngradeRead);
926         test_rwlock_exclusion(Arc::new(RWLock::new()), DowngradeRead, Write);
927     }
928     #[test]
929     fn test_rwlock_writers_and_writers() {
930         test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Write);
931         test_rwlock_exclusion(Arc::new(RWLock::new()), Write, Downgrade);
932         test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Write);
933         test_rwlock_exclusion(Arc::new(RWLock::new()), Downgrade, Downgrade);
934     }
935     #[cfg(test)]
936     fn test_rwlock_handshake(x: Arc<RWLock>,
937                              mode1: RWLockMode,
938                              mode2: RWLockMode,
939                              make_mode2_go_first: bool) {
940         // Much like sem_multi_resource.
941         let x2 = x.clone();
942         let (tx1, rx1) = channel();
943         let (tx2, rx2) = channel();
944         task::spawn(proc() {
945             if !make_mode2_go_first {
946                 rx2.recv(); // parent sends to us once it locks, or ...
947             }
948             lock_rwlock_in_mode(&x2, mode2, || {
949                 if make_mode2_go_first {
950                     tx1.send(()); // ... we send to it once we lock
951                 }
952                 rx2.recv();
953                 tx1.send(());
954             })
955         });
956         if make_mode2_go_first {
957             rx1.recv(); // child sends to us once it locks, or ...
958         }
959         lock_rwlock_in_mode(&x, mode1, || {
960             if !make_mode2_go_first {
961                 tx2.send(()); // ... we send to it once we lock
962             }
963             tx2.send(());
964             rx1.recv();
965         })
966     }
967     #[test]
968     fn test_rwlock_readers_and_readers() {
969         test_rwlock_handshake(Arc::new(RWLock::new()), Read, Read, false);
970         // The downgrader needs to get in before the reader gets in, otherwise
971         // they cannot end up reading at the same time.
972         test_rwlock_handshake(Arc::new(RWLock::new()), DowngradeRead, Read, false);
973         test_rwlock_handshake(Arc::new(RWLock::new()), Read, DowngradeRead, true);
974         // Two downgrade_reads can never both end up reading at the same time.
975     }
976     #[test]
977     fn test_rwlock_downgrade_unlock() {
978         // Tests that downgrade can unlock the lock in both modes
979         let x = Arc::new(RWLock::new());
980         lock_rwlock_in_mode(&x, Downgrade, || { });
981         test_rwlock_handshake(x, Read, Read, false);
982         let y = Arc::new(RWLock::new());
983         lock_rwlock_in_mode(&y, DowngradeRead, || { });
984         test_rwlock_exclusion(y, Write, Write);
985     }
986     #[test]
987     fn test_rwlock_read_recursive() {
988         let x = RWLock::new();
989         let _g1 = x.read();
990         let _g2 = x.read();
991     }
992     #[test]
993     fn test_rwlock_cond_wait() {
994         // As test_mutex_cond_wait above.
995         let x = Arc::new(RWLock::new());
996
997         // Child wakes up parent
998         {
999             let lock = x.write();
1000             let x2 = x.clone();
1001             task::spawn(proc() {
1002                 let lock = x2.write();
1003                 assert!(lock.cond.signal());
1004             });
1005             lock.cond.wait();
1006         }
1007         // Parent wakes up child
1008         let (tx, rx) = channel();
1009         let x3 = x.clone();
1010         task::spawn(proc() {
1011             let lock = x3.write();
1012             tx.send(());
1013             lock.cond.wait();
1014             tx.send(());
1015         });
1016         rx.recv(); // Wait until child gets in the rwlock
1017         drop(x.read()); // Must be able to get in as a reader
1018         {
1019             let x = x.write();
1020             assert!(x.cond.signal());
1021         }
1022         rx.recv(); // Wait until child wakes up
1023         drop(x.read()); // Just for good measure
1024     }
1025     #[cfg(test)]
1026     fn test_rwlock_cond_broadcast_helper(num_waiters: uint) {
1027         // Much like the mutex broadcast test. Downgrade-enabled.
1028         fn lock_cond(x: &Arc<RWLock>, blk: |c: &Condvar|) {
1029             let lock = x.write();
1030             blk(&lock.cond);
1031         }
1032
1033         let x = Arc::new(RWLock::new());
1034         let mut rxs = Vec::new();
1035
1036         for _ in range(0u, num_waiters) {
1037             let xi = x.clone();
1038             let (tx, rx) = channel();
1039             rxs.push(rx);
1040             task::spawn(proc() {
1041                 lock_cond(&xi, |cond| {
1042                     tx.send(());
1043                     cond.wait();
1044                     tx.send(());
1045                 })
1046             });
1047         }
1048
1049         // wait until all children get in the mutex
1050         for rx in rxs.mut_iter() { let _ = rx.recv(); }
1051         lock_cond(&x, |cond| {
1052             let num_woken = cond.broadcast();
1053             assert_eq!(num_woken, num_waiters);
1054         });
1055         // wait until all children wake up
1056         for rx in rxs.mut_iter() { let _ = rx.recv(); }
1057     }
1058     #[test]
1059     fn test_rwlock_cond_broadcast() {
1060         test_rwlock_cond_broadcast_helper(0);
1061         test_rwlock_cond_broadcast_helper(12);
1062     }
1063     #[cfg(test)]
1064     fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
1065         use std::any::Any;
1066
1067         // Mutex must get automatically unlocked if failed/killed within.
1068         let x = Arc::new(RWLock::new());
1069         let x2 = x.clone();
1070
1071         let result: result::Result<(), Box<Any + Send>> = task::try(proc() {
1072             lock_rwlock_in_mode(&x2, mode1, || {
1073                 fail!();
1074             })
1075         });
1076         assert!(result.is_err());
1077         // child task must have finished by the time try returns
1078         lock_rwlock_in_mode(&x, mode2, || { })
1079     }
1080     #[test]
1081     fn test_rwlock_reader_killed_writer() {
1082         rwlock_kill_helper(Read, Write);
1083     }
1084     #[test]
1085     fn test_rwlock_writer_killed_reader() {
1086         rwlock_kill_helper(Write, Read);
1087     }
1088     #[test]
1089     fn test_rwlock_reader_killed_reader() {
1090         rwlock_kill_helper(Read, Read);
1091     }
1092     #[test]
1093     fn test_rwlock_writer_killed_writer() {
1094         rwlock_kill_helper(Write, Write);
1095     }
1096     #[test]
1097     fn test_rwlock_kill_downgrader() {
1098         rwlock_kill_helper(Downgrade, Read);
1099         rwlock_kill_helper(Read, Downgrade);
1100         rwlock_kill_helper(Downgrade, Write);
1101         rwlock_kill_helper(Write, Downgrade);
1102         rwlock_kill_helper(DowngradeRead, Read);
1103         rwlock_kill_helper(Read, DowngradeRead);
1104         rwlock_kill_helper(DowngradeRead, Write);
1105         rwlock_kill_helper(Write, DowngradeRead);
1106         rwlock_kill_helper(DowngradeRead, Downgrade);
1107         rwlock_kill_helper(DowngradeRead, Downgrade);
1108         rwlock_kill_helper(Downgrade, DowngradeRead);
1109         rwlock_kill_helper(Downgrade, DowngradeRead);
1110     }
1111 }