]> git.lizzy.rs Git - rust.git/blob - src/libsync/mutex.rs
Ignore tests broken by failing on ICE
[rust.git] / src / libsync / mutex.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! A proper mutex implementation regardless of the "flavor of task" which is
12 //! acquiring the lock.
13
14 // # Implementation of Rust mutexes
15 //
16 // Most answers to the question of "how do I use a mutex" are "use pthreads",
17 // but for Rust this isn't quite sufficient. Green threads cannot acquire an OS
18 // mutex because they can context switch among many OS threads, leading to
19 // deadlocks with other green threads.
20 //
21 // Another problem for green threads grabbing an OS mutex is that POSIX dictates
22 // that unlocking a mutex on a different thread from where it was locked is
23 // undefined behavior. Remember that green threads can migrate among OS threads,
24 // so this would mean that we would have to pin green threads to OS threads,
25 // which is less than ideal.
26 //
27 // ## Using deschedule/reawaken
28 //
29 // We already have primitives for descheduling/reawakening tasks, so they're the
30 // first obvious choice when implementing a mutex. The idea would be to have a
31 // concurrent queue that everyone is pushed on to, and then the owner of the
32 // mutex is the one popping from the queue.
33 //
34 // Unfortunately, this is not very performant for native tasks. The suspected
35 // reason for this is that each native thread is suspended on its own condition
36 // variable, unique from all the other threads. In this situation, the kernel
37 // has no idea what the scheduling semantics are of the user program, so all of
38 // the threads are distributed among all cores on the system. This ends up
39 // having very expensive wakeups of remote cores high up in the profile when
40 // handing off the mutex among native tasks. On the other hand, when using an OS
41 // mutex, the kernel knows that all native threads are contended on the same
42 // mutex, so they're in theory all migrated to a single core (fast context
43 // switching).
44 //
45 // ## Mixing implementations
46 //
47 // From that above information, we have two constraints. The first is that
48 // green threads can't touch os mutexes, and the second is that native tasks
49 // pretty much *must* touch an os mutex.
50 //
51 // As a compromise, the queueing implementation is used for green threads and
52 // the os mutex is used for native threads (why not have both?). This ends up
53 // leading to fairly decent performance for both native threads and green
54 // threads on various workloads (uncontended and contended).
55 //
56 // The crux of this implementation is an atomic work which is CAS'd on many
57 // times in order to manage a few flags about who's blocking where and whether
58 // it's locked or not.
59
60 use std::kinds::marker;
61 use std::mem;
62 use std::rt::local::Local;
63 use std::rt::task::{BlockedTask, Task};
64 use std::rt::thread::Thread;
65 use std::sync::atomics;
66 use std::ty::Unsafe;
67 use std::unstable::mutex;
68
69 use q = mpsc_intrusive;
70
71 pub static LOCKED: uint = 1 << 0;
72 pub static GREEN_BLOCKED: uint = 1 << 1;
73 pub static NATIVE_BLOCKED: uint = 1 << 2;
74
75 /// A mutual exclusion primitive useful for protecting shared data
76 ///
77 /// This mutex is an implementation of a lock for all flavors of tasks which may
78 /// be grabbing. A common problem with green threads is that they cannot grab
79 /// locks (if they reschedule during the lock a contender could deadlock the
80 /// system), but this mutex does *not* suffer this problem.
81 ///
82 /// This mutex will properly block tasks waiting for the lock to become
83 /// available. The mutex can also be statically initialized or created via a
84 /// `new` constructor.
85 ///
86 /// # Example
87 ///
88 /// ```rust
89 /// use sync::mutex::Mutex;
90 ///
91 /// let m = Mutex::new();
92 /// let guard = m.lock();
93 /// // do some work
94 /// drop(guard); // unlock the lock
95 /// ```
96 pub struct Mutex {
97     lock: StaticMutex,
98 }
99
100 #[deriving(Eq, Show)]
101 enum Flavor {
102     Unlocked,
103     TryLockAcquisition,
104     GreenAcquisition,
105     NativeAcquisition,
106 }
107
108 /// The static mutex type is provided to allow for static allocation of mutexes.
109 ///
110 /// Note that this is a separate type because using a Mutex correctly means that
111 /// it needs to have a destructor run. In Rust, statics are not allowed to have
112 /// destructors. As a result, a `StaticMutex` has one extra method when compared
113 /// to a `Mutex`, a `destroy` method. This method is unsafe to call, and
114 /// documentation can be found directly on the method.
115 ///
116 /// # Example
117 ///
118 /// ```rust
119 /// use sync::mutex::{StaticMutex, MUTEX_INIT};
120 ///
121 /// static mut LOCK: StaticMutex = MUTEX_INIT;
122 ///
123 /// unsafe {
124 ///     let _g = LOCK.lock();
125 ///     // do some productive work
126 /// }
127 /// // lock is unlocked here.
128 /// ```
129 pub struct StaticMutex {
130     /// Current set of flags on this mutex
131     state: atomics::AtomicUint,
132     /// an OS mutex used by native threads
133     lock: mutex::StaticNativeMutex,
134
135     /// Type of locking operation currently on this mutex
136     flavor: Unsafe<Flavor>,
137     /// uint-cast of the green thread waiting for this mutex
138     green_blocker: Unsafe<uint>,
139     /// uint-cast of the native thread waiting for this mutex
140     native_blocker: Unsafe<uint>,
141
142     /// A concurrent mpsc queue used by green threads, along with a count used
143     /// to figure out when to dequeue and enqueue.
144     q: q::Queue<uint>,
145     green_cnt: atomics::AtomicUint,
146 }
147
148 /// An RAII implementation of a "scoped lock" of a mutex. When this structure is
149 /// dropped (falls out of scope), the lock will be unlocked.
150 #[must_use]
151 pub struct Guard<'a> {
152     lock: &'a StaticMutex,
153 }
154
155 /// Static initialization of a mutex. This constant can be used to initialize
156 /// other mutex constants.
157 pub static MUTEX_INIT: StaticMutex = StaticMutex {
158     lock: mutex::NATIVE_MUTEX_INIT,
159     state: atomics::INIT_ATOMIC_UINT,
160     flavor: Unsafe { value: Unlocked, marker1: marker::InvariantType },
161     green_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
162     native_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
163     green_cnt: atomics::INIT_ATOMIC_UINT,
164     q: q::Queue {
165         head: atomics::INIT_ATOMIC_UINT,
166         tail: Unsafe {
167             value: 0 as *mut q::Node<uint>,
168             marker1: marker::InvariantType,
169         },
170         stub: q::DummyNode {
171             next: atomics::INIT_ATOMIC_UINT,
172         }
173     }
174 };
175
176 impl StaticMutex {
177     /// Attempts to grab this lock, see `Mutex::try_lock`
178     pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
179         // Attempt to steal the mutex from an unlocked state.
180         //
181         // FIXME: this can mess up the fairness of the mutex, seems bad
182         match self.state.compare_and_swap(0, LOCKED, atomics::SeqCst) {
183             0 => {
184                 // After acquiring the mutex, we can safely access the inner
185                 // fields.
186                 let prev = unsafe {
187                     mem::replace(&mut *self.flavor.get(), TryLockAcquisition)
188                 };
189                 assert_eq!(prev, Unlocked);
190                 Some(Guard::new(self))
191             }
192             _ => None
193         }
194     }
195
196     /// Acquires this lock, see `Mutex::lock`
197     pub fn lock<'a>(&'a self) -> Guard<'a> {
198         // First, attempt to steal the mutex from an unlocked state. The "fast
199         // path" needs to have as few atomic instructions as possible, and this
200         // one cmpxchg is already pretty expensive.
201         //
202         // FIXME: this can mess up the fairness of the mutex, seems bad
203         match self.try_lock() {
204             Some(guard) => return guard,
205             None => {}
206         }
207
208         // After we've failed the fast path, then we delegate to the differnet
209         // locking protocols for green/native tasks. This will select two tasks
210         // to continue further (one native, one green).
211         let t: ~Task = Local::take();
212         let can_block = t.can_block();
213         let native_bit;
214         if can_block {
215             self.native_lock(t);
216             native_bit = NATIVE_BLOCKED;
217         } else {
218             self.green_lock(t);
219             native_bit = GREEN_BLOCKED;
220         }
221
222         // After we've arbitrated among task types, attempt to re-acquire the
223         // lock (avoids a deschedule). This is very important to do in order to
224         // allow threads coming out of the native_lock function to try their
225         // best to not hit a cvar in deschedule.
226         let mut old = match self.state.compare_and_swap(0, LOCKED,
227                                                         atomics::SeqCst) {
228             0 => {
229                 let flavor = if can_block {
230                     NativeAcquisition
231                 } else {
232                     GreenAcquisition
233                 };
234                 // We've acquired the lock, so this unsafe access to flavor is
235                 // allowed.
236                 unsafe { *self.flavor.get() = flavor; }
237                 return Guard::new(self)
238             }
239             old => old,
240         };
241
242         // Alright, everything else failed. We need to deschedule ourselves and
243         // flag ourselves as waiting. Note that this case should only happen
244         // regularly in native/green contention. Due to try_lock and the header
245         // of lock stealing the lock, it's also possible for native/native
246         // contention to hit this location, but as less common.
247         let t: ~Task = Local::take();
248         t.deschedule(1, |task| {
249             let task = unsafe { task.cast_to_uint() };
250
251             // These accesses are protected by the respective native/green
252             // mutexes which were acquired above.
253             let prev = if can_block {
254                 unsafe { mem::replace(&mut *self.native_blocker.get(), task) }
255             } else {
256                 unsafe { mem::replace(&mut *self.green_blocker.get(), task) }
257             };
258             assert_eq!(prev, 0);
259
260             loop {
261                 assert_eq!(old & native_bit, 0);
262                 // If the old state was locked, then we need to flag ourselves
263                 // as blocking in the state. If the old state was unlocked, then
264                 // we attempt to acquire the mutex. Everything here is a CAS
265                 // loop that'll eventually make progress.
266                 if old & LOCKED != 0 {
267                     old = match self.state.compare_and_swap(old,
268                                                             old | native_bit,
269                                                             atomics::SeqCst) {
270                         n if n == old => return Ok(()),
271                         n => n
272                     };
273                 } else {
274                     assert_eq!(old, 0);
275                     old = match self.state.compare_and_swap(old,
276                                                             old | LOCKED,
277                                                             atomics::SeqCst) {
278                         n if n == old => {
279                             // After acquiring the lock, we have access to the
280                             // flavor field, and we've regained access to our
281                             // respective native/green blocker field.
282                             let prev = if can_block {
283                                 unsafe {
284                                     *self.native_blocker.get() = 0;
285                                     mem::replace(&mut *self.flavor.get(),
286                                                  NativeAcquisition)
287                                 }
288                             } else {
289                                 unsafe {
290                                     *self.green_blocker.get() = 0;
291                                     mem::replace(&mut *self.flavor.get(),
292                                                  GreenAcquisition)
293                                 }
294                             };
295                             assert_eq!(prev, Unlocked);
296                             return Err(unsafe {
297                                 BlockedTask::cast_from_uint(task)
298                             })
299                         }
300                         n => n,
301                     };
302                 }
303             }
304         });
305
306         Guard::new(self)
307     }
308
309     // Tasks which can block are super easy. These tasks just call the blocking
310     // `lock()` function on an OS mutex
311     fn native_lock(&self, t: ~Task) {
312         Local::put(t);
313         unsafe { self.lock.lock_noguard(); }
314     }
315
316     fn native_unlock(&self) {
317         unsafe { self.lock.unlock_noguard(); }
318     }
319
320     fn green_lock(&self, t: ~Task) {
321         // Green threads flag their presence with an atomic counter, and if they
322         // fail to be the first to the mutex, they enqueue themselves on a
323         // concurrent internal queue with a stack-allocated node.
324         //
325         // FIXME: There isn't a cancellation currently of an enqueue, forcing
326         //        the unlocker to spin for a bit.
327         if self.green_cnt.fetch_add(1, atomics::SeqCst) == 0 {
328             Local::put(t);
329             return
330         }
331
332         let mut node = q::Node::new(0);
333         t.deschedule(1, |task| {
334             unsafe {
335                 node.data = task.cast_to_uint();
336                 self.q.push(&mut node);
337             }
338             Ok(())
339         });
340     }
341
342     fn green_unlock(&self) {
343         // If we're the only green thread, then no need to check the queue,
344         // otherwise the fixme above forces us to spin for a bit.
345         if self.green_cnt.fetch_sub(1, atomics::SeqCst) == 1 { return }
346         let node;
347         loop {
348             match unsafe { self.q.pop() } {
349                 Some(t) => { node = t; break; }
350                 None => Thread::yield_now(),
351             }
352         }
353         let task = unsafe { BlockedTask::cast_from_uint((*node).data) };
354         task.wake().map(|t| t.reawaken());
355     }
356
357     fn unlock(&self) {
358         // Unlocking this mutex is a little tricky. We favor any task that is
359         // manually blocked (not in each of the separate locks) in order to help
360         // provide a little fairness (green threads will wake up the pending
361         // native thread and native threads will wake up the pending green
362         // thread).
363         //
364         // There's also the question of when we unlock the actual green/native
365         // locking halves as well. If we're waking up someone, then we can wait
366         // to unlock until we've acquired the task to wake up (we're guaranteed
367         // the mutex memory is still valid when there's contenders), but as soon
368         // as we don't find any contenders we must unlock the mutex, and *then*
369         // flag the mutex as unlocked.
370         //
371         // This flagging can fail, leading to another round of figuring out if a
372         // task needs to be woken, and in this case it's ok that the "mutex
373         // halves" are unlocked, we're just mainly dealing with the atomic state
374         // of the outer mutex.
375         let flavor = unsafe { mem::replace(&mut *self.flavor.get(), Unlocked) };
376
377         let mut state = self.state.load(atomics::SeqCst);
378         let mut unlocked = false;
379         let task;
380         loop {
381             assert!(state & LOCKED != 0);
382             if state & GREEN_BLOCKED != 0 {
383                 self.unset(state, GREEN_BLOCKED);
384                 task = unsafe {
385                     *self.flavor.get() = GreenAcquisition;
386                     let task = mem::replace(&mut *self.green_blocker.get(), 0);
387                     BlockedTask::cast_from_uint(task)
388                 };
389                 break;
390             } else if state & NATIVE_BLOCKED != 0 {
391                 self.unset(state, NATIVE_BLOCKED);
392                 task = unsafe {
393                     *self.flavor.get() = NativeAcquisition;
394                     let task = mem::replace(&mut *self.native_blocker.get(), 0);
395                     BlockedTask::cast_from_uint(task)
396                 };
397                 break;
398             } else {
399                 assert_eq!(state, LOCKED);
400                 if !unlocked {
401                     match flavor {
402                         GreenAcquisition => { self.green_unlock(); }
403                         NativeAcquisition => { self.native_unlock(); }
404                         TryLockAcquisition => {}
405                         Unlocked => unreachable!()
406                     }
407                     unlocked = true;
408                 }
409                 match self.state.compare_and_swap(LOCKED, 0, atomics::SeqCst) {
410                     LOCKED => return,
411                     n => { state = n; }
412                 }
413             }
414         }
415         if !unlocked {
416             match flavor {
417                 GreenAcquisition => { self.green_unlock(); }
418                 NativeAcquisition => { self.native_unlock(); }
419                 TryLockAcquisition => {}
420                 Unlocked => unreachable!()
421             }
422         }
423
424         task.wake().map(|t| t.reawaken());
425     }
426
427     /// Loops around a CAS to unset the `bit` in `state`
428     fn unset(&self, mut state: uint, bit: uint) {
429         loop {
430             assert!(state & bit != 0);
431             let new = state ^ bit;
432             match self.state.compare_and_swap(state, new, atomics::SeqCst) {
433                 n if n == state => break,
434                 n => { state = n; }
435             }
436         }
437     }
438
439     /// Deallocates resources associated with this static mutex.
440     ///
441     /// This method is unsafe because it provides no guarantees that there are
442     /// no active users of this mutex, and safety is not guaranteed if there are
443     /// active users of this mutex.
444     ///
445     /// This method is required to ensure that there are no memory leaks on
446     /// *all* platforms. It may be the case that some platforms do not leak
447     /// memory if this method is not called, but this is not guaranteed to be
448     /// true on all platforms.
449     pub unsafe fn destroy(&self) {
450         self.lock.destroy()
451     }
452 }
453
454 impl Mutex {
455     /// Creates a new mutex in an unlocked state ready for use.
456     pub fn new() -> Mutex {
457         Mutex {
458             lock: StaticMutex {
459                 state: atomics::AtomicUint::new(0),
460                 flavor: Unsafe::new(Unlocked),
461                 green_blocker: Unsafe::new(0),
462                 native_blocker: Unsafe::new(0),
463                 green_cnt: atomics::AtomicUint::new(0),
464                 q: q::Queue::new(),
465                 lock: unsafe { mutex::StaticNativeMutex::new() },
466             }
467         }
468     }
469
470     /// Attempts to acquire this lock.
471     ///
472     /// If the lock could not be acquired at this time, then `None` is returned.
473     /// Otherwise, an RAII guard is returned. The lock will be unlocked when the
474     /// guard is dropped.
475     ///
476     /// This function does not block.
477     pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
478         self.lock.try_lock()
479     }
480
481     /// Acquires a mutex, blocking the current task until it is able to do so.
482     ///
483     /// This function will block the local task until it is available to acquire
484     /// the mutex. Upon returning, the task is the only task with the mutex
485     /// held. An RAII guard is returned to allow scoped unlock of the lock. When
486     /// the guard goes out of scope, the mutex will be unlocked.
487     pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() }
488 }
489
490 impl<'a> Guard<'a> {
491     fn new<'b>(lock: &'b StaticMutex) -> Guard<'b> {
492         if cfg!(debug) {
493             // once we've acquired a lock, it's ok to access the flavor
494             assert!(unsafe { *lock.flavor.get() != Unlocked });
495             assert!(lock.state.load(atomics::SeqCst) & LOCKED != 0);
496         }
497         Guard { lock: lock }
498     }
499 }
500
501 #[unsafe_destructor]
502 impl<'a> Drop for Guard<'a> {
503     #[inline]
504     fn drop(&mut self) {
505         self.lock.unlock();
506     }
507 }
508
509 impl Drop for Mutex {
510     fn drop(&mut self) {
511         // This is actually safe b/c we know that there is no further usage of
512         // this mutex (it's up to the user to arrange for a mutex to get
513         // dropped, that's not our job)
514         unsafe { self.lock.destroy() }
515     }
516 }
517
518 #[cfg(test)]
519 mod test {
520     extern crate native;
521     use super::{Mutex, StaticMutex, MUTEX_INIT};
522
523     #[test]
524     fn smoke() {
525         let m = Mutex::new();
526         drop(m.lock());
527         drop(m.lock());
528     }
529
530     #[test]
531     fn smoke_static() {
532         static mut m: StaticMutex = MUTEX_INIT;
533         unsafe {
534             drop(m.lock());
535             drop(m.lock());
536             m.destroy();
537         }
538     }
539
540     #[test]
541     fn lots_and_lots() {
542         static mut m: StaticMutex = MUTEX_INIT;
543         static mut CNT: uint = 0;
544         static M: uint = 1000;
545         static N: uint = 3;
546
547         fn inc() {
548             for _ in range(0, M) {
549                 unsafe {
550                     let _g = m.lock();
551                     CNT += 1;
552                 }
553             }
554         }
555
556         let (tx, rx) = channel();
557         for _ in range(0, N) {
558             let tx2 = tx.clone();
559             native::task::spawn(proc() { inc(); tx2.send(()); });
560             let tx2 = tx.clone();
561             spawn(proc() { inc(); tx2.send(()); });
562         }
563
564         drop(tx);
565         for _ in range(0, 2 * N) {
566             rx.recv();
567         }
568         assert_eq!(unsafe {CNT}, M * N * 2);
569         unsafe {
570             m.destroy();
571         }
572     }
573
574     #[test]
575     fn trylock() {
576         let m = Mutex::new();
577         assert!(m.try_lock().is_some());
578     }
579 }