1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
11 //! A proper mutex implementation regardless of the "flavor of task" which is
12 //! acquiring the lock.
14 // # Implementation of Rust mutexes
16 // Most answers to the question of "how do I use a mutex" are "use pthreads",
17 // but for Rust this isn't quite sufficient. Green threads cannot acquire an OS
18 // mutex because they can context switch among many OS threads, leading to
19 // deadlocks with other green threads.
21 // Another problem for green threads grabbing an OS mutex is that POSIX dictates
22 // that unlocking a mutex on a different thread from where it was locked is
23 // undefined behavior. Remember that green threads can migrate among OS threads,
24 // so this would mean that we would have to pin green threads to OS threads,
25 // which is less than ideal.
27 // ## Using deschedule/reawaken
29 // We already have primitives for descheduling/reawakening tasks, so they're the
30 // first obvious choice when implementing a mutex. The idea would be to have a
31 // concurrent queue that everyone is pushed on to, and then the owner of the
32 // mutex is the one popping from the queue.
34 // Unfortunately, this is not very performant for native tasks. The suspected
35 // reason for this is that each native thread is suspended on its own condition
36 // variable, unique from all the other threads. In this situation, the kernel
37 // has no idea what the scheduling semantics are of the user program, so all of
38 // the threads are distributed among all cores on the system. This ends up
39 // having very expensive wakeups of remote cores high up in the profile when
40 // handing off the mutex among native tasks. On the other hand, when using an OS
41 // mutex, the kernel knows that all native threads are contended on the same
42 // mutex, so they're in theory all migrated to a single core (fast context
45 // ## Mixing implementations
47 // From that above information, we have two constraints. The first is that
48 // green threads can't touch os mutexes, and the second is that native tasks
49 // pretty much *must* touch an os mutex.
51 // As a compromise, the queueing implementation is used for green threads and
52 // the os mutex is used for native threads (why not have both?). This ends up
53 // leading to fairly decent performance for both native threads and green
54 // threads on various workloads (uncontended and contended).
56 // The crux of this implementation is an atomic work which is CAS'd on many
57 // times in order to manage a few flags about who's blocking where and whether
58 // it's locked or not.
60 use std::kinds::marker;
62 use std::rt::local::Local;
63 use std::rt::task::{BlockedTask, Task};
64 use std::rt::thread::Thread;
65 use std::sync::atomics;
67 use std::unstable::mutex;
69 use q = mpsc_intrusive;
71 pub static LOCKED: uint = 1 << 0;
72 pub static GREEN_BLOCKED: uint = 1 << 1;
73 pub static NATIVE_BLOCKED: uint = 1 << 2;
75 /// A mutual exclusion primitive useful for protecting shared data
77 /// This mutex is an implementation of a lock for all flavors of tasks which may
78 /// be grabbing. A common problem with green threads is that they cannot grab
79 /// locks (if they reschedule during the lock a contender could deadlock the
80 /// system), but this mutex does *not* suffer this problem.
82 /// This mutex will properly block tasks waiting for the lock to become
83 /// available. The mutex can also be statically initialized or created via a
84 /// `new` constructor.
89 /// use sync::mutex::Mutex;
91 /// let m = Mutex::new();
92 /// let guard = m.lock();
94 /// drop(guard); // unlock the lock
100 #[deriving(Eq, Show)]
108 /// The static mutex type is provided to allow for static allocation of mutexes.
110 /// Note that this is a separate type because using a Mutex correctly means that
111 /// it needs to have a destructor run. In Rust, statics are not allowed to have
112 /// destructors. As a result, a `StaticMutex` has one extra method when compared
113 /// to a `Mutex`, a `destroy` method. This method is unsafe to call, and
114 /// documentation can be found directly on the method.
119 /// use sync::mutex::{StaticMutex, MUTEX_INIT};
121 /// static mut LOCK: StaticMutex = MUTEX_INIT;
124 /// let _g = LOCK.lock();
125 /// // do some productive work
127 /// // lock is unlocked here.
129 pub struct StaticMutex {
130 /// Current set of flags on this mutex
131 state: atomics::AtomicUint,
132 /// an OS mutex used by native threads
133 lock: mutex::StaticNativeMutex,
135 /// Type of locking operation currently on this mutex
136 flavor: Unsafe<Flavor>,
137 /// uint-cast of the green thread waiting for this mutex
138 green_blocker: Unsafe<uint>,
139 /// uint-cast of the native thread waiting for this mutex
140 native_blocker: Unsafe<uint>,
142 /// A concurrent mpsc queue used by green threads, along with a count used
143 /// to figure out when to dequeue and enqueue.
145 green_cnt: atomics::AtomicUint,
148 /// An RAII implementation of a "scoped lock" of a mutex. When this structure is
149 /// dropped (falls out of scope), the lock will be unlocked.
151 pub struct Guard<'a> {
152 lock: &'a StaticMutex,
155 /// Static initialization of a mutex. This constant can be used to initialize
156 /// other mutex constants.
157 pub static MUTEX_INIT: StaticMutex = StaticMutex {
158 lock: mutex::NATIVE_MUTEX_INIT,
159 state: atomics::INIT_ATOMIC_UINT,
160 flavor: Unsafe { value: Unlocked, marker1: marker::InvariantType },
161 green_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
162 native_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
163 green_cnt: atomics::INIT_ATOMIC_UINT,
165 head: atomics::INIT_ATOMIC_UINT,
167 value: 0 as *mut q::Node<uint>,
168 marker1: marker::InvariantType,
171 next: atomics::INIT_ATOMIC_UINT,
177 /// Attempts to grab this lock, see `Mutex::try_lock`
178 pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
179 // Attempt to steal the mutex from an unlocked state.
181 // FIXME: this can mess up the fairness of the mutex, seems bad
182 match self.state.compare_and_swap(0, LOCKED, atomics::SeqCst) {
184 // After acquiring the mutex, we can safely access the inner
187 mem::replace(&mut *self.flavor.get(), TryLockAcquisition)
189 assert_eq!(prev, Unlocked);
190 Some(Guard::new(self))
196 /// Acquires this lock, see `Mutex::lock`
197 pub fn lock<'a>(&'a self) -> Guard<'a> {
198 // First, attempt to steal the mutex from an unlocked state. The "fast
199 // path" needs to have as few atomic instructions as possible, and this
200 // one cmpxchg is already pretty expensive.
202 // FIXME: this can mess up the fairness of the mutex, seems bad
203 match self.try_lock() {
204 Some(guard) => return guard,
208 // After we've failed the fast path, then we delegate to the differnet
209 // locking protocols for green/native tasks. This will select two tasks
210 // to continue further (one native, one green).
211 let t: ~Task = Local::take();
212 let can_block = t.can_block();
216 native_bit = NATIVE_BLOCKED;
219 native_bit = GREEN_BLOCKED;
222 // After we've arbitrated among task types, attempt to re-acquire the
223 // lock (avoids a deschedule). This is very important to do in order to
224 // allow threads coming out of the native_lock function to try their
225 // best to not hit a cvar in deschedule.
226 let mut old = match self.state.compare_and_swap(0, LOCKED,
229 let flavor = if can_block {
234 // We've acquired the lock, so this unsafe access to flavor is
236 unsafe { *self.flavor.get() = flavor; }
237 return Guard::new(self)
242 // Alright, everything else failed. We need to deschedule ourselves and
243 // flag ourselves as waiting. Note that this case should only happen
244 // regularly in native/green contention. Due to try_lock and the header
245 // of lock stealing the lock, it's also possible for native/native
246 // contention to hit this location, but as less common.
247 let t: ~Task = Local::take();
248 t.deschedule(1, |task| {
249 let task = unsafe { task.cast_to_uint() };
251 // These accesses are protected by the respective native/green
252 // mutexes which were acquired above.
253 let prev = if can_block {
254 unsafe { mem::replace(&mut *self.native_blocker.get(), task) }
256 unsafe { mem::replace(&mut *self.green_blocker.get(), task) }
261 assert_eq!(old & native_bit, 0);
262 // If the old state was locked, then we need to flag ourselves
263 // as blocking in the state. If the old state was unlocked, then
264 // we attempt to acquire the mutex. Everything here is a CAS
265 // loop that'll eventually make progress.
266 if old & LOCKED != 0 {
267 old = match self.state.compare_and_swap(old,
270 n if n == old => return Ok(()),
275 old = match self.state.compare_and_swap(old,
279 // After acquiring the lock, we have access to the
280 // flavor field, and we've regained access to our
281 // respective native/green blocker field.
282 let prev = if can_block {
284 *self.native_blocker.get() = 0;
285 mem::replace(&mut *self.flavor.get(),
290 *self.green_blocker.get() = 0;
291 mem::replace(&mut *self.flavor.get(),
295 assert_eq!(prev, Unlocked);
297 BlockedTask::cast_from_uint(task)
309 // Tasks which can block are super easy. These tasks just call the blocking
310 // `lock()` function on an OS mutex
311 fn native_lock(&self, t: ~Task) {
313 unsafe { self.lock.lock_noguard(); }
316 fn native_unlock(&self) {
317 unsafe { self.lock.unlock_noguard(); }
320 fn green_lock(&self, t: ~Task) {
321 // Green threads flag their presence with an atomic counter, and if they
322 // fail to be the first to the mutex, they enqueue themselves on a
323 // concurrent internal queue with a stack-allocated node.
325 // FIXME: There isn't a cancellation currently of an enqueue, forcing
326 // the unlocker to spin for a bit.
327 if self.green_cnt.fetch_add(1, atomics::SeqCst) == 0 {
332 let mut node = q::Node::new(0);
333 t.deschedule(1, |task| {
335 node.data = task.cast_to_uint();
336 self.q.push(&mut node);
342 fn green_unlock(&self) {
343 // If we're the only green thread, then no need to check the queue,
344 // otherwise the fixme above forces us to spin for a bit.
345 if self.green_cnt.fetch_sub(1, atomics::SeqCst) == 1 { return }
348 match unsafe { self.q.pop() } {
349 Some(t) => { node = t; break; }
350 None => Thread::yield_now(),
353 let task = unsafe { BlockedTask::cast_from_uint((*node).data) };
354 task.wake().map(|t| t.reawaken());
358 // Unlocking this mutex is a little tricky. We favor any task that is
359 // manually blocked (not in each of the separate locks) in order to help
360 // provide a little fairness (green threads will wake up the pending
361 // native thread and native threads will wake up the pending green
364 // There's also the question of when we unlock the actual green/native
365 // locking halves as well. If we're waking up someone, then we can wait
366 // to unlock until we've acquired the task to wake up (we're guaranteed
367 // the mutex memory is still valid when there's contenders), but as soon
368 // as we don't find any contenders we must unlock the mutex, and *then*
369 // flag the mutex as unlocked.
371 // This flagging can fail, leading to another round of figuring out if a
372 // task needs to be woken, and in this case it's ok that the "mutex
373 // halves" are unlocked, we're just mainly dealing with the atomic state
374 // of the outer mutex.
375 let flavor = unsafe { mem::replace(&mut *self.flavor.get(), Unlocked) };
377 let mut state = self.state.load(atomics::SeqCst);
378 let mut unlocked = false;
381 assert!(state & LOCKED != 0);
382 if state & GREEN_BLOCKED != 0 {
383 self.unset(state, GREEN_BLOCKED);
385 *self.flavor.get() = GreenAcquisition;
386 let task = mem::replace(&mut *self.green_blocker.get(), 0);
387 BlockedTask::cast_from_uint(task)
390 } else if state & NATIVE_BLOCKED != 0 {
391 self.unset(state, NATIVE_BLOCKED);
393 *self.flavor.get() = NativeAcquisition;
394 let task = mem::replace(&mut *self.native_blocker.get(), 0);
395 BlockedTask::cast_from_uint(task)
399 assert_eq!(state, LOCKED);
402 GreenAcquisition => { self.green_unlock(); }
403 NativeAcquisition => { self.native_unlock(); }
404 TryLockAcquisition => {}
405 Unlocked => unreachable!()
409 match self.state.compare_and_swap(LOCKED, 0, atomics::SeqCst) {
417 GreenAcquisition => { self.green_unlock(); }
418 NativeAcquisition => { self.native_unlock(); }
419 TryLockAcquisition => {}
420 Unlocked => unreachable!()
424 task.wake().map(|t| t.reawaken());
427 /// Loops around a CAS to unset the `bit` in `state`
428 fn unset(&self, mut state: uint, bit: uint) {
430 assert!(state & bit != 0);
431 let new = state ^ bit;
432 match self.state.compare_and_swap(state, new, atomics::SeqCst) {
433 n if n == state => break,
439 /// Deallocates resources associated with this static mutex.
441 /// This method is unsafe because it provides no guarantees that there are
442 /// no active users of this mutex, and safety is not guaranteed if there are
443 /// active users of this mutex.
445 /// This method is required to ensure that there are no memory leaks on
446 /// *all* platforms. It may be the case that some platforms do not leak
447 /// memory if this method is not called, but this is not guaranteed to be
448 /// true on all platforms.
449 pub unsafe fn destroy(&self) {
455 /// Creates a new mutex in an unlocked state ready for use.
456 pub fn new() -> Mutex {
459 state: atomics::AtomicUint::new(0),
460 flavor: Unsafe::new(Unlocked),
461 green_blocker: Unsafe::new(0),
462 native_blocker: Unsafe::new(0),
463 green_cnt: atomics::AtomicUint::new(0),
465 lock: unsafe { mutex::StaticNativeMutex::new() },
470 /// Attempts to acquire this lock.
472 /// If the lock could not be acquired at this time, then `None` is returned.
473 /// Otherwise, an RAII guard is returned. The lock will be unlocked when the
474 /// guard is dropped.
476 /// This function does not block.
477 pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
481 /// Acquires a mutex, blocking the current task until it is able to do so.
483 /// This function will block the local task until it is available to acquire
484 /// the mutex. Upon returning, the task is the only task with the mutex
485 /// held. An RAII guard is returned to allow scoped unlock of the lock. When
486 /// the guard goes out of scope, the mutex will be unlocked.
487 pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() }
491 fn new<'b>(lock: &'b StaticMutex) -> Guard<'b> {
493 // once we've acquired a lock, it's ok to access the flavor
494 assert!(unsafe { *lock.flavor.get() != Unlocked });
495 assert!(lock.state.load(atomics::SeqCst) & LOCKED != 0);
502 impl<'a> Drop for Guard<'a> {
509 impl Drop for Mutex {
511 // This is actually safe b/c we know that there is no further usage of
512 // this mutex (it's up to the user to arrange for a mutex to get
513 // dropped, that's not our job)
514 unsafe { self.lock.destroy() }
521 use super::{Mutex, StaticMutex, MUTEX_INIT};
525 let m = Mutex::new();
532 static mut m: StaticMutex = MUTEX_INIT;
542 static mut m: StaticMutex = MUTEX_INIT;
543 static mut CNT: uint = 0;
544 static M: uint = 1000;
548 for _ in range(0, M) {
556 let (tx, rx) = channel();
557 for _ in range(0, N) {
558 let tx2 = tx.clone();
559 native::task::spawn(proc() { inc(); tx2.send(()); });
560 let tx2 = tx.clone();
561 spawn(proc() { inc(); tx2.send(()); });
565 for _ in range(0, 2 * N) {
568 assert_eq!(unsafe {CNT}, M * N * 2);
576 let m = Mutex::new();
577 assert!(m.try_lock().is_some());