// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! A proper mutex implementation regardless of the "flavor of task" which is
//! acquiring the lock.
// # Implementation of Rust mutexes
//
// Most answers to the question of "how do I use a mutex" are "use pthreads",
// but for Rust this isn't quite sufficient. Green threads cannot acquire an OS
// mutex because they can context switch among many OS threads, leading to
// deadlocks with other green threads.
//
// Another problem with green threads grabbing an OS mutex is that POSIX dictates
// that unlocking a mutex on a different thread from the one where it was locked
// is undefined behavior. Remember that green threads can migrate among OS
// threads, so this would mean that we would have to pin green threads to OS
// threads, which is less than ideal.
// ## Using deschedule/reawaken
//
// We already have primitives for descheduling/reawakening tasks, so they're the
// first obvious choice when implementing a mutex. The idea would be to have a
// concurrent queue that everyone is pushed on to, and then the owner of the
// mutex is the one popping from the queue.
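//
// Roughly, that protocol is what the green-thread path later in this file
// (`green_lock`/`green_unlock`) implements. As a sketch only, not the exact
// code below:
//
//     lock:   if green_cnt.fetch_add(1) != 0 { push a node; deschedule }
//     unlock: if green_cnt.fetch_sub(1) != 1 { pop a node; reawaken its task }
//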
// Unfortunately, this is not very performant for native tasks. The suspected
// reason for this is that each native thread is suspended on its own condition
// variable, unique from all the other threads. In this situation, the kernel
// has no idea what the scheduling semantics are of the user program, so all of
// the threads are distributed among all cores on the system. This ends up
// causing very expensive wakeups of remote cores, which show up high in the
// profile when handing off the mutex among native tasks. On the other hand,
// when using an OS mutex, the kernel knows that all native threads are
// contended on the same mutex, so they're in theory all migrated to a single
// core (fast context switching).
// ## Mixing implementations
//
// From the above information, we have two constraints. The first is that
// green threads can't touch OS mutexes, and the second is that native tasks
// pretty much *must* touch an OS mutex.
//
// As a compromise, the queueing implementation is used for green threads and
// the OS mutex is used for native threads (why not have both?). This ends up
// leading to fairly decent performance for both native threads and green
// threads on various workloads (uncontended and contended).
//
// The crux of this implementation is an atomic word which is CAS'd many
// times in order to manage a few flags about who's blocking where and whether
// it's locked or not.
use alloc::owned::Box;

use core::kinds::marker;

use rustrt::local::Local;
use rustrt::task::{BlockedTask, Task};
use rustrt::thread::Thread;

use q = mpsc_intrusive;

pub static LOCKED: uint = 1 << 0;
pub static GREEN_BLOCKED: uint = 1 << 1;
pub static NATIVE_BLOCKED: uint = 1 << 2;
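
// As a rough illustration (derived from how these flags are used below, not an
// exhaustive list), the atomic state word holds combinations such as:
//
//     0                         unlocked, no flagged waiters
//     LOCKED                    locked via the fast path or `try_lock`
//     LOCKED | GREEN_BLOCKED    locked, a green task is parked in `green_blocker`
//     LOCKED | NATIVE_BLOCKED   locked, a native task is parked in `native_blocker`
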
/// A mutual exclusion primitive useful for protecting shared data
///
/// This mutex is an implementation of a lock for all flavors of tasks which
/// may be acquiring it. A common problem with green threads is that they
/// cannot grab locks (if they reschedule while holding the lock, a contender
/// could deadlock the system), but this mutex does *not* suffer this problem.
///
/// This mutex will properly block tasks waiting for the lock to become
/// available. The mutex can also be statically initialized or created via a
/// `new` constructor.
/// use sync::mutex::Mutex;
///
/// let m = Mutex::new();
/// let guard = m.lock();
/// drop(guard); // unlock the lock
pub struct Mutex {
// Note that this static mutex is in a *box*, not inlined into the struct
// itself. This is done for memory safety reasons with the usage of a
// StaticNativeMutex inside the static mutex above. Once a native mutex has
// been used once, its address can never change (it can't be moved). This
// mutex type can be safely moved at any time, so to ensure that the native
// mutex is used correctly we box the inner lock to give it a constant
// address.
lock: Box<StaticMutex>,
}

#[deriving(PartialEq, Show)]
enum Flavor {
    Unlocked,
    TryLockAcquisition,
    GreenAcquisition,
    NativeAcquisition,
}

/// The static mutex type is provided to allow for static allocation of mutexes.
///
/// Note that this is a separate type because using a Mutex correctly means that
/// it needs to have a destructor run. In Rust, statics are not allowed to have
/// destructors. As a result, a `StaticMutex` has one extra method when compared
/// to a `Mutex`, a `destroy` method. This method is unsafe to call, and
/// documentation can be found directly on the method.
/// use sync::mutex::{StaticMutex, MUTEX_INIT};
///
/// static mut LOCK: StaticMutex = MUTEX_INIT;
///
/// unsafe {
///     let _g = LOCK.lock();
///     // do some productive work
/// }
/// // lock is unlocked here.
pub struct StaticMutex {
/// Current set of flags on this mutex
state: atomics::AtomicUint,
/// An OS mutex used by native threads
lock: mutex::StaticNativeMutex,
/// Type of locking operation currently on this mutex
flavor: Unsafe<Flavor>,
/// uint-cast of the green thread waiting for this mutex
green_blocker: Unsafe<uint>,
/// uint-cast of the native thread waiting for this mutex
native_blocker: Unsafe<uint>,
/// A concurrent mpsc queue used by green threads, along with a count used
/// to figure out when to dequeue and enqueue.
q: q::Queue<uint>,
green_cnt: atomics::AtomicUint,
}

/// An RAII implementation of a "scoped lock" of a mutex. When this structure is
/// dropped (falls out of scope), the lock will be unlocked.
pub struct Guard<'a> {
lock: &'a StaticMutex,
}

/// Static initialization of a mutex. This constant can be used to initialize
/// other mutex constants.
pub static MUTEX_INIT: StaticMutex = StaticMutex {
lock: mutex::NATIVE_MUTEX_INIT,
state: atomics::INIT_ATOMIC_UINT,
flavor: Unsafe { value: Unlocked, marker1: marker::InvariantType },
green_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
native_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
green_cnt: atomics::INIT_ATOMIC_UINT,
head: atomics::INIT_ATOMIC_UINT,
value: 0 as *mut q::Node<uint>,
marker1: marker::InvariantType,
next: atomics::INIT_ATOMIC_UINT,
/// Attempts to grab this lock, see `Mutex::try_lock`
pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
// Attempt to steal the mutex from an unlocked state.
//
// FIXME: this can mess up the fairness of the mutex, seems bad
match self.state.compare_and_swap(0, LOCKED, atomics::SeqCst) {
// After acquiring the mutex, we can safely access the inner
// fields.
let prev = unsafe {
mem::replace(&mut *self.flavor.get(), TryLockAcquisition)
};
assert_eq!(prev, Unlocked);
Some(Guard::new(self))
/// Acquires this lock, see `Mutex::lock`
pub fn lock<'a>(&'a self) -> Guard<'a> {
// First, attempt to steal the mutex from an unlocked state. The "fast
// path" needs to have as few atomic instructions as possible, and this
// one cmpxchg is already pretty expensive.
//
// FIXME: this can mess up the fairness of the mutex, seems bad
match self.try_lock() {
Some(guard) => return guard,
None => {}
}

// After we've failed the fast path, then we delegate to the different
// locking protocols for green/native tasks. This will select two tasks
// to continue further (one native, one green).
let t: Box<Task> = Local::take();
let can_block = t.can_block();
native_bit = NATIVE_BLOCKED;
native_bit = GREEN_BLOCKED;
// After we've arbitrated among task types, attempt to re-acquire the
// lock (avoids a deschedule). This is very important to do in order to
// allow threads coming out of the native_lock function to try their
// best to not hit a cvar in deschedule.
let mut old = match self.state.compare_and_swap(0, LOCKED,
atomics::SeqCst) {
let flavor = if can_block {
// We've acquired the lock, so this unsafe access to flavor is
// allowed.
unsafe { *self.flavor.get() = flavor; }
return Guard::new(self)
// Alright, everything else failed. We need to deschedule ourselves and
// flag ourselves as waiting. Note that this case should only happen
// regularly in native/green contention. Due to try_lock and the header
// of lock stealing the lock, it's also possible for native/native
// contention to hit this location, but it is less common.
let t: Box<Task> = Local::take();
t.deschedule(1, |task| {
let task = unsafe { task.cast_to_uint() };

// These accesses are protected by the respective native/green
// mutexes which were acquired above.
let prev = if can_block {
unsafe { mem::replace(&mut *self.native_blocker.get(), task) }
} else {
unsafe { mem::replace(&mut *self.green_blocker.get(), task) }
assert_eq!(old & native_bit, 0);
// If the old state was locked, then we need to flag ourselves
// as blocking in the state. If the old state was unlocked, then
// we attempt to acquire the mutex. Everything here is a CAS
// loop that'll eventually make progress.
if old & LOCKED != 0 {
old = match self.state.compare_and_swap(old,
n if n == old => return Ok(()),
old = match self.state.compare_and_swap(old,
// After acquiring the lock, we have access to the
// flavor field, and we've regained access to our
// respective native/green blocker field.
let prev = if can_block {
*self.native_blocker.get() = 0;
mem::replace(&mut *self.flavor.get(),
*self.green_blocker.get() = 0;
mem::replace(&mut *self.flavor.get(),
assert_eq!(prev, Unlocked);
BlockedTask::cast_from_uint(task)
// Tasks which can block are super easy. These tasks just call the blocking
// `lock()` function on an OS mutex
fn native_lock(&self, t: Box<Task>) {
unsafe { self.lock.lock_noguard(); }

fn native_unlock(&self) {
unsafe { self.lock.unlock_noguard(); }
fn green_lock(&self, t: Box<Task>) {
// Green threads flag their presence with an atomic counter, and if they
// fail to be the first to the mutex, they enqueue themselves on a
// concurrent internal queue with a stack-allocated node.
//
// FIXME: there is currently no way to cancel an enqueue, forcing the
// unlocker to spin for a bit.
if self.green_cnt.fetch_add(1, atomics::SeqCst) == 0 {
let mut node = q::Node::new(0);
t.deschedule(1, |task| {
node.data = task.cast_to_uint();
self.q.push(&mut node);
fn green_unlock(&self) {
// If we're the only green thread, then no need to check the queue,
// otherwise the FIXME above forces us to spin for a bit.
if self.green_cnt.fetch_sub(1, atomics::SeqCst) == 1 { return }
match unsafe { self.q.pop() } {
Some(t) => { node = t; break; }
None => Thread::yield_now(),
let task = unsafe { BlockedTask::cast_from_uint((*node).data) };
task.wake().map(|t| t.reawaken());

fn unlock(&self) {
// Unlocking this mutex is a little tricky. We favor any task that is
// manually blocked (not in each of the separate locks) in order to help
// provide a little fairness (green threads will wake up the pending
// native thread and native threads will wake up the pending green
// thread).
//
// There's also the question of when we unlock the actual green/native
// locking halves as well. If we're waking up someone, then we can wait
// to unlock until we've acquired the task to wake up (we're guaranteed
// the mutex memory is still valid when there are contenders), but as soon
// as we don't find any contenders we must unlock the mutex, and *then*
// flag the mutex as unlocked.
//
// This flagging can fail, leading to another round of figuring out if a
// task needs to be woken, and in this case it's ok that the "mutex
// halves" are unlocked, we're just mainly dealing with the atomic state
// of the outer mutex.
let flavor = unsafe { mem::replace(&mut *self.flavor.get(), Unlocked) };

let mut state = self.state.load(atomics::SeqCst);
let mut unlocked = false;
assert!(state & LOCKED != 0);
if state & GREEN_BLOCKED != 0 {
self.unset(state, GREEN_BLOCKED);
*self.flavor.get() = GreenAcquisition;
let task = mem::replace(&mut *self.green_blocker.get(), 0);
BlockedTask::cast_from_uint(task)
} else if state & NATIVE_BLOCKED != 0 {
self.unset(state, NATIVE_BLOCKED);
*self.flavor.get() = NativeAcquisition;
let task = mem::replace(&mut *self.native_blocker.get(), 0);
BlockedTask::cast_from_uint(task)
assert_eq!(state, LOCKED);
GreenAcquisition => { self.green_unlock(); }
NativeAcquisition => { self.native_unlock(); }
TryLockAcquisition => {}
Unlocked => unreachable!(),
match self.state.compare_and_swap(LOCKED, 0, atomics::SeqCst) {
GreenAcquisition => { self.green_unlock(); }
NativeAcquisition => { self.native_unlock(); }
TryLockAcquisition => {}
Unlocked => unreachable!(),
task.wake().map(|t| t.reawaken());
/// Loops around a CAS to unset the `bit` in `state`
fn unset(&self, mut state: uint, bit: uint) {
assert!(state & bit != 0);
let new = state ^ bit;
match self.state.compare_and_swap(state, new, atomics::SeqCst) {
n if n == state => break,
/// Deallocates resources associated with this static mutex.
///
/// This method is unsafe because it provides no guarantee that there are no
/// active users of this mutex; if there are, safety is not guaranteed.
///
/// This method is required to ensure that there are no memory leaks on
/// *all* platforms. It may be the case that some platforms do not leak
/// memory if this method is not called, but this is not guaranteed to be
/// true on all platforms.
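///
/// A sketch of the intended usage (illustrative only; it assumes every user
/// of the mutex is finished with it before `destroy` is called):
///
/// ```rust
/// use sync::mutex::{StaticMutex, MUTEX_INIT};
///
/// static mut LOCK: StaticMutex = MUTEX_INIT;
///
/// unsafe {
///     {
///         let _g = LOCK.lock();
///         // critical section
///     }
///     // no users of the mutex remain, so its OS resources can be freed
///     LOCK.destroy();
/// }
/// ```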
pub unsafe fn destroy(&self) {
/// Creates a new mutex in an unlocked state ready for use.
pub fn new() -> Mutex {
Mutex {
lock: box StaticMutex {
state: atomics::AtomicUint::new(0),
flavor: Unsafe::new(Unlocked),
green_blocker: Unsafe::new(0),
native_blocker: Unsafe::new(0),
green_cnt: atomics::AtomicUint::new(0),
lock: unsafe { mutex::StaticNativeMutex::new() },
/// Attempts to acquire this lock.
///
/// If the lock could not be acquired at this time, then `None` is returned.
/// Otherwise, an RAII guard is returned. The lock will be unlocked when the
/// guard is dropped.
///
/// This function does not block.
pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
self.lock.try_lock()
}

/// Acquires a mutex, blocking the current task until it is able to do so.
///
/// This function will block the local task until it is able to acquire
/// the mutex. Upon returning, the task is the only task with the mutex
/// held. An RAII guard is returned to allow scoped unlock of the lock. When
/// the guard goes out of scope, the mutex will be unlocked.
pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() }
fn new<'b>(lock: &'b StaticMutex) -> Guard<'b> {
// once we've acquired a lock, it's ok to access the flavor
assert!(unsafe { *lock.flavor.get() != Unlocked });
assert!(lock.state.load(atomics::SeqCst) & LOCKED != 0);
impl<'a> Drop for Guard<'a> {

impl Drop for Mutex {
// This is actually safe b/c we know that there is no further usage of
// this mutex (it's up to the user to arrange for a mutex to get
// dropped, that's not our job)
unsafe { self.lock.destroy() }
use super::{Mutex, StaticMutex, MUTEX_INIT};

let m = Mutex::new();

static mut m: StaticMutex = MUTEX_INIT;

static mut m: StaticMutex = MUTEX_INIT;
static mut CNT: uint = 0;
static M: uint = 1000;

for _ in range(0, M) {

let (tx, rx) = channel();
for _ in range(0, N) {
let tx2 = tx.clone();
native::task::spawn(proc() { inc(); tx2.send(()); });
let tx2 = tx.clone();
spawn(proc() { inc(); tx2.send(()); });

for _ in range(0, 2 * N) {

assert_eq!(unsafe {CNT}, M * N * 2);

let m = Mutex::new();
assert!(m.try_lock().is_some());
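
// A sketch of a further check (it relies only on the guard semantics shown
// in this file): while a guard is held, `try_lock` should fail, and once the
// guard is dropped it should succeed again.
{
    let _g = m.lock();
    assert!(m.try_lock().is_none());
}
assert!(m.try_lock().is_some());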