// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! A (mostly) lock-free concurrent work-stealing deque
//!
//! This module contains an implementation of the Chase-Lev work stealing deque
//! described in "Dynamic Circular Work-Stealing Deque". The implementation is
//! heavily based on the pseudocode found in the paper.
//!
//! This implementation does not want to have the restriction of a garbage
//! collector for reclamation of buffers, and instead it uses a shared pool of
//! buffers. This shared pool is required for correctness in this
//! implementation.
//!
//! The only lock-synchronized portions of this deque are the buffer allocation
//! and deallocation portions. Otherwise all operations are lock-free.
//!
//! # Example
//!
//! ```
//! use std::sync::deque::BufferPool;
//!
//! let mut pool = BufferPool::new();
//! let (mut worker, mut stealer) = pool.deque();
//!
//! // Only the worker may push/pop
//! worker.push(1i);
//! worker.pop();
//!
//! // Stealers take data from the other end of the deque
//! worker.push(1i);
//! stealer.steal();
//!
//! // Stealers can be cloned to have many stealers stealing in parallel
//! worker.push(1i);
//! let mut stealer2 = stealer.clone();
//! stealer2.steal();
//! ```

// NB: the "buffer pool" strategy is not done for speed, but rather for
//     correctness. For more info, see the comment on `swap_buffer`

// FIXME: all atomic operations in this module use a SeqCst ordering. That is
//        probably overkill

use core::prelude::*;

use alloc::arc::Arc;
use alloc::heap::{allocate, deallocate};
use alloc::boxed::Box;
use collections::Vec;
use core::kinds::marker;
use core::mem::{forget, min_align_of, size_of, transmute};
use core::ptr;

use rustrt::exclusive::Exclusive;

use atomics::{AtomicInt, AtomicPtr, SeqCst};

// Once the queue is less than 1/K full, then it will be downsized. Note that
// the deque requires that this number be less than 2.
static K: int = 4;

// Minimum number of bits that a buffer size should be. No buffer will resize to
// under this value, and all deques will initially contain a buffer of this
// size.
//
// The size in question is 1 << MIN_BITS
static MIN_BITS: uint = 7;
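
// To make the sizing arithmetic concrete (an illustrative note, not from the
// original source): every deque starts with 1 << MIN_BITS = 128 slots. With
// K = 4, a 1024-slot buffer becomes a candidate for shrinking once fewer than
// 1024 / 4 = 256 elements remain, and `maybe_shrink` below additionally
// requires more than 1 << MIN_BITS elements, so no buffer ever shrinks below
// the minimum size.
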
struct Deque<T> {
    bottom: AtomicInt,
    top: AtomicInt,
    array: AtomicPtr<Buffer<T>>,
    pool: BufferPool<T>,
}

/// Worker half of the work-stealing deque. This worker has exclusive access to
/// one side of the deque, and uses `push` and `pop` methods to manipulate it.
///
/// There may only be one worker per deque.
pub struct Worker<T> {
    deque: Arc<Deque<T>>,
    noshare: marker::NoShare,
}

/// The stealing half of the work-stealing deque. Stealers have access to the
/// opposite end of the deque from the worker, and they only have access to the
/// `steal` method.
pub struct Stealer<T> {
    deque: Arc<Deque<T>>,
    noshare: marker::NoShare,
}

/// When stealing some data, this is an enumeration of the possible outcomes.
#[deriving(PartialEq, Show)]
pub enum Stolen<T> {
    /// The deque was empty at the time of stealing
    Empty,
    /// The stealer lost the race for stealing data, and a retry may return more
    /// data.
    Abort,
    /// The stealer has successfully stolen some data.
    Data(T),
}
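
// A typical consumer loop over these outcomes (an illustrative sketch, not
// part of this module's API; `process` is a hypothetical handler) retries on
// `Abort` and treats `Empty` as exhaustion:
//
//     loop {
//         match stealer.steal() {
//             Data(t) => process(t), // successfully stole a value
//             Abort => continue,     // lost a race with another stealer; retry
//             Empty => break,        // the deque was empty
//         }
//     }
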
/// The allocation pool for buffers used by work-stealing deques. Right now this
/// structure is used for reclamation of memory after it is no longer in use by
/// deques.
///
/// This data structure is protected by a mutex, but it is rarely used. Deques
/// will only use this structure when allocating a new buffer or deallocating a
/// previous one.
pub struct BufferPool<T> {
    pool: Arc<Exclusive<Vec<Box<Buffer<T>>>>>,
}

/// An internal buffer used by the chase-lev deque. This structure is actually
/// implemented as a circular buffer, and is used as the intermediate storage of
/// the data in the deque.
///
/// This type is implemented with *T instead of Vec<T> for two reasons:
///
///   1. There is nothing safe about using this buffer. This easily allows the
///      same value to be read twice into Rust, and there is nothing to
///      prevent this. The usage by the deque must ensure that one of the
///      values is forgotten. Furthermore, we only ever want to manually run
///      destructors for values in this buffer (on drop) because the bounds
///      are defined by the deque it's owned by.
///
///   2. We can certainly avoid bounds checks using *T instead of Vec<T>, although
///      LLVM is probably pretty good at doing this already.
struct Buffer<T> {
    storage: *const T,
    log_size: uint,
}

impl<T: Send> BufferPool<T> {
    /// Allocates a new buffer pool which in turn can be used to allocate new
    /// deques.
    pub fn new() -> BufferPool<T> {
        BufferPool { pool: Arc::new(Exclusive::new(Vec::new())) }
    }

    /// Allocates a new work-stealing deque which will send/receive memory to
    /// and from this buffer pool.
    pub fn deque(&self) -> (Worker<T>, Stealer<T>) {
        let a = Arc::new(Deque::new(self.clone()));
        let b = a.clone();
        (Worker { deque: a, noshare: marker::NoShare },
         Stealer { deque: b, noshare: marker::NoShare })
    }

    fn alloc(&mut self, bits: uint) -> Box<Buffer<T>> {
        unsafe {
            let mut pool = self.pool.lock();
            match pool.iter().position(|x| x.size() >= (1 << bits)) {
                Some(i) => pool.remove(i).unwrap(),
                None => box Buffer::new(bits)
            }
        }
    }

    fn free(&self, buf: Box<Buffer<T>>) {
        unsafe {
            let mut pool = self.pool.lock();
            // Keep the pool sorted by size so that `alloc` above finds the
            // smallest adequate buffer first.
            match pool.iter().position(|v| v.size() > buf.size()) {
                Some(i) => pool.insert(i, buf),
                None => pool.push(buf),
            }
        }
    }
}

impl<T: Send> Clone for BufferPool<T> {
    fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
}

impl<T: Send> Worker<T> {
    /// Pushes data onto the front of this work queue.
    pub fn push(&self, t: T) {
        unsafe { self.deque.push(t) }
    }
    /// Pops data off the front of the work queue, returning `None` on an empty
    /// queue.
    pub fn pop(&self) -> Option<T> {
        unsafe { self.deque.pop() }
    }

    /// Gets access to the buffer pool that this worker is attached to. This can
    /// be used to create more deques which share the same buffer pool as this
    /// deque.
    pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
        &self.deque.pool
    }
}

impl<T: Send> Stealer<T> {
    /// Steals work off the end of the queue (opposite of the worker's end)
    pub fn steal(&self) -> Stolen<T> {
        unsafe { self.deque.steal() }
    }

    /// Gets access to the buffer pool that this stealer is attached to. This
    /// can be used to create more deques which share the same buffer pool as
    /// this deque.
    pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
        &self.deque.pool
    }
}

impl<T: Send> Clone for Stealer<T> {
    fn clone(&self) -> Stealer<T> {
        Stealer { deque: self.deque.clone(), noshare: marker::NoShare }
    }
}

// Almost all of this code can be found directly in the paper so I'm not
// personally going to heavily comment what's going on here.
impl<T: Send> Deque<T> {
    fn new(mut pool: BufferPool<T>) -> Deque<T> {
        let buf = pool.alloc(MIN_BITS);
        Deque {
            bottom: AtomicInt::new(0),
            top: AtomicInt::new(0),
            array: AtomicPtr::new(unsafe { transmute(buf) }),
            pool: pool,
        }
    }

    unsafe fn push(&self, data: T) {
        let mut b = self.bottom.load(SeqCst);
        let t = self.top.load(SeqCst);
        let mut a = self.array.load(SeqCst);
        let size = b - t;
        if size >= (*a).size() - 1 {
            // You won't find this code in the chase-lev deque paper. This is
            // alluded to in a small footnote, however. We always free a buffer
            // when growing in order to prevent leaks.
            a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
            b = self.bottom.load(SeqCst);
        }
        (*a).put(b, data);
        self.bottom.store(b + 1, SeqCst);
    }

    unsafe fn pop(&self) -> Option<T> {
        let b = self.bottom.load(SeqCst);
        let a = self.array.load(SeqCst);
        let b = b - 1;
        self.bottom.store(b, SeqCst);
        let t = self.top.load(SeqCst);
        let size = b - t;
        if size < 0 {
            self.bottom.store(t, SeqCst);
            return None;
        }
        let data = (*a).get(b);
        if size > 0 {
            self.maybe_shrink(b, t);
            return Some(data);
        }
        // size == 0: this was the last element, so race with any stealers
        // through a CAS on `top` for the right to return it.
        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
            self.bottom.store(t + 1, SeqCst);
            return Some(data);
        } else {
            self.bottom.store(t + 1, SeqCst);
            forget(data); // someone else stole this value
            return None;
        }
    }

    unsafe fn steal(&self) -> Stolen<T> {
        let t = self.top.load(SeqCst);
        let old = self.array.load(SeqCst);
        let b = self.bottom.load(SeqCst);
        let a = self.array.load(SeqCst);
        let size = b - t;
        if size <= 0 { return Empty }
        if size % (*a).size() == 0 {
            // Racy reads of `b` and `t` across a buffer swap can produce an
            // impossible size; only report Empty if nothing has changed.
            if a == old && t == self.top.load(SeqCst) {
                return Empty
            }
            return Abort
        }
        let data = (*a).get(t);
        if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
            Data(data)
        } else {
            forget(data); // someone else stole this value
            Abort
        }
    }

    unsafe fn maybe_shrink(&self, b: int, t: int) {
        let a = self.array.load(SeqCst);
        if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
            self.swap_buffer(b, a, (*a).resize(b, t, -1));
        }
    }

    // Helper routine not mentioned in the paper which is used in growing and
    // shrinking buffers to swap in a new buffer into place. As a bit of a
    // recap, the whole point that we need a buffer pool rather than just
    // calling malloc/free directly is that stealers can continue using buffers
    // after this method has called 'free' on it. The continued usage is simply
    // a read followed by a forget, but we must make sure that the memory can
    // continue to be read after we flag this buffer for reclamation.
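    //
    // An illustrative interleaving of that race (a sketch added for clarity,
    // not from the original source):
    //
    //     stealer: loads `array` and sees the old buffer
    //     worker:  swap_buffer() publishes the new buffer, then free(old)
    //     stealer: (*old).get(t) -- reads memory the worker already "freed"
    //
    // Because `free` only returns the buffer to the shared pool rather than
    // deallocating it, that read stays valid; the stealer's CAS on `top` then
    // decides whether the value it read is kept or forgotten.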
    unsafe fn swap_buffer(&self, b: int, old: *mut Buffer<T>,
                          buf: Buffer<T>) -> *mut Buffer<T> {
        let newbuf: *mut Buffer<T> = transmute(box buf);
        self.array.store(newbuf, SeqCst);
        let ss = (*newbuf).size();
        self.bottom.store(b + ss, SeqCst);
        let t = self.top.load(SeqCst);
        if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
            self.bottom.store(b, SeqCst);
        }
        self.pool.free(transmute(old));
        newbuf
    }
}

impl<T: Send> Drop for Deque<T> {
    fn drop(&mut self) {
        let t = self.top.load(SeqCst);
        let b = self.bottom.load(SeqCst);
        let a = self.array.load(SeqCst);
        // Free whatever is leftover in the deque, and then move the buffer
        // back into the pool.
        for i in range(t, b) {
            let _: T = unsafe { (*a).get(i) };
        }
        self.pool.free(unsafe { transmute(a) });
    }
}

fn buffer_alloc_size<T>(log_size: uint) -> uint {
    (1 << log_size) * size_of::<T>()
}

impl<T: Send> Buffer<T> {
    unsafe fn new(log_size: uint) -> Buffer<T> {
        let size = buffer_alloc_size::<T>(log_size);
        let buffer = allocate(size, min_align_of::<T>());
        Buffer {
            storage: buffer as *const T,
            log_size: log_size,
        }
    }

    fn size(&self) -> int { 1 << self.log_size }

    // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
    fn mask(&self) -> int { (1 << self.log_size) - 1 }

    unsafe fn elem(&self, i: int) -> *const T {
        self.storage.offset(i & self.mask())
    }
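
    // For example (an illustrative note, not from the original source): with
    // log_size = 7 the buffer has 128 slots and mask() == 127, so elem(130)
    // resolves to slot 130 & 127 == 2. Indices grow monotonically and are
    // only wrapped into the circular buffer here.
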
    // This does not protect against loading duplicate values of the same cell,
    // nor does this clear out the contents contained within. Hence, this is a
    // very unsafe method which the caller needs to treat specially in case a
    // race is lost.
    unsafe fn get(&self, i: int) -> T {
        ptr::read(self.elem(i))
    }

    // Unsafe because this unsafely overwrites possibly uninitialized or
    // initialized data.
    unsafe fn put(&self, i: int, t: T) {
        ptr::write(self.elem(i) as *mut T, t);
    }

    // Again, unsafe because this has incredibly dubious ownership violations.
    // It is assumed that this buffer is immediately dropped.
    unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
        // NB: not entirely obvious, but thanks to 2's complement,
        //     casting delta to uint and then adding gives the desired
        //     effect.
        let buf = Buffer::new(self.log_size + delta as uint);
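        // For instance (an illustrative note, not from the original source):
        // with log_size = 8 and delta = -1, `delta as uint` is uint::MAX and
        // the addition wraps around to 7, halving the buffer.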
        for i in range(t, b) {
            buf.put(i, self.get(i));
        }
        buf
    }
}

impl<T: Send> Drop for Buffer<T> {
    fn drop(&mut self) {
        // It is assumed that all buffers are empty on drop.
        let size = buffer_alloc_size::<T>(self.log_size);
        unsafe { deallocate(self.storage as *mut u8, size, min_align_of::<T>()) }
    }
}

#[cfg(test)]
mod tests {
    use std::prelude::*;
    use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};

    use std::mem;
    use std::rt::thread::Thread;
    use std::rand;
    use std::rand::Rng;
    use atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
                  AtomicUint, INIT_ATOMIC_UINT};
    use std::vec;

    #[test]
    fn smoke() {
        let pool = BufferPool::new();
        let (w, s) = pool.deque();
        assert_eq!(w.pop(), None);
        assert_eq!(s.steal(), Empty);
        w.push(1i);
        assert_eq!(w.pop(), Some(1));
        w.push(1);
        assert_eq!(s.steal(), Data(1));
        w.push(1);
        assert_eq!(s.clone().steal(), Data(1));
    }

    #[test]
    fn stealpush() {
        static AMT: int = 100000;
        let pool = BufferPool::<int>::new();
        let (w, s) = pool.deque();
        let t = Thread::start(proc() {
            let mut left = AMT;
            while left > 0 {
                match s.steal() {
                    Data(i) => {
                        assert_eq!(i, 1);
                        left -= 1;
                    }
                    Abort | Empty => {}
                }
            }
        });

        for _ in range(0, AMT) {
            w.push(1);
        }

        t.join();
    }

    #[test]
    fn stealpush_large() {
        static AMT: int = 100000;
        let pool = BufferPool::<(int, int)>::new();
        let (w, s) = pool.deque();
        let t = Thread::start(proc() {
            let mut left = AMT;
            while left > 0 {
                match s.steal() {
                    Data((1, 10)) => { left -= 1; }
                    Data(..) => fail!(),
                    Abort | Empty => {}
                }
            }
        });

        for _ in range(0, AMT) {
            w.push((1, 10));
        }

        t.join();
    }

    fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>,
                nthreads: int, amt: uint) {
        for _ in range(0, amt) {
            w.push(box 20);
        }
        let mut remaining = AtomicUint::new(amt);
        let unsafe_remaining: *mut AtomicUint = &mut remaining;

        let threads = range(0, nthreads).map(|_| {
            let s = s.clone();
            Thread::start(proc() {
                unsafe {
                    while (*unsafe_remaining).load(SeqCst) > 0 {
                        match s.steal() {
                            Data(box 20) => {
                                (*unsafe_remaining).fetch_sub(1, SeqCst);
                            }
                            Data(..) => fail!(),
                            Abort | Empty => {}
                        }
                    }
                }
            })
        }).collect::<Vec<Thread<()>>>();

        while remaining.load(SeqCst) > 0 {
            match w.pop() {
                Some(box 20) => { remaining.fetch_sub(1, SeqCst); }
                Some(..) => fail!(),
                None => {}
            }
        }

        for thread in threads.move_iter() {
            thread.join();
        }
    }

    #[test]
    fn run_stampede() {
        let pool = BufferPool::<Box<int>>::new();
        let (w, s) = pool.deque();
        stampede(w, s, 8, 10000);
    }

    #[test]
    fn many_stampede() {
        static AMT: uint = 4;
        let pool = BufferPool::<Box<int>>::new();
        let threads = range(0, AMT).map(|_| {
            let (w, s) = pool.deque();
            Thread::start(proc() {
                stampede(w, s, 4, 10000);
            })
        }).collect::<Vec<Thread<()>>>();

        for thread in threads.move_iter() {
            thread.join();
        }
    }

    #[test]
    fn stress() {
        static AMT: int = 100000;
        static NTHREADS: int = 8;
        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
        static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
        let pool = BufferPool::<int>::new();
        let (w, s) = pool.deque();

        let threads = range(0, NTHREADS).map(|_| {
            let s = s.clone();
            Thread::start(proc() {
                unsafe {
                    loop {
                        match s.steal() {
                            Data(2) => { HITS.fetch_add(1, SeqCst); }
                            Data(..) => fail!(),
                            _ if DONE.load(SeqCst) => break,
                            _ => {}
                        }
                    }
                }
            })
        }).collect::<Vec<Thread<()>>>();

        let mut rng = rand::task_rng();
        let mut expected = 0;
        while expected < AMT {
            if rng.gen_range(0i, 3) == 2 {
                match w.pop() {
                    None => {}
                    Some(2) => unsafe { HITS.fetch_add(1, SeqCst); },
                    Some(_) => fail!(),
                }
            } else {
                expected += 1;
                w.push(2);
            }
        }

        unsafe {
            while HITS.load(SeqCst) < AMT as uint {
                match w.pop() {
                    None => {}
                    Some(2) => { HITS.fetch_add(1, SeqCst); },
                    Some(_) => fail!(),
                }
            }
            DONE.store(true, SeqCst);
        }

        for thread in threads.move_iter() {
            thread.join();
        }

        assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint);
    }

    #[test]
    #[ignore(cfg(windows))] // apparently windows scheduling is weird?
    fn no_starvation() {
        static AMT: int = 10000;
        static NTHREADS: int = 4;
        static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
        let pool = BufferPool::<(int, uint)>::new();
        let (w, s) = pool.deque();

        let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
            let s = s.clone();
            // Each stealer thread gets a raw pointer to its own hit counter;
            // the owning box stays on this side in `hits` so the counter
            // outlives the thread.
            let unique_box = box AtomicUint::new(0);
            let thread_box = unsafe {
                *mem::transmute::<&Box<AtomicUint>,
                                 *const *mut AtomicUint>(&unique_box)
            };
            (Thread::start(proc() {
                unsafe {
                    loop {
                        match s.steal() {
                            Data((1, 2)) => {
                                (*thread_box).fetch_add(1, SeqCst);
                            }
                            Data(..) => fail!(),
                            _ if DONE.load(SeqCst) => break,
                            _ => {}
                        }
                    }
                }
            }), unique_box)
        }));

        let mut rng = rand::task_rng();
        let mut myhit = false;
        'outer: loop {
            for _ in range(0, rng.gen_range(0, AMT)) {
                if !myhit && rng.gen_range(0i, 3) == 2 {
                    match w.pop() {
                        None => {}
                        Some((1, 2)) => myhit = true,
                        Some(_) => fail!(),
                    }
                } else {
                    w.push((1, 2));
                }
            }

            for slot in hits.iter() {
                let amt = slot.load(SeqCst);
                if amt == 0 { continue 'outer; }
            }
            if myhit {
                break
            }
        }

        unsafe { DONE.store(true, SeqCst); }

        for thread in threads.move_iter() {
            thread.join();
        }
    }
}