]> git.lizzy.rs Git - rust.git/blob - src/libsync/mutex.rs
Stabilization for `owned` (now `boxed`) and `cell`
[rust.git] / src / libsync / mutex.rs
1 // Copyright 2014 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! A proper mutex implementation regardless of the "flavor of task" which is
12 //! acquiring the lock.
13
14 // # Implementation of Rust mutexes
15 //
16 // Most answers to the question of "how do I use a mutex" are "use pthreads",
17 // but for Rust this isn't quite sufficient. Green threads cannot acquire an OS
18 // mutex because they can context switch among many OS threads, leading to
19 // deadlocks with other green threads.
20 //
21 // Another problem for green threads grabbing an OS mutex is that POSIX dictates
22 // that unlocking a mutex on a different thread from where it was locked is
23 // undefined behavior. Remember that green threads can migrate among OS threads,
24 // so this would mean that we would have to pin green threads to OS threads,
25 // which is less than ideal.
26 //
27 // ## Using deschedule/reawaken
28 //
29 // We already have primitives for descheduling/reawakening tasks, so they're the
30 // first obvious choice when implementing a mutex. The idea would be to have a
31 // concurrent queue that everyone is pushed on to, and then the owner of the
32 // mutex is the one popping from the queue.
33 //
34 // Unfortunately, this is not very performant for native tasks. The suspected
35 // reason for this is that each native thread is suspended on its own condition
36 // variable, unique from all the other threads. In this situation, the kernel
37 // has no idea what the scheduling semantics are of the user program, so all of
38 // the threads are distributed among all cores on the system. This ends up
39 // having very expensive wakeups of remote cores high up in the profile when
40 // handing off the mutex among native tasks. On the other hand, when using an OS
41 // mutex, the kernel knows that all native threads are contended on the same
42 // mutex, so they're in theory all migrated to a single core (fast context
43 // switching).
44 //
45 // ## Mixing implementations
46 //
47 // From that above information, we have two constraints. The first is that
48 // green threads can't touch os mutexes, and the second is that native tasks
49 // pretty much *must* touch an os mutex.
50 //
51 // As a compromise, the queueing implementation is used for green threads and
52 // the os mutex is used for native threads (why not have both?). This ends up
53 // leading to fairly decent performance for both native threads and green
54 // threads on various workloads (uncontended and contended).
55 //
56 // The crux of this implementation is an atomic work which is CAS'd on many
57 // times in order to manage a few flags about who's blocking where and whether
58 // it's locked or not.
59
60 use core::prelude::*;
61
62 use alloc::boxed::Box;
63 use core::atomics;
64 use core::kinds::marker;
65 use core::mem;
66 use core::ty::Unsafe;
67 use rustrt::local::Local;
68 use rustrt::mutex;
69 use rustrt::task::{BlockedTask, Task};
70 use rustrt::thread::Thread;
71
72 use q = mpsc_intrusive;
73
74 pub static LOCKED: uint = 1 << 0;
75 pub static GREEN_BLOCKED: uint = 1 << 1;
76 pub static NATIVE_BLOCKED: uint = 1 << 2;
77
78 /// A mutual exclusion primitive useful for protecting shared data
79 ///
80 /// This mutex is an implementation of a lock for all flavors of tasks which may
81 /// be grabbing. A common problem with green threads is that they cannot grab
82 /// locks (if they reschedule during the lock a contender could deadlock the
83 /// system), but this mutex does *not* suffer this problem.
84 ///
85 /// This mutex will properly block tasks waiting for the lock to become
86 /// available. The mutex can also be statically initialized or created via a
87 /// `new` constructor.
88 ///
89 /// # Example
90 ///
91 /// ```rust
92 /// use sync::mutex::Mutex;
93 ///
94 /// let m = Mutex::new();
95 /// let guard = m.lock();
96 /// // do some work
97 /// drop(guard); // unlock the lock
98 /// ```
99 pub struct Mutex {
100     // Note that this static mutex is in a *box*, not inlined into the struct
101     // itself. This is done for memory safety reasons with the usage of a
102     // StaticNativeMutex inside the static mutex above. Once a native mutex has
103     // been used once, its address can never change (it can't be moved). This
104     // mutex type can be safely moved at any time, so to ensure that the native
105     // mutex is used correctly we box the inner lock to give it a constant
106     // address.
107     lock: Box<StaticMutex>,
108 }
109
110 #[deriving(PartialEq, Show)]
111 enum Flavor {
112     Unlocked,
113     TryLockAcquisition,
114     GreenAcquisition,
115     NativeAcquisition,
116 }
117
118 /// The static mutex type is provided to allow for static allocation of mutexes.
119 ///
120 /// Note that this is a separate type because using a Mutex correctly means that
121 /// it needs to have a destructor run. In Rust, statics are not allowed to have
122 /// destructors. As a result, a `StaticMutex` has one extra method when compared
123 /// to a `Mutex`, a `destroy` method. This method is unsafe to call, and
124 /// documentation can be found directly on the method.
125 ///
126 /// # Example
127 ///
128 /// ```rust
129 /// use sync::mutex::{StaticMutex, MUTEX_INIT};
130 ///
131 /// static mut LOCK: StaticMutex = MUTEX_INIT;
132 ///
133 /// unsafe {
134 ///     let _g = LOCK.lock();
135 ///     // do some productive work
136 /// }
137 /// // lock is unlocked here.
138 /// ```
139 pub struct StaticMutex {
140     /// Current set of flags on this mutex
141     state: atomics::AtomicUint,
142     /// an OS mutex used by native threads
143     lock: mutex::StaticNativeMutex,
144
145     /// Type of locking operation currently on this mutex
146     flavor: Unsafe<Flavor>,
147     /// uint-cast of the green thread waiting for this mutex
148     green_blocker: Unsafe<uint>,
149     /// uint-cast of the native thread waiting for this mutex
150     native_blocker: Unsafe<uint>,
151
152     /// A concurrent mpsc queue used by green threads, along with a count used
153     /// to figure out when to dequeue and enqueue.
154     q: q::Queue<uint>,
155     green_cnt: atomics::AtomicUint,
156 }
157
158 /// An RAII implementation of a "scoped lock" of a mutex. When this structure is
159 /// dropped (falls out of scope), the lock will be unlocked.
160 #[must_use]
161 pub struct Guard<'a> {
162     lock: &'a StaticMutex,
163 }
164
165 /// Static initialization of a mutex. This constant can be used to initialize
166 /// other mutex constants.
167 pub static MUTEX_INIT: StaticMutex = StaticMutex {
168     lock: mutex::NATIVE_MUTEX_INIT,
169     state: atomics::INIT_ATOMIC_UINT,
170     flavor: Unsafe { value: Unlocked, marker1: marker::InvariantType },
171     green_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
172     native_blocker: Unsafe { value: 0, marker1: marker::InvariantType },
173     green_cnt: atomics::INIT_ATOMIC_UINT,
174     q: q::Queue {
175         head: atomics::INIT_ATOMIC_UINT,
176         tail: Unsafe {
177             value: 0 as *mut q::Node<uint>,
178             marker1: marker::InvariantType,
179         },
180         stub: q::DummyNode {
181             next: atomics::INIT_ATOMIC_UINT,
182         }
183     }
184 };
185
186 impl StaticMutex {
187     /// Attempts to grab this lock, see `Mutex::try_lock`
188     pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
189         // Attempt to steal the mutex from an unlocked state.
190         //
191         // FIXME: this can mess up the fairness of the mutex, seems bad
192         match self.state.compare_and_swap(0, LOCKED, atomics::SeqCst) {
193             0 => {
194                 // After acquiring the mutex, we can safely access the inner
195                 // fields.
196                 let prev = unsafe {
197                     mem::replace(&mut *self.flavor.get(), TryLockAcquisition)
198                 };
199                 assert_eq!(prev, Unlocked);
200                 Some(Guard::new(self))
201             }
202             _ => None
203         }
204     }
205
206     /// Acquires this lock, see `Mutex::lock`
207     pub fn lock<'a>(&'a self) -> Guard<'a> {
208         // First, attempt to steal the mutex from an unlocked state. The "fast
209         // path" needs to have as few atomic instructions as possible, and this
210         // one cmpxchg is already pretty expensive.
211         //
212         // FIXME: this can mess up the fairness of the mutex, seems bad
213         match self.try_lock() {
214             Some(guard) => return guard,
215             None => {}
216         }
217
218         // After we've failed the fast path, then we delegate to the different
219         // locking protocols for green/native tasks. This will select two tasks
220         // to continue further (one native, one green).
221         let t: Box<Task> = Local::take();
222         let can_block = t.can_block();
223         let native_bit;
224         if can_block {
225             self.native_lock(t);
226             native_bit = NATIVE_BLOCKED;
227         } else {
228             self.green_lock(t);
229             native_bit = GREEN_BLOCKED;
230         }
231
232         // After we've arbitrated among task types, attempt to re-acquire the
233         // lock (avoids a deschedule). This is very important to do in order to
234         // allow threads coming out of the native_lock function to try their
235         // best to not hit a cvar in deschedule.
236         let mut old = match self.state.compare_and_swap(0, LOCKED,
237                                                         atomics::SeqCst) {
238             0 => {
239                 let flavor = if can_block {
240                     NativeAcquisition
241                 } else {
242                     GreenAcquisition
243                 };
244                 // We've acquired the lock, so this unsafe access to flavor is
245                 // allowed.
246                 unsafe { *self.flavor.get() = flavor; }
247                 return Guard::new(self)
248             }
249             old => old,
250         };
251
252         // Alright, everything else failed. We need to deschedule ourselves and
253         // flag ourselves as waiting. Note that this case should only happen
254         // regularly in native/green contention. Due to try_lock and the header
255         // of lock stealing the lock, it's also possible for native/native
256         // contention to hit this location, but as less common.
257         let t: Box<Task> = Local::take();
258         t.deschedule(1, |task| {
259             let task = unsafe { task.cast_to_uint() };
260
261             // These accesses are protected by the respective native/green
262             // mutexes which were acquired above.
263             let prev = if can_block {
264                 unsafe { mem::replace(&mut *self.native_blocker.get(), task) }
265             } else {
266                 unsafe { mem::replace(&mut *self.green_blocker.get(), task) }
267             };
268             assert_eq!(prev, 0);
269
270             loop {
271                 assert_eq!(old & native_bit, 0);
272                 // If the old state was locked, then we need to flag ourselves
273                 // as blocking in the state. If the old state was unlocked, then
274                 // we attempt to acquire the mutex. Everything here is a CAS
275                 // loop that'll eventually make progress.
276                 if old & LOCKED != 0 {
277                     old = match self.state.compare_and_swap(old,
278                                                             old | native_bit,
279                                                             atomics::SeqCst) {
280                         n if n == old => return Ok(()),
281                         n => n
282                     };
283                 } else {
284                     assert_eq!(old, 0);
285                     old = match self.state.compare_and_swap(old,
286                                                             old | LOCKED,
287                                                             atomics::SeqCst) {
288                         n if n == old => {
289                             // After acquiring the lock, we have access to the
290                             // flavor field, and we've regained access to our
291                             // respective native/green blocker field.
292                             let prev = if can_block {
293                                 unsafe {
294                                     *self.native_blocker.get() = 0;
295                                     mem::replace(&mut *self.flavor.get(),
296                                                  NativeAcquisition)
297                                 }
298                             } else {
299                                 unsafe {
300                                     *self.green_blocker.get() = 0;
301                                     mem::replace(&mut *self.flavor.get(),
302                                                  GreenAcquisition)
303                                 }
304                             };
305                             assert_eq!(prev, Unlocked);
306                             return Err(unsafe {
307                                 BlockedTask::cast_from_uint(task)
308                             })
309                         }
310                         n => n,
311                     };
312                 }
313             }
314         });
315
316         Guard::new(self)
317     }
318
319     // Tasks which can block are super easy. These tasks just call the blocking
320     // `lock()` function on an OS mutex
321     fn native_lock(&self, t: Box<Task>) {
322         Local::put(t);
323         unsafe { self.lock.lock_noguard(); }
324     }
325
326     fn native_unlock(&self) {
327         unsafe { self.lock.unlock_noguard(); }
328     }
329
330     fn green_lock(&self, t: Box<Task>) {
331         // Green threads flag their presence with an atomic counter, and if they
332         // fail to be the first to the mutex, they enqueue themselves on a
333         // concurrent internal queue with a stack-allocated node.
334         //
335         // FIXME: There isn't a cancellation currently of an enqueue, forcing
336         //        the unlocker to spin for a bit.
337         if self.green_cnt.fetch_add(1, atomics::SeqCst) == 0 {
338             Local::put(t);
339             return
340         }
341
342         let mut node = q::Node::new(0);
343         t.deschedule(1, |task| {
344             unsafe {
345                 node.data = task.cast_to_uint();
346                 self.q.push(&mut node);
347             }
348             Ok(())
349         });
350     }
351
352     fn green_unlock(&self) {
353         // If we're the only green thread, then no need to check the queue,
354         // otherwise the fixme above forces us to spin for a bit.
355         if self.green_cnt.fetch_sub(1, atomics::SeqCst) == 1 { return }
356         let node;
357         loop {
358             match unsafe { self.q.pop() } {
359                 Some(t) => { node = t; break; }
360                 None => Thread::yield_now(),
361             }
362         }
363         let task = unsafe { BlockedTask::cast_from_uint((*node).data) };
364         task.wake().map(|t| t.reawaken());
365     }
366
367     fn unlock(&self) {
368         // Unlocking this mutex is a little tricky. We favor any task that is
369         // manually blocked (not in each of the separate locks) in order to help
370         // provide a little fairness (green threads will wake up the pending
371         // native thread and native threads will wake up the pending green
372         // thread).
373         //
374         // There's also the question of when we unlock the actual green/native
375         // locking halves as well. If we're waking up someone, then we can wait
376         // to unlock until we've acquired the task to wake up (we're guaranteed
377         // the mutex memory is still valid when there's contenders), but as soon
378         // as we don't find any contenders we must unlock the mutex, and *then*
379         // flag the mutex as unlocked.
380         //
381         // This flagging can fail, leading to another round of figuring out if a
382         // task needs to be woken, and in this case it's ok that the "mutex
383         // halves" are unlocked, we're just mainly dealing with the atomic state
384         // of the outer mutex.
385         let flavor = unsafe { mem::replace(&mut *self.flavor.get(), Unlocked) };
386
387         let mut state = self.state.load(atomics::SeqCst);
388         let mut unlocked = false;
389         let task;
390         loop {
391             assert!(state & LOCKED != 0);
392             if state & GREEN_BLOCKED != 0 {
393                 self.unset(state, GREEN_BLOCKED);
394                 task = unsafe {
395                     *self.flavor.get() = GreenAcquisition;
396                     let task = mem::replace(&mut *self.green_blocker.get(), 0);
397                     BlockedTask::cast_from_uint(task)
398                 };
399                 break;
400             } else if state & NATIVE_BLOCKED != 0 {
401                 self.unset(state, NATIVE_BLOCKED);
402                 task = unsafe {
403                     *self.flavor.get() = NativeAcquisition;
404                     let task = mem::replace(&mut *self.native_blocker.get(), 0);
405                     BlockedTask::cast_from_uint(task)
406                 };
407                 break;
408             } else {
409                 assert_eq!(state, LOCKED);
410                 if !unlocked {
411                     match flavor {
412                         GreenAcquisition => { self.green_unlock(); }
413                         NativeAcquisition => { self.native_unlock(); }
414                         TryLockAcquisition => {}
415                         Unlocked => unreachable!(),
416                     }
417                     unlocked = true;
418                 }
419                 match self.state.compare_and_swap(LOCKED, 0, atomics::SeqCst) {
420                     LOCKED => return,
421                     n => { state = n; }
422                 }
423             }
424         }
425         if !unlocked {
426             match flavor {
427                 GreenAcquisition => { self.green_unlock(); }
428                 NativeAcquisition => { self.native_unlock(); }
429                 TryLockAcquisition => {}
430                 Unlocked => unreachable!(),
431             }
432         }
433
434         task.wake().map(|t| t.reawaken());
435     }
436
437     /// Loops around a CAS to unset the `bit` in `state`
438     fn unset(&self, mut state: uint, bit: uint) {
439         loop {
440             assert!(state & bit != 0);
441             let new = state ^ bit;
442             match self.state.compare_and_swap(state, new, atomics::SeqCst) {
443                 n if n == state => break,
444                 n => { state = n; }
445             }
446         }
447     }
448
449     /// Deallocates resources associated with this static mutex.
450     ///
451     /// This method is unsafe because it provides no guarantees that there are
452     /// no active users of this mutex, and safety is not guaranteed if there are
453     /// active users of this mutex.
454     ///
455     /// This method is required to ensure that there are no memory leaks on
456     /// *all* platforms. It may be the case that some platforms do not leak
457     /// memory if this method is not called, but this is not guaranteed to be
458     /// true on all platforms.
459     pub unsafe fn destroy(&self) {
460         self.lock.destroy()
461     }
462 }
463
464 impl Mutex {
465     /// Creates a new mutex in an unlocked state ready for use.
466     pub fn new() -> Mutex {
467         Mutex {
468             lock: box StaticMutex {
469                 state: atomics::AtomicUint::new(0),
470                 flavor: Unsafe::new(Unlocked),
471                 green_blocker: Unsafe::new(0),
472                 native_blocker: Unsafe::new(0),
473                 green_cnt: atomics::AtomicUint::new(0),
474                 q: q::Queue::new(),
475                 lock: unsafe { mutex::StaticNativeMutex::new() },
476             }
477         }
478     }
479
480     /// Attempts to acquire this lock.
481     ///
482     /// If the lock could not be acquired at this time, then `None` is returned.
483     /// Otherwise, an RAII guard is returned. The lock will be unlocked when the
484     /// guard is dropped.
485     ///
486     /// This function does not block.
487     pub fn try_lock<'a>(&'a self) -> Option<Guard<'a>> {
488         self.lock.try_lock()
489     }
490
491     /// Acquires a mutex, blocking the current task until it is able to do so.
492     ///
493     /// This function will block the local task until it is available to acquire
494     /// the mutex. Upon returning, the task is the only task with the mutex
495     /// held. An RAII guard is returned to allow scoped unlock of the lock. When
496     /// the guard goes out of scope, the mutex will be unlocked.
497     pub fn lock<'a>(&'a self) -> Guard<'a> { self.lock.lock() }
498 }
499
500 impl<'a> Guard<'a> {
501     fn new<'b>(lock: &'b StaticMutex) -> Guard<'b> {
502         if cfg!(debug) {
503             // once we've acquired a lock, it's ok to access the flavor
504             assert!(unsafe { *lock.flavor.get() != Unlocked });
505             assert!(lock.state.load(atomics::SeqCst) & LOCKED != 0);
506         }
507         Guard { lock: lock }
508     }
509 }
510
511 #[unsafe_destructor]
512 impl<'a> Drop for Guard<'a> {
513     #[inline]
514     fn drop(&mut self) {
515         self.lock.unlock();
516     }
517 }
518
519 impl Drop for Mutex {
520     fn drop(&mut self) {
521         // This is actually safe b/c we know that there is no further usage of
522         // this mutex (it's up to the user to arrange for a mutex to get
523         // dropped, that's not our job)
524         unsafe { self.lock.destroy() }
525     }
526 }
527
528 #[cfg(test)]
529 mod test {
530     use std::prelude::*;
531     use super::{Mutex, StaticMutex, MUTEX_INIT};
532     use native;
533
534     #[test]
535     fn smoke() {
536         let m = Mutex::new();
537         drop(m.lock());
538         drop(m.lock());
539     }
540
541     #[test]
542     fn smoke_static() {
543         static mut m: StaticMutex = MUTEX_INIT;
544         unsafe {
545             drop(m.lock());
546             drop(m.lock());
547             m.destroy();
548         }
549     }
550
551     #[test]
552     fn lots_and_lots() {
553         static mut m: StaticMutex = MUTEX_INIT;
554         static mut CNT: uint = 0;
555         static M: uint = 1000;
556         static N: uint = 3;
557
558         fn inc() {
559             for _ in range(0, M) {
560                 unsafe {
561                     let _g = m.lock();
562                     CNT += 1;
563                 }
564             }
565         }
566
567         let (tx, rx) = channel();
568         for _ in range(0, N) {
569             let tx2 = tx.clone();
570             native::task::spawn(proc() { inc(); tx2.send(()); });
571             let tx2 = tx.clone();
572             spawn(proc() { inc(); tx2.send(()); });
573         }
574
575         drop(tx);
576         for _ in range(0, 2 * N) {
577             rx.recv();
578         }
579         assert_eq!(unsafe {CNT}, M * N * 2);
580         unsafe {
581             m.destroy();
582         }
583     }
584
585     #[test]
586     fn trylock() {
587         let m = Mutex::new();
588         assert!(m.try_lock().is_some());
589     }
590 }