]> git.lizzy.rs Git - rust.git/blob - src/libsync/sync/mod.rs
3bb60046b035ed3f2aa19d7deaa29b6cfd55eff5
[rust.git] / src / libsync / sync / mod.rs
1 // Copyright 2012-2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 #[allow(missing_doc)];
12
13 /**
14  * The concurrency primitives you know and love.
15  *
16  * Maybe once we have a "core exports x only to std" mechanism, these can be
17  * in std.
18  */
19
20 use std::cast;
21 use std::comm;
22 use std::kinds::marker;
23 use std::mem::replace;
24 use std::sync::arc::UnsafeArc;
25 use std::sync::atomics;
26 use std::unstable::finally::Finally;
27
28 use arc::MutexArc;
29
30 /****************************************************************************
31  * Internals
32  ****************************************************************************/
33
34 pub mod mutex;
35 pub mod one;
36 mod mpsc_intrusive;
37
38 // Each waiting task receives on one of these.
39 #[doc(hidden)]
40 type WaitEnd = Receiver<()>;
41 #[doc(hidden)]
42 type SignalEnd = Sender<()>;
43 // A doubly-ended queue of waiting tasks.
44 #[doc(hidden)]
45 struct WaitQueue { head: Receiver<SignalEnd>,
46                    tail: Sender<SignalEnd> }
47
48 impl WaitQueue {
49     fn new() -> WaitQueue {
50         let (block_tail, block_head) = channel();
51         WaitQueue { head: block_head, tail: block_tail }
52     }
53
54     // Signals one live task from the queue.
55     fn signal(&self) -> bool {
56         match self.head.try_recv() {
57             comm::Data(ch) => {
58                 // Send a wakeup signal. If the waiter was killed, its port will
59                 // have closed. Keep trying until we get a live task.
60                 if ch.try_send(()) {
61                     true
62                 } else {
63                     self.signal()
64                 }
65             }
66             _ => false
67         }
68     }
69
70     fn broadcast(&self) -> uint {
71         let mut count = 0;
72         loop {
73             match self.head.try_recv() {
74                 comm::Data(ch) => {
75                     if ch.try_send(()) {
76                         count += 1;
77                     }
78                 }
79                 _ => break
80             }
81         }
82         count
83     }
84
85     fn wait_end(&self) -> WaitEnd {
86         let (signal_end, wait_end) = channel();
87         assert!(self.tail.try_send(signal_end));
88         wait_end
89     }
90 }
91
92 // The building-block used to make semaphores, mutexes, and rwlocks.
93 struct SemInner<Q> {
94     lock: mutex::Mutex,
95     count: int,
96     waiters:   WaitQueue,
97     // Can be either unit or another waitqueue. Some sems shouldn't come with
98     // a condition variable attached, others should.
99     blocked:   Q
100 }
101
102 struct Sem<Q>(UnsafeArc<SemInner<Q>>);
103
104 #[doc(hidden)]
105 impl<Q:Send> Sem<Q> {
106     fn new(count: int, q: Q) -> Sem<Q> {
107         Sem(UnsafeArc::new(SemInner {
108             count: count,
109             waiters: WaitQueue::new(),
110             blocked: q,
111             lock: mutex::Mutex::new(),
112         }))
113     }
114
115     unsafe fn with(&self, f: |&mut SemInner<Q>|) {
116         let Sem(ref arc) = *self;
117         let state = arc.get();
118         let _g = (*state).lock.lock();
119         f(cast::transmute(state));
120     }
121
122     pub fn acquire(&self) {
123         unsafe {
124             let mut waiter_nobe = None;
125             self.with(|state| {
126                 state.count -= 1;
127                 if state.count < 0 {
128                     // Create waiter nobe, enqueue ourself, and tell
129                     // outer scope we need to block.
130                     waiter_nobe = Some(state.waiters.wait_end());
131                 }
132             });
133             // Uncomment if you wish to test for sem races. Not valgrind-friendly.
134             /* for _ in range(0, 1000) { task::deschedule(); } */
135             // Need to wait outside the exclusive.
136             if waiter_nobe.is_some() {
137                 let _ = waiter_nobe.unwrap().recv();
138             }
139         }
140     }
141
142     pub fn release(&self) {
143         unsafe {
144             self.with(|state| {
145                 state.count += 1;
146                 if state.count <= 0 {
147                     state.waiters.signal();
148                 }
149             })
150         }
151     }
152
153     pub fn access<U>(&self, blk: || -> U) -> U {
154         (|| {
155             self.acquire();
156             blk()
157         }).finally(|| {
158             self.release();
159         })
160     }
161 }
162
163 #[doc(hidden)]
164 impl Sem<~[WaitQueue]> {
165     fn new_and_signal(count: int, num_condvars: uint)
166         -> Sem<~[WaitQueue]> {
167         let mut queues = ~[];
168         for _ in range(0, num_condvars) { queues.push(WaitQueue::new()); }
169         Sem::new(count, queues)
170     }
171 }
172
173 // FIXME(#3598): Want to use an Option down below, but we need a custom enum
174 // that's not polymorphic to get around the fact that lifetimes are invariant
175 // inside of type parameters.
176 enum ReacquireOrderLock<'a> {
177     Nothing, // c.c
178     Just(&'a Semaphore),
179 }
180
181 /// A mechanism for atomic-unlock-and-deschedule blocking and signalling.
182 pub struct Condvar<'a> {
183     // The 'Sem' object associated with this condvar. This is the one that's
184     // atomically-unlocked-and-descheduled upon and reacquired during wakeup.
185     priv sem: &'a Sem<~[WaitQueue]>,
186     // This is (can be) an extra semaphore which is held around the reacquire
187     // operation on the first one. This is only used in cvars associated with
188     // rwlocks, and is needed to ensure that, when a downgrader is trying to
189     // hand off the access lock (which would be the first field, here), a 2nd
190     // writer waking up from a cvar wait can't race with a reader to steal it,
191     // See the comment in write_cond for more detail.
192     priv order: ReacquireOrderLock<'a>,
193     // Make sure condvars are non-copyable.
194     priv nopod: marker::NoPod,
195 }
196
197 impl<'a> Condvar<'a> {
198     /**
199      * Atomically drop the associated lock, and block until a signal is sent.
200      *
201      * # Failure
202      * A task which is killed (i.e., by linked failure with another task)
203      * while waiting on a condition variable will wake up, fail, and unlock
204      * the associated lock as it unwinds.
205      */
206     pub fn wait(&self) { self.wait_on(0) }
207
208     /**
209      * As wait(), but can specify which of multiple condition variables to
210      * wait on. Only a signal_on() or broadcast_on() with the same condvar_id
211      * will wake this thread.
212      *
213      * The associated lock must have been initialised with an appropriate
214      * number of condvars. The condvar_id must be between 0 and num_condvars-1
215      * or else this call will fail.
216      *
217      * wait() is equivalent to wait_on(0).
218      */
219     pub fn wait_on(&self, condvar_id: uint) {
220         let mut wait_end = None;
221         let mut out_of_bounds = None;
222         // Release lock, 'atomically' enqueuing ourselves in so doing.
223         unsafe {
224             self.sem.with(|state| {
225                 if condvar_id < state.blocked.len() {
226                     // Drop the lock.
227                     state.count += 1;
228                     if state.count <= 0 {
229                         state.waiters.signal();
230                     }
231                     // Create waiter nobe, and enqueue ourself to
232                     // be woken up by a signaller.
233                     wait_end = Some(state.blocked[condvar_id].wait_end());
234                 } else {
235                     out_of_bounds = Some(state.blocked.len());
236                 }
237             })
238         }
239
240         // If deschedule checks start getting inserted anywhere, we can be
241         // killed before or after enqueueing.
242         check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()", || {
243             // Unconditionally "block". (Might not actually block if a
244             // signaller already sent -- I mean 'unconditionally' in contrast
245             // with acquire().)
246             (|| {
247                 let _ = wait_end.take_unwrap().recv();
248             }).finally(|| {
249                 // Reacquire the condvar.
250                 match self.order {
251                     Just(lock) => lock.access(|| self.sem.acquire()),
252                     Nothing => self.sem.acquire(),
253                 }
254             })
255         })
256     }
257
258     /// Wake up a blocked task. Returns false if there was no blocked task.
259     pub fn signal(&self) -> bool { self.signal_on(0) }
260
261     /// As signal, but with a specified condvar_id. See wait_on.
262     pub fn signal_on(&self, condvar_id: uint) -> bool {
263         unsafe {
264             let mut out_of_bounds = None;
265             let mut result = false;
266             self.sem.with(|state| {
267                 if condvar_id < state.blocked.len() {
268                     result = state.blocked[condvar_id].signal();
269                 } else {
270                     out_of_bounds = Some(state.blocked.len());
271                 }
272             });
273             check_cvar_bounds(out_of_bounds,
274                               condvar_id,
275                               "cond.signal_on()",
276                               || result)
277         }
278     }
279
280     /// Wake up all blocked tasks. Returns the number of tasks woken.
281     pub fn broadcast(&self) -> uint { self.broadcast_on(0) }
282
283     /// As broadcast, but with a specified condvar_id. See wait_on.
284     pub fn broadcast_on(&self, condvar_id: uint) -> uint {
285         let mut out_of_bounds = None;
286         let mut queue = None;
287         unsafe {
288             self.sem.with(|state| {
289                 if condvar_id < state.blocked.len() {
290                     // To avoid :broadcast_heavy, we make a new waitqueue,
291                     // swap it out with the old one, and broadcast on the
292                     // old one outside of the little-lock.
293                     queue = Some(replace(&mut state.blocked[condvar_id],
294                                                WaitQueue::new()));
295                 } else {
296                     out_of_bounds = Some(state.blocked.len());
297                 }
298             });
299             check_cvar_bounds(out_of_bounds,
300                               condvar_id,
301                               "cond.signal_on()",
302                               || {
303                 queue.take_unwrap().broadcast()
304             })
305         }
306     }
307 }
308
309 // Checks whether a condvar ID was out of bounds, and fails if so, or does
310 // something else next on success.
311 #[inline]
312 #[doc(hidden)]
313 fn check_cvar_bounds<U>(
314                      out_of_bounds: Option<uint>,
315                      id: uint,
316                      act: &str,
317                      blk: || -> U)
318                      -> U {
319     match out_of_bounds {
320         Some(0) =>
321             fail!("{} with illegal ID {} - this lock has no condvars!", act, id),
322         Some(length) =>
323             fail!("{} with illegal ID {} - ID must be less than {}", act, id, length),
324         None => blk()
325     }
326 }
327
328 #[doc(hidden)]
329 impl Sem<~[WaitQueue]> {
330     // The only other places that condvars get built are rwlock.write_cond()
331     // and rwlock_write_mode.
332     pub fn access_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
333         self.access(|| {
334             blk(&Condvar {
335                 sem: self,
336                 order: Nothing,
337                 nopod: marker::NoPod
338             })
339         })
340     }
341 }
342
343 /****************************************************************************
344  * Semaphores
345  ****************************************************************************/
346
347 /// A counting, blocking, bounded-waiting semaphore.
348 pub struct Semaphore { priv sem: Sem<()> }
349
350
351 impl Clone for Semaphore {
352     /// Create a new handle to the semaphore.
353     fn clone(&self) -> Semaphore {
354         let Sem(ref lock) = self.sem;
355         Semaphore { sem: Sem(lock.clone()) }
356     }
357 }
358
359 impl Semaphore {
360     /// Create a new semaphore with the specified count.
361     pub fn new(count: int) -> Semaphore {
362         Semaphore { sem: Sem::new(count, ()) }
363     }
364
365     /**
366      * Acquire a resource represented by the semaphore. Blocks if necessary
367      * until resource(s) become available.
368      */
369     pub fn acquire(&self) { (&self.sem).acquire() }
370
371     /**
372      * Release a held resource represented by the semaphore. Wakes a blocked
373      * contending task, if any exist. Won't block the caller.
374      */
375     pub fn release(&self) { (&self.sem).release() }
376
377     /// Run a function with ownership of one of the semaphore's resources.
378     pub fn access<U>(&self, blk: || -> U) -> U { (&self.sem).access(blk) }
379 }
380
381 /****************************************************************************
382  * Mutexes
383  ****************************************************************************/
384
385 /**
386  * A blocking, bounded-waiting, mutual exclusion lock with an associated
387  * FIFO condition variable.
388  *
389  * # Failure
390  * A task which fails while holding a mutex will unlock the mutex as it
391  * unwinds.
392  */
393
394 pub struct Mutex { priv sem: Sem<~[WaitQueue]> }
395 impl Clone for Mutex {
396     /// Create a new handle to the mutex.
397     fn clone(&self) -> Mutex {
398         let Sem(ref queue) = self.sem;
399         Mutex { sem: Sem(queue.clone()) } }
400 }
401
402 impl Mutex {
403     /// Create a new mutex, with one associated condvar.
404     pub fn new() -> Mutex { Mutex::new_with_condvars(1) }
405
406     /**
407     * Create a new mutex, with a specified number of associated condvars. This
408     * will allow calling wait_on/signal_on/broadcast_on with condvar IDs between
409     * 0 and num_condvars-1. (If num_condvars is 0, lock_cond will be allowed but
410     * any operations on the condvar will fail.)
411     */
412     pub fn new_with_condvars(num_condvars: uint) -> Mutex {
413         Mutex { sem: Sem::new_and_signal(1, num_condvars) }
414     }
415
416
417     /// Run a function with ownership of the mutex.
418     pub fn lock<U>(&self, blk: || -> U) -> U {
419         (&self.sem).access(blk)
420     }
421
422     /// Run a function with ownership of the mutex and a handle to a condvar.
423     pub fn lock_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
424         (&self.sem).access_cond(blk)
425     }
426 }
427
428 /****************************************************************************
429  * Reader-writer locks
430  ****************************************************************************/
431
432 // NB: Wikipedia - Readers-writers_problem#The_third_readers-writers_problem
433
434 #[doc(hidden)]
435 struct RWLockInner {
436     // You might ask, "Why don't you need to use an atomic for the mode flag?"
437     // This flag affects the behaviour of readers (for plain readers, they
438     // assert on it; for downgraders, they use it to decide which mode to
439     // unlock for). Consider that the flag is only unset when the very last
440     // reader exits; therefore, it can never be unset during a reader/reader
441     // (or reader/downgrader) race.
442     // By the way, if we didn't care about the assert in the read unlock path,
443     // we could instead store the mode flag in write_downgrade's stack frame,
444     // and have the downgrade tokens store a reference to it.
445     read_mode:  bool,
446     // The only way the count flag is ever accessed is with xadd. Since it is
447     // a read-modify-write operation, multiple xadds on different cores will
448     // always be consistent with respect to each other, so a monotonic/relaxed
449     // consistency ordering suffices (i.e., no extra barriers are needed).
450     // FIXME(#6598): The atomics module has no relaxed ordering flag, so I use
451     // acquire/release orderings superfluously. Change these someday.
452     read_count: atomics::AtomicUint,
453 }
454
455 /**
456  * A blocking, no-starvation, reader-writer lock with an associated condvar.
457  *
458  * # Failure
459  * A task which fails while holding an rwlock will unlock the rwlock as it
460  * unwinds.
461  */
462 pub struct RWLock {
463     priv order_lock:  Semaphore,
464     priv access_lock: Sem<~[WaitQueue]>,
465     priv state:       UnsafeArc<RWLockInner>,
466 }
467
468 impl RWLock {
469     /// Create a new rwlock, with one associated condvar.
470     pub fn new() -> RWLock { RWLock::new_with_condvars(1) }
471
472     /**
473     * Create a new rwlock, with a specified number of associated condvars.
474     * Similar to mutex_with_condvars.
475     */
476     pub fn new_with_condvars(num_condvars: uint) -> RWLock {
477         let state = UnsafeArc::new(RWLockInner {
478             read_mode:  false,
479             read_count: atomics::AtomicUint::new(0),
480         });
481         RWLock { order_lock:  Semaphore::new(1),
482                 access_lock: Sem::new_and_signal(1, num_condvars),
483                 state:       state, }
484     }
485
486     /// Create a new handle to the rwlock.
487     pub fn clone(&self) -> RWLock {
488         let Sem(ref access_lock_queue) = self.access_lock;
489         RWLock { order_lock:  (&(self.order_lock)).clone(),
490                  access_lock: Sem(access_lock_queue.clone()),
491                  state:       self.state.clone() }
492     }
493
494     /**
495      * Run a function with the rwlock in read mode. Calls to 'read' from other
496      * tasks may run concurrently with this one.
497      */
498     pub fn read<U>(&self, blk: || -> U) -> U {
499         unsafe {
500             (&self.order_lock).access(|| {
501                 let state = &mut *self.state.get();
502                 let old_count = state.read_count.fetch_add(1, atomics::Acquire);
503                 if old_count == 0 {
504                     (&self.access_lock).acquire();
505                     state.read_mode = true;
506                 }
507             });
508             (|| {
509                 blk()
510             }).finally(|| {
511                 let state = &mut *self.state.get();
512                 assert!(state.read_mode);
513                 let old_count = state.read_count.fetch_sub(1, atomics::Release);
514                 assert!(old_count > 0);
515                 if old_count == 1 {
516                     state.read_mode = false;
517                     // Note: this release used to be outside of a locked access
518                     // to exclusive-protected state. If this code is ever
519                     // converted back to such (instead of using atomic ops),
520                     // this access MUST NOT go inside the exclusive access.
521                     (&self.access_lock).release();
522                 }
523             })
524         }
525     }
526
527     /**
528      * Run a function with the rwlock in write mode. No calls to 'read' or
529      * 'write' from other tasks will run concurrently with this one.
530      */
531     pub fn write<U>(&self, blk: || -> U) -> U {
532         (&self.order_lock).acquire();
533         (&self.access_lock).access(|| {
534             (&self.order_lock).release();
535             blk()
536         })
537     }
538
539     /**
540      * As write(), but also with a handle to a condvar. Waiting on this
541      * condvar will allow readers and writers alike to take the rwlock before
542      * the waiting task is signalled. (Note: a writer that waited and then
543      * was signalled might reacquire the lock before other waiting writers.)
544      */
545     pub fn write_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
546         // It's important to thread our order lock into the condvar, so that
547         // when a cond.wait() wakes up, it uses it while reacquiring the
548         // access lock. If we permitted a waking-up writer to "cut in line",
549         // there could arise a subtle race when a downgrader attempts to hand
550         // off the reader cloud lock to a waiting reader. This race is tested
551         // in arc.rs (test_rw_write_cond_downgrade_read_race) and looks like:
552         // T1 (writer)              T2 (downgrader)             T3 (reader)
553         // [in cond.wait()]
554         //                          [locks for writing]
555         //                          [holds access_lock]
556         // [is signalled, perhaps by
557         //  downgrader or a 4th thread]
558         // tries to lock access(!)
559         //                                                      lock order_lock
560         //                                                      xadd read_count[0->1]
561         //                                                      tries to lock access
562         //                          [downgrade]
563         //                          xadd read_count[1->2]
564         //                          unlock access
565         // Since T1 contended on the access lock before T3 did, it will steal
566         // the lock handoff. Adding order_lock in the condvar reacquire path
567         // solves this because T1 will hold order_lock while waiting on access,
568         // which will cause T3 to have to wait until T1 finishes its write,
569         // which can't happen until T2 finishes the downgrade-read entirely.
570         // The astute reader will also note that making waking writers use the
571         // order_lock is better for not starving readers.
572         (&self.order_lock).acquire();
573         (&self.access_lock).access_cond(|cond| {
574             (&self.order_lock).release();
575             let opt_lock = Just(&self.order_lock);
576             blk(&Condvar { sem: cond.sem, order: opt_lock,
577                            nopod: marker::NoPod })
578         })
579     }
580
581     /**
582      * As write(), but with the ability to atomically 'downgrade' the lock;
583      * i.e., to become a reader without letting other writers get the lock in
584      * the meantime (such as unlocking and then re-locking as a reader would
585      * do). The block takes a "write mode token" argument, which can be
586      * transformed into a "read mode token" by calling downgrade(). Example:
587      *
588      * # Example
589      *
590      * ```rust
591      * use sync::RWLock;
592      *
593      * let lock = RWLock::new();
594      * lock.write_downgrade(|mut write_token| {
595      *     write_token.write_cond(|condvar| {
596      *         // ... exclusive access ...
597      *     });
598      *     let read_token = lock.downgrade(write_token);
599      *     read_token.read(|| {
600      *         // ... shared access ...
601      *     })
602      * })
603      * ```
604      */
605     pub fn write_downgrade<U>(&self, blk: |v: RWLockWriteMode| -> U) -> U {
606         // Implementation slightly different from the slicker 'write's above.
607         // The exit path is conditional on whether the caller downgrades.
608         (&self.order_lock).acquire();
609         (&self.access_lock).acquire();
610         (&self.order_lock).release();
611         (|| {
612             blk(RWLockWriteMode { lock: self, nopod: marker::NoPod })
613         }).finally(|| {
614             let writer_or_last_reader;
615             // Check if we're releasing from read mode or from write mode.
616             let state = unsafe { &mut *self.state.get() };
617             if state.read_mode {
618                 // Releasing from read mode.
619                 let old_count = state.read_count.fetch_sub(1, atomics::Release);
620                 assert!(old_count > 0);
621                 // Check if other readers remain.
622                 if old_count == 1 {
623                     // Case 1: Writer downgraded & was the last reader
624                     writer_or_last_reader = true;
625                     state.read_mode = false;
626                 } else {
627                     // Case 2: Writer downgraded & was not the last reader
628                     writer_or_last_reader = false;
629                 }
630             } else {
631                 // Case 3: Writer did not downgrade
632                 writer_or_last_reader = true;
633             }
634             if writer_or_last_reader {
635                 // Nobody left inside; release the "reader cloud" lock.
636                 (&self.access_lock).release();
637             }
638         })
639     }
640
641     /// To be called inside of the write_downgrade block.
642     pub fn downgrade<'a>(&self, token: RWLockWriteMode<'a>)
643                          -> RWLockReadMode<'a> {
644         if !((self as *RWLock) == (token.lock as *RWLock)) {
645             fail!("Can't downgrade() with a different rwlock's write_mode!");
646         }
647         unsafe {
648             let state = &mut *self.state.get();
649             assert!(!state.read_mode);
650             state.read_mode = true;
651             // If a reader attempts to enter at this point, both the
652             // downgrader and reader will set the mode flag. This is fine.
653             let old_count = state.read_count.fetch_add(1, atomics::Release);
654             // If another reader was already blocking, we need to hand-off
655             // the "reader cloud" access lock to them.
656             if old_count != 0 {
657                 // Guaranteed not to let another writer in, because
658                 // another reader was holding the order_lock. Hence they
659                 // must be the one to get the access_lock (because all
660                 // access_locks are acquired with order_lock held). See
661                 // the comment in write_cond for more justification.
662                 (&self.access_lock).release();
663             }
664         }
665         RWLockReadMode { lock: token.lock, nopod: marker::NoPod }
666     }
667 }
668
669 /// The "write permission" token used for rwlock.write_downgrade().
670
671 pub struct RWLockWriteMode<'a> { priv lock: &'a RWLock, priv nopod: marker::NoPod }
672 /// The "read permission" token used for rwlock.write_downgrade().
673 pub struct RWLockReadMode<'a> { priv lock: &'a RWLock,
674                                    priv nopod: marker::NoPod }
675
676 impl<'a> RWLockWriteMode<'a> {
677     /// Access the pre-downgrade rwlock in write mode.
678     pub fn write<U>(&self, blk: || -> U) -> U { blk() }
679     /// Access the pre-downgrade rwlock in write mode with a condvar.
680     pub fn write_cond<U>(&self, blk: |c: &Condvar| -> U) -> U {
681         // Need to make the condvar use the order lock when reacquiring the
682         // access lock. See comment in RWLock::write_cond for why.
683         blk(&Condvar { sem:        &self.lock.access_lock,
684                        order: Just(&self.lock.order_lock),
685                        nopod: marker::NoPod })
686     }
687 }
688
689 impl<'a> RWLockReadMode<'a> {
690     /// Access the post-downgrade rwlock in read mode.
691     pub fn read<U>(&self, blk: || -> U) -> U { blk() }
692 }
693
694 /// A barrier enables multiple tasks to synchronize the beginning
695 /// of some computation.
696 ///
697 /// ```rust
698 /// use sync::Barrier;
699 ///
700 /// let barrier = Barrier::new(10);
701 /// for _ in range(0, 10) {
702 ///     let c = barrier.clone();
703 ///     // The same messages will be printed together.
704 ///     // You will NOT see any interleaving.
705 ///     spawn(proc() {
706 ///         println!("before wait");
707 ///         c.wait();
708 ///         println!("after wait");
709 ///     });
710 /// }
711 /// ```
712 #[deriving(Clone)]
713 pub struct Barrier {
714     priv arc: MutexArc<BarrierState>,
715     priv num_tasks: uint,
716 }
717
718 // The inner state of a double barrier
719 struct BarrierState {
720     count: uint,
721     generation_id: uint,
722 }
723
724 impl Barrier {
725     /// Create a new barrier that can block a given number of tasks.
726     pub fn new(num_tasks: uint) -> Barrier {
727         Barrier {
728             arc: MutexArc::new(BarrierState {
729                 count: 0,
730                 generation_id: 0,
731             }),
732             num_tasks: num_tasks,
733         }
734     }
735
736     /// Block the current task until a certain number of tasks is waiting.
737     pub fn wait(&self) {
738         self.arc.access_cond(|state, cond| {
739             let local_gen = state.generation_id;
740             state.count += 1;
741             if state.count < self.num_tasks {
742                 // We need a while loop to guard against spurious wakeups.
743                 // http://en.wikipedia.org/wiki/Spurious_wakeup
744                 while local_gen == state.generation_id && state.count < self.num_tasks {
745                     cond.wait();
746                 }
747             } else {
748                 state.count = 0;
749                 state.generation_id += 1;
750                 cond.broadcast();
751             }
752         });
753     }
754 }
755
756 /****************************************************************************
757  * Tests
758  ****************************************************************************/
759
760 #[cfg(test)]
761 mod tests {
762     use sync::{Semaphore, Mutex, RWLock, Barrier, Condvar};
763
764     use std::cast;
765     use std::result;
766     use std::task;
767     use std::comm::Empty;
768
769     /************************************************************************
770      * Semaphore tests
771      ************************************************************************/
772     #[test]
773     fn test_sem_acquire_release() {
774         let s = Semaphore::new(1);
775         s.acquire();
776         s.release();
777         s.acquire();
778     }
779     #[test]
780     fn test_sem_basic() {
781         let s = Semaphore::new(1);
782         s.access(|| { })
783     }
784     #[test]
785     fn test_sem_as_mutex() {
786         let s = Semaphore::new(1);
787         let s2 = s.clone();
788         task::spawn(proc() {
789             s2.access(|| {
790                 for _ in range(0, 5) { task::deschedule(); }
791             })
792         });
793         s.access(|| {
794             for _ in range(0, 5) { task::deschedule(); }
795         })
796     }
797     #[test]
798     fn test_sem_as_cvar() {
799         /* Child waits and parent signals */
800         let (tx, rx) = channel();
801         let s = Semaphore::new(0);
802         let s2 = s.clone();
803         task::spawn(proc() {
804             s2.acquire();
805             tx.send(());
806         });
807         for _ in range(0, 5) { task::deschedule(); }
808         s.release();
809         let _ = rx.recv();
810
811         /* Parent waits and child signals */
812         let (tx, rx) = channel();
813         let s = Semaphore::new(0);
814         let s2 = s.clone();
815         task::spawn(proc() {
816             for _ in range(0, 5) { task::deschedule(); }
817             s2.release();
818             let _ = rx.recv();
819         });
820         s.acquire();
821         tx.send(());
822     }
823     #[test]
824     fn test_sem_multi_resource() {
825         // Parent and child both get in the critical section at the same
826         // time, and shake hands.
827         let s = Semaphore::new(2);
828         let s2 = s.clone();
829         let (tx1, rx1) = channel();
830         let (tx2, rx2) = channel();
831         task::spawn(proc() {
832             s2.access(|| {
833                 let _ = rx2.recv();
834                 tx1.send(());
835             })
836         });
837         s.access(|| {
838             tx2.send(());
839             let _ = rx1.recv();
840         })
841     }
842     #[test]
843     fn test_sem_runtime_friendly_blocking() {
844         // Force the runtime to schedule two threads on the same sched_loop.
845         // When one blocks, it should schedule the other one.
846         let s = Semaphore::new(1);
847         let s2 = s.clone();
848         let (tx, rx) = channel();
849         let mut child_data = Some((s2, tx));
850         s.access(|| {
851             let (s2, tx) = child_data.take_unwrap();
852             task::spawn(proc() {
853                 tx.send(());
854                 s2.access(|| { });
855                 tx.send(());
856             });
857             let _ = rx.recv(); // wait for child to come alive
858             for _ in range(0, 5) { task::deschedule(); } // let the child contend
859         });
860         let _ = rx.recv(); // wait for child to be done
861     }
862     /************************************************************************
863      * Mutex tests
864      ************************************************************************/
865     #[test]
866     fn test_mutex_lock() {
867         // Unsafely achieve shared state, and do the textbook
868         // "load tmp = move ptr; inc tmp; store ptr <- tmp" dance.
869         let (tx, rx) = channel();
870         let m = Mutex::new();
871         let m2 = m.clone();
872         let mut sharedstate = ~0;
873         {
874             let ptr: *int = &*sharedstate;
875             task::spawn(proc() {
876                 let sharedstate: &mut int =
877                     unsafe { cast::transmute(ptr) };
878                 access_shared(sharedstate, &m2, 10);
879                 tx.send(());
880             });
881         }
882         {
883             access_shared(sharedstate, &m, 10);
884             let _ = rx.recv();
885
886             assert_eq!(*sharedstate, 20);
887         }
888
889         fn access_shared(sharedstate: &mut int, m: &Mutex, n: uint) {
890             for _ in range(0, n) {
891                 m.lock(|| {
892                     let oldval = *sharedstate;
893                     task::deschedule();
894                     *sharedstate = oldval + 1;
895                 })
896             }
897         }
898     }
899     #[test]
900     fn test_mutex_cond_wait() {
901         let m = Mutex::new();
902
903         // Child wakes up parent
904         m.lock_cond(|cond| {
905             let m2 = m.clone();
906             task::spawn(proc() {
907                 m2.lock_cond(|cond| {
908                     let woken = cond.signal();
909                     assert!(woken);
910                 })
911             });
912             cond.wait();
913         });
914         // Parent wakes up child
915         let (tx, rx) = channel();
916         let m3 = m.clone();
917         task::spawn(proc() {
918             m3.lock_cond(|cond| {
919                 tx.send(());
920                 cond.wait();
921                 tx.send(());
922             })
923         });
924         let _ = rx.recv(); // Wait until child gets in the mutex
925         m.lock_cond(|cond| {
926             let woken = cond.signal();
927             assert!(woken);
928         });
929         let _ = rx.recv(); // Wait until child wakes up
930     }
931     #[cfg(test)]
932     fn test_mutex_cond_broadcast_helper(num_waiters: uint) {
933         let m = Mutex::new();
934         let mut rxs = ~[];
935
936         for _ in range(0, num_waiters) {
937             let mi = m.clone();
938             let (tx, rx) = channel();
939             rxs.push(rx);
940             task::spawn(proc() {
941                 mi.lock_cond(|cond| {
942                     tx.send(());
943                     cond.wait();
944                     tx.send(());
945                 })
946             });
947         }
948
949         // wait until all children get in the mutex
950         for rx in rxs.mut_iter() { let _ = rx.recv(); }
951         m.lock_cond(|cond| {
952             let num_woken = cond.broadcast();
953             assert_eq!(num_woken, num_waiters);
954         });
955         // wait until all children wake up
956         for rx in rxs.mut_iter() { let _ = rx.recv(); }
957     }
958     #[test]
959     fn test_mutex_cond_broadcast() {
960         test_mutex_cond_broadcast_helper(12);
961     }
962     #[test]
963     fn test_mutex_cond_broadcast_none() {
964         test_mutex_cond_broadcast_helper(0);
965     }
966     #[test]
967     fn test_mutex_cond_no_waiter() {
968         let m = Mutex::new();
969         let m2 = m.clone();
970         let _ = task::try(proc() {
971             m.lock_cond(|_x| { })
972         });
973         m2.lock_cond(|cond| {
974             assert!(!cond.signal());
975         })
976     }
977     #[test]
978     fn test_mutex_killed_simple() {
979         use std::any::Any;
980
981         // Mutex must get automatically unlocked if failed/killed within.
982         let m = Mutex::new();
983         let m2 = m.clone();
984
985         let result: result::Result<(), ~Any> = task::try(proc() {
986             m2.lock(|| {
987                 fail!();
988             })
989         });
990         assert!(result.is_err());
991         // child task must have finished by the time try returns
992         m.lock(|| { })
993     }
994     #[test]
995     fn test_mutex_cond_signal_on_0() {
996         // Tests that signal_on(0) is equivalent to signal().
997         let m = Mutex::new();
998         m.lock_cond(|cond| {
999             let m2 = m.clone();
1000             task::spawn(proc() {
1001                 m2.lock_cond(|cond| {
1002                     cond.signal_on(0);
1003                 })
1004             });
1005             cond.wait();
1006         })
1007     }
1008     #[test]
1009     fn test_mutex_no_condvars() {
1010         let result = task::try(proc() {
1011             let m = Mutex::new_with_condvars(0);
1012             m.lock_cond(|cond| { cond.wait(); })
1013         });
1014         assert!(result.is_err());
1015         let result = task::try(proc() {
1016             let m = Mutex::new_with_condvars(0);
1017             m.lock_cond(|cond| { cond.signal(); })
1018         });
1019         assert!(result.is_err());
1020         let result = task::try(proc() {
1021             let m = Mutex::new_with_condvars(0);
1022             m.lock_cond(|cond| { cond.broadcast(); })
1023         });
1024         assert!(result.is_err());
1025     }
1026     /************************************************************************
1027      * Reader/writer lock tests
1028      ************************************************************************/
1029     #[cfg(test)]
1030     pub enum RWLockMode { Read, Write, Downgrade, DowngradeRead }
1031     #[cfg(test)]
1032     fn lock_rwlock_in_mode(x: &RWLock, mode: RWLockMode, blk: ||) {
1033         match mode {
1034             Read => x.read(blk),
1035             Write => x.write(blk),
1036             Downgrade =>
1037                 x.write_downgrade(|mode| {
1038                     mode.write(|| { blk() });
1039                 }),
1040             DowngradeRead =>
1041                 x.write_downgrade(|mode| {
1042                     let mode = x.downgrade(mode);
1043                     mode.read(|| { blk() });
1044                 }),
1045         }
1046     }
1047     #[cfg(test)]
1048     fn test_rwlock_exclusion(x: &RWLock,
1049                              mode1: RWLockMode,
1050                              mode2: RWLockMode) {
1051         // Test mutual exclusion between readers and writers. Just like the
1052         // mutex mutual exclusion test, a ways above.
1053         let (tx, rx) = channel();
1054         let x2 = x.clone();
1055         let mut sharedstate = ~0;
1056         {
1057             let ptr: *int = &*sharedstate;
1058             task::spawn(proc() {
1059                 let sharedstate: &mut int =
1060                     unsafe { cast::transmute(ptr) };
1061                 access_shared(sharedstate, &x2, mode1, 10);
1062                 tx.send(());
1063             });
1064         }
1065         {
1066             access_shared(sharedstate, x, mode2, 10);
1067             let _ = rx.recv();
1068
1069             assert_eq!(*sharedstate, 20);
1070         }
1071
1072         fn access_shared(sharedstate: &mut int, x: &RWLock, mode: RWLockMode,
1073                          n: uint) {
1074             for _ in range(0, n) {
1075                 lock_rwlock_in_mode(x, mode, || {
1076                     let oldval = *sharedstate;
1077                     task::deschedule();
1078                     *sharedstate = oldval + 1;
1079                 })
1080             }
1081         }
1082     }
1083     #[test]
1084     fn test_rwlock_readers_wont_modify_the_data() {
1085         test_rwlock_exclusion(&RWLock::new(), Read, Write);
1086         test_rwlock_exclusion(&RWLock::new(), Write, Read);
1087         test_rwlock_exclusion(&RWLock::new(), Read, Downgrade);
1088         test_rwlock_exclusion(&RWLock::new(), Downgrade, Read);
1089     }
1090     #[test]
1091     fn test_rwlock_writers_and_writers() {
1092         test_rwlock_exclusion(&RWLock::new(), Write, Write);
1093         test_rwlock_exclusion(&RWLock::new(), Write, Downgrade);
1094         test_rwlock_exclusion(&RWLock::new(), Downgrade, Write);
1095         test_rwlock_exclusion(&RWLock::new(), Downgrade, Downgrade);
1096     }
1097     #[cfg(test)]
1098     fn test_rwlock_handshake(x: &RWLock,
1099                                  mode1: RWLockMode,
1100                                  mode2: RWLockMode,
1101                                  make_mode2_go_first: bool) {
1102         // Much like sem_multi_resource.
1103         let x2 = x.clone();
1104         let (tx1, rx1) = channel();
1105         let (tx2, rx2) = channel();
1106         task::spawn(proc() {
1107             if !make_mode2_go_first {
1108                 let _ = rx2.recv(); // parent sends to us once it locks, or ...
1109             }
1110             lock_rwlock_in_mode(&x2, mode2, || {
1111                 if make_mode2_go_first {
1112                     tx1.send(()); // ... we send to it once we lock
1113                 }
1114                 let _ = rx2.recv();
1115                 tx1.send(());
1116             })
1117         });
1118         if make_mode2_go_first {
1119             let _ = rx1.recv(); // child sends to us once it locks, or ...
1120         }
1121         lock_rwlock_in_mode(x, mode1, || {
1122             if !make_mode2_go_first {
1123                 tx2.send(()); // ... we send to it once we lock
1124             }
1125             tx2.send(());
1126             let _ = rx1.recv();
1127         })
1128     }
1129     #[test]
1130     fn test_rwlock_readers_and_readers() {
1131         test_rwlock_handshake(&RWLock::new(), Read, Read, false);
1132         // The downgrader needs to get in before the reader gets in, otherwise
1133         // they cannot end up reading at the same time.
1134         test_rwlock_handshake(&RWLock::new(), DowngradeRead, Read, false);
1135         test_rwlock_handshake(&RWLock::new(), Read, DowngradeRead, true);
1136         // Two downgrade_reads can never both end up reading at the same time.
1137     }
1138     #[test]
1139     fn test_rwlock_downgrade_unlock() {
1140         // Tests that downgrade can unlock the lock in both modes
1141         let x = RWLock::new();
1142         lock_rwlock_in_mode(&x, Downgrade, || { });
1143         test_rwlock_handshake(&x, Read, Read, false);
1144         let y = RWLock::new();
1145         lock_rwlock_in_mode(&y, DowngradeRead, || { });
1146         test_rwlock_exclusion(&y, Write, Write);
1147     }
1148     #[test]
1149     fn test_rwlock_read_recursive() {
1150         let x = RWLock::new();
1151         x.read(|| { x.read(|| { }) })
1152     }
1153     #[test]
1154     fn test_rwlock_cond_wait() {
1155         // As test_mutex_cond_wait above.
1156         let x = RWLock::new();
1157
1158         // Child wakes up parent
1159         x.write_cond(|cond| {
1160             let x2 = x.clone();
1161             task::spawn(proc() {
1162                 x2.write_cond(|cond| {
1163                     let woken = cond.signal();
1164                     assert!(woken);
1165                 })
1166             });
1167             cond.wait();
1168         });
1169         // Parent wakes up child
1170         let (tx, rx) = channel();
1171         let x3 = x.clone();
1172         task::spawn(proc() {
1173             x3.write_cond(|cond| {
1174                 tx.send(());
1175                 cond.wait();
1176                 tx.send(());
1177             })
1178         });
1179         let _ = rx.recv(); // Wait until child gets in the rwlock
1180         x.read(|| { }); // Must be able to get in as a reader in the meantime
1181         x.write_cond(|cond| { // Or as another writer
1182             let woken = cond.signal();
1183             assert!(woken);
1184         });
1185         let _ = rx.recv(); // Wait until child wakes up
1186         x.read(|| { }); // Just for good measure
1187     }
1188     #[cfg(test)]
1189     fn test_rwlock_cond_broadcast_helper(num_waiters: uint,
1190                                              dg1: bool,
1191                                              dg2: bool) {
1192         // Much like the mutex broadcast test. Downgrade-enabled.
1193         fn lock_cond(x: &RWLock, downgrade: bool, blk: |c: &Condvar|) {
1194             if downgrade {
1195                 x.write_downgrade(|mode| {
1196                     mode.write_cond(|c| { blk(c) });
1197                 });
1198             } else {
1199                 x.write_cond(|c| { blk(c) });
1200             }
1201         }
1202         let x = RWLock::new();
1203         let mut rxs = ~[];
1204
1205         for _ in range(0, num_waiters) {
1206             let xi = x.clone();
1207             let (tx, rx) = channel();
1208             rxs.push(rx);
1209             task::spawn(proc() {
1210                 lock_cond(&xi, dg1, |cond| {
1211                     tx.send(());
1212                     cond.wait();
1213                     tx.send(());
1214                 })
1215             });
1216         }
1217
1218         // wait until all children get in the mutex
1219         for rx in rxs.mut_iter() { let _ = rx.recv(); }
1220         lock_cond(&x, dg2, |cond| {
1221             let num_woken = cond.broadcast();
1222             assert_eq!(num_woken, num_waiters);
1223         });
1224         // wait until all children wake up
1225         for rx in rxs.mut_iter() { let _ = rx.recv(); }
1226     }
1227     #[test]
1228     fn test_rwlock_cond_broadcast() {
1229         test_rwlock_cond_broadcast_helper(0, true, true);
1230         test_rwlock_cond_broadcast_helper(0, true, false);
1231         test_rwlock_cond_broadcast_helper(0, false, true);
1232         test_rwlock_cond_broadcast_helper(0, false, false);
1233         test_rwlock_cond_broadcast_helper(12, true, true);
1234         test_rwlock_cond_broadcast_helper(12, true, false);
1235         test_rwlock_cond_broadcast_helper(12, false, true);
1236         test_rwlock_cond_broadcast_helper(12, false, false);
1237     }
1238     #[cfg(test)]
1239     fn rwlock_kill_helper(mode1: RWLockMode, mode2: RWLockMode) {
1240         use std::any::Any;
1241
1242         // Mutex must get automatically unlocked if failed/killed within.
1243         let x = RWLock::new();
1244         let x2 = x.clone();
1245
1246         let result: result::Result<(), ~Any> = task::try(proc() {
1247             lock_rwlock_in_mode(&x2, mode1, || {
1248                 fail!();
1249             })
1250         });
1251         assert!(result.is_err());
1252         // child task must have finished by the time try returns
1253         lock_rwlock_in_mode(&x, mode2, || { })
1254     }
1255     #[test]
1256     fn test_rwlock_reader_killed_writer() {
1257         rwlock_kill_helper(Read, Write);
1258     }
1259     #[test]
1260     fn test_rwlock_writer_killed_reader() {
1261         rwlock_kill_helper(Write, Read);
1262     }
1263     #[test]
1264     fn test_rwlock_reader_killed_reader() {
1265         rwlock_kill_helper(Read, Read);
1266     }
1267     #[test]
1268     fn test_rwlock_writer_killed_writer() {
1269         rwlock_kill_helper(Write, Write);
1270     }
1271     #[test]
1272     fn test_rwlock_kill_downgrader() {
1273         rwlock_kill_helper(Downgrade, Read);
1274         rwlock_kill_helper(Read, Downgrade);
1275         rwlock_kill_helper(Downgrade, Write);
1276         rwlock_kill_helper(Write, Downgrade);
1277         rwlock_kill_helper(DowngradeRead, Read);
1278         rwlock_kill_helper(Read, DowngradeRead);
1279         rwlock_kill_helper(DowngradeRead, Write);
1280         rwlock_kill_helper(Write, DowngradeRead);
1281         rwlock_kill_helper(DowngradeRead, Downgrade);
1282         rwlock_kill_helper(DowngradeRead, Downgrade);
1283         rwlock_kill_helper(Downgrade, DowngradeRead);
1284         rwlock_kill_helper(Downgrade, DowngradeRead);
1285     }
1286     #[test] #[should_fail]
1287     fn test_rwlock_downgrade_cant_swap() {
1288         // Tests that you can't downgrade with a different rwlock's token.
1289         let x = RWLock::new();
1290         let y = RWLock::new();
1291         x.write_downgrade(|xwrite| {
1292             let mut xopt = Some(xwrite);
1293             y.write_downgrade(|_ywrite| {
1294                 y.downgrade(xopt.take_unwrap());
1295                 error!("oops, y.downgrade(x) should have failed!");
1296             })
1297         })
1298     }
1299
1300     /************************************************************************
1301      * Barrier tests
1302      ************************************************************************/
1303     #[test]
1304     fn test_barrier() {
1305         let barrier = Barrier::new(10);
1306         let (tx, rx) = channel();
1307
1308         for _ in range(0, 9) {
1309             let c = barrier.clone();
1310             let tx = tx.clone();
1311             spawn(proc() {
1312                 c.wait();
1313                 tx.send(true);
1314             });
1315         }
1316
1317         // At this point, all spawned tasks should be blocked,
1318         // so we shouldn't get anything from the port
1319         assert!(match rx.try_recv() {
1320             Empty => true,
1321             _ => false,
1322         });
1323
1324         barrier.wait();
1325         // Now, the barrier is cleared and we should get data.
1326         for _ in range(0, 9) {
1327             rx.recv();
1328         }
1329     }
1330 }