]> git.lizzy.rs Git - rust.git/blob - src/libsync/deque.rs
auto merge of #15421 : catharsis/rust/doc-ffi-minor-fixes, r=alexcrichton
[rust.git] / src / libsync / deque.rs
1 // Copyright 2013 The Rust Project Developers. See the COPYRIGHT
2 // file at the top-level directory of this distribution and at
3 // http://rust-lang.org/COPYRIGHT.
4 //
5 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
8 // option. This file may not be copied, modified, or distributed
9 // except according to those terms.
10
11 //! A (mostly) lock-free concurrent work-stealing deque
12 //!
13 //! This module contains an implementation of the Chase-Lev work stealing deque
14 //! described in "Dynamic Circular Work-Stealing Deque". The implementation is
15 //! heavily based on the pseudocode found in the paper.
16 //!
17 //! This implementation does not want to have the restriction of a garbage
18 //! collector for reclamation of buffers, and instead it uses a shared pool of
19 //! buffers. This shared pool is required for correctness in this
20 //! implementation.
21 //!
22 //! The only lock-synchronized portions of this deque are the buffer allocation
23 //! and deallocation portions. Otherwise all operations are lock-free.
24 //!
25 //! # Example
26 //!
27 //!     use std::sync::deque::BufferPool;
28 //!
29 //!     let mut pool = BufferPool::new();
30 //!     let (mut worker, mut stealer) = pool.deque();
31 //!
32 //!     // Only the worker may push/pop
33 //!     worker.push(1i);
34 //!     worker.pop();
35 //!
36 //!     // Stealers take data from the other end of the deque
37 //!     worker.push(1i);
38 //!     stealer.steal();
39 //!
40 //!     // Stealers can be cloned to have many stealers stealing in parallel
41 //!     worker.push(1i);
42 //!     let mut stealer2 = stealer.clone();
43 //!     stealer2.steal();
44
45 #![experimental]
46
47 // NB: the "buffer pool" strategy is not done for speed, but rather for
48 //     correctness. For more info, see the comment on `swap_buffer`
49
50 // FIXME: all atomic operations in this module use a SeqCst ordering. That is
51 //      probably overkill
52
53 use core::prelude::*;
54
55 use alloc::arc::Arc;
56 use alloc::heap::{allocate, deallocate};
57 use alloc::boxed::Box;
58 use collections::Vec;
59 use core::kinds::marker;
60 use core::mem::{forget, min_align_of, size_of, transmute};
61 use core::ptr;
62 use rustrt::exclusive::Exclusive;
63
64 use atomics::{AtomicInt, AtomicPtr, SeqCst};
65
66 // Once the queue is less than 1/K full, then it will be downsized. Note that
67 // the deque requires that this number be less than 2.
68 static K: int = 4;
69
70 // Minimum number of bits that a buffer size should be. No buffer will resize to
71 // under this value, and all deques will initially contain a buffer of this
72 // size.
73 //
74 // The size in question is 1 << MIN_BITS
75 static MIN_BITS: uint = 7;
76
77 struct Deque<T> {
78     bottom: AtomicInt,
79     top: AtomicInt,
80     array: AtomicPtr<Buffer<T>>,
81     pool: BufferPool<T>,
82 }
83
84 /// Worker half of the work-stealing deque. This worker has exclusive access to
85 /// one side of the deque, and uses `push` and `pop` method to manipulate it.
86 ///
87 /// There may only be one worker per deque.
88 pub struct Worker<T> {
89     deque: Arc<Deque<T>>,
90     noshare: marker::NoShare,
91 }
92
93 /// The stealing half of the work-stealing deque. Stealers have access to the
94 /// opposite end of the deque from the worker, and they only have access to the
95 /// `steal` method.
96 pub struct Stealer<T> {
97     deque: Arc<Deque<T>>,
98     noshare: marker::NoShare,
99 }
100
101 /// When stealing some data, this is an enumeration of the possible outcomes.
102 #[deriving(PartialEq, Show)]
103 pub enum Stolen<T> {
104     /// The deque was empty at the time of stealing
105     Empty,
106     /// The stealer lost the race for stealing data, and a retry may return more
107     /// data.
108     Abort,
109     /// The stealer has successfully stolen some data.
110     Data(T),
111 }
112
113 /// The allocation pool for buffers used by work-stealing deques. Right now this
114 /// structure is used for reclamation of memory after it is no longer in use by
115 /// deques.
116 ///
117 /// This data structure is protected by a mutex, but it is rarely used. Deques
118 /// will only use this structure when allocating a new buffer or deallocating a
119 /// previous one.
120 pub struct BufferPool<T> {
121     pool: Arc<Exclusive<Vec<Box<Buffer<T>>>>>,
122 }
123
124 /// An internal buffer used by the chase-lev deque. This structure is actually
125 /// implemented as a circular buffer, and is used as the intermediate storage of
126 /// the data in the deque.
127 ///
128 /// This type is implemented with *T instead of Vec<T> for two reasons:
129 ///
130 ///   1. There is nothing safe about using this buffer. This easily allows the
131 ///      same value to be read twice in to rust, and there is nothing to
132 ///      prevent this. The usage by the deque must ensure that one of the
133 ///      values is forgotten. Furthermore, we only ever want to manually run
134 ///      destructors for values in this buffer (on drop) because the bounds
135 ///      are defined by the deque it's owned by.
136 ///
137 ///   2. We can certainly avoid bounds checks using *T instead of Vec<T>, although
138 ///      LLVM is probably pretty good at doing this already.
139 struct Buffer<T> {
140     storage: *const T,
141     log_size: uint,
142 }
143
144 impl<T: Send> BufferPool<T> {
145     /// Allocates a new buffer pool which in turn can be used to allocate new
146     /// deques.
147     pub fn new() -> BufferPool<T> {
148         BufferPool { pool: Arc::new(Exclusive::new(Vec::new())) }
149     }
150
151     /// Allocates a new work-stealing deque which will send/receiving memory to
152     /// and from this buffer pool.
153     pub fn deque(&self) -> (Worker<T>, Stealer<T>) {
154         let a = Arc::new(Deque::new(self.clone()));
155         let b = a.clone();
156         (Worker { deque: a, noshare: marker::NoShare },
157          Stealer { deque: b, noshare: marker::NoShare })
158     }
159
160     fn alloc(&mut self, bits: uint) -> Box<Buffer<T>> {
161         unsafe {
162             let mut pool = self.pool.lock();
163             match pool.iter().position(|x| x.size() >= (1 << bits)) {
164                 Some(i) => pool.remove(i).unwrap(),
165                 None => box Buffer::new(bits)
166             }
167         }
168     }
169
170     fn free(&self, buf: Box<Buffer<T>>) {
171         unsafe {
172             let mut pool = self.pool.lock();
173             match pool.iter().position(|v| v.size() > buf.size()) {
174                 Some(i) => pool.insert(i, buf),
175                 None => pool.push(buf),
176             }
177         }
178     }
179 }
180
181 impl<T: Send> Clone for BufferPool<T> {
182     fn clone(&self) -> BufferPool<T> { BufferPool { pool: self.pool.clone() } }
183 }
184
185 impl<T: Send> Worker<T> {
186     /// Pushes data onto the front of this work queue.
187     pub fn push(&self, t: T) {
188         unsafe { self.deque.push(t) }
189     }
190     /// Pops data off the front of the work queue, returning `None` on an empty
191     /// queue.
192     pub fn pop(&self) -> Option<T> {
193         unsafe { self.deque.pop() }
194     }
195
196     /// Gets access to the buffer pool that this worker is attached to. This can
197     /// be used to create more deques which share the same buffer pool as this
198     /// deque.
199     pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
200         &self.deque.pool
201     }
202 }
203
204 impl<T: Send> Stealer<T> {
205     /// Steals work off the end of the queue (opposite of the worker's end)
206     pub fn steal(&self) -> Stolen<T> {
207         unsafe { self.deque.steal() }
208     }
209
210     /// Gets access to the buffer pool that this stealer is attached to. This
211     /// can be used to create more deques which share the same buffer pool as
212     /// this deque.
213     pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
214         &self.deque.pool
215     }
216 }
217
218 impl<T: Send> Clone for Stealer<T> {
219     fn clone(&self) -> Stealer<T> {
220         Stealer { deque: self.deque.clone(), noshare: marker::NoShare }
221     }
222 }
223
224 // Almost all of this code can be found directly in the paper so I'm not
225 // personally going to heavily comment what's going on here.
226
227 impl<T: Send> Deque<T> {
228     fn new(mut pool: BufferPool<T>) -> Deque<T> {
229         let buf = pool.alloc(MIN_BITS);
230         Deque {
231             bottom: AtomicInt::new(0),
232             top: AtomicInt::new(0),
233             array: AtomicPtr::new(unsafe { transmute(buf) }),
234             pool: pool,
235         }
236     }
237
238     unsafe fn push(&self, data: T) {
239         let mut b = self.bottom.load(SeqCst);
240         let t = self.top.load(SeqCst);
241         let mut a = self.array.load(SeqCst);
242         let size = b - t;
243         if size >= (*a).size() - 1 {
244             // You won't find this code in the chase-lev deque paper. This is
245             // alluded to in a small footnote, however. We always free a buffer
246             // when growing in order to prevent leaks.
247             a = self.swap_buffer(b, a, (*a).resize(b, t, 1));
248             b = self.bottom.load(SeqCst);
249         }
250         (*a).put(b, data);
251         self.bottom.store(b + 1, SeqCst);
252     }
253
254     unsafe fn pop(&self) -> Option<T> {
255         let b = self.bottom.load(SeqCst);
256         let a = self.array.load(SeqCst);
257         let b = b - 1;
258         self.bottom.store(b, SeqCst);
259         let t = self.top.load(SeqCst);
260         let size = b - t;
261         if size < 0 {
262             self.bottom.store(t, SeqCst);
263             return None;
264         }
265         let data = (*a).get(b);
266         if size > 0 {
267             self.maybe_shrink(b, t);
268             return Some(data);
269         }
270         if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
271             self.bottom.store(t + 1, SeqCst);
272             return Some(data);
273         } else {
274             self.bottom.store(t + 1, SeqCst);
275             forget(data); // someone else stole this value
276             return None;
277         }
278     }
279
280     unsafe fn steal(&self) -> Stolen<T> {
281         let t = self.top.load(SeqCst);
282         let old = self.array.load(SeqCst);
283         let b = self.bottom.load(SeqCst);
284         let a = self.array.load(SeqCst);
285         let size = b - t;
286         if size <= 0 { return Empty }
287         if size % (*a).size() == 0 {
288             if a == old && t == self.top.load(SeqCst) {
289                 return Empty
290             }
291             return Abort
292         }
293         let data = (*a).get(t);
294         if self.top.compare_and_swap(t, t + 1, SeqCst) == t {
295             Data(data)
296         } else {
297             forget(data); // someone else stole this value
298             Abort
299         }
300     }
301
302     unsafe fn maybe_shrink(&self, b: int, t: int) {
303         let a = self.array.load(SeqCst);
304         if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
305             self.swap_buffer(b, a, (*a).resize(b, t, -1));
306         }
307     }
308
309     // Helper routine not mentioned in the paper which is used in growing and
310     // shrinking buffers to swap in a new buffer into place. As a bit of a
311     // recap, the whole point that we need a buffer pool rather than just
312     // calling malloc/free directly is that stealers can continue using buffers
313     // after this method has called 'free' on it. The continued usage is simply
314     // a read followed by a forget, but we must make sure that the memory can
315     // continue to be read after we flag this buffer for reclamation.
316     unsafe fn swap_buffer(&self, b: int, old: *mut Buffer<T>,
317                           buf: Buffer<T>) -> *mut Buffer<T> {
318         let newbuf: *mut Buffer<T> = transmute(box buf);
319         self.array.store(newbuf, SeqCst);
320         let ss = (*newbuf).size();
321         self.bottom.store(b + ss, SeqCst);
322         let t = self.top.load(SeqCst);
323         if self.top.compare_and_swap(t, t + ss, SeqCst) != t {
324             self.bottom.store(b, SeqCst);
325         }
326         self.pool.free(transmute(old));
327         return newbuf;
328     }
329 }
330
331
332 #[unsafe_destructor]
333 impl<T: Send> Drop for Deque<T> {
334     fn drop(&mut self) {
335         let t = self.top.load(SeqCst);
336         let b = self.bottom.load(SeqCst);
337         let a = self.array.load(SeqCst);
338         // Free whatever is leftover in the dequeue, and then move the buffer
339         // back into the pool.
340         for i in range(t, b) {
341             let _: T = unsafe { (*a).get(i) };
342         }
343         self.pool.free(unsafe { transmute(a) });
344     }
345 }
346
347 #[inline]
348 fn buffer_alloc_size<T>(log_size: uint) -> uint {
349     (1 << log_size) * size_of::<T>()
350 }
351
352 impl<T: Send> Buffer<T> {
353     unsafe fn new(log_size: uint) -> Buffer<T> {
354         let size = buffer_alloc_size::<T>(log_size);
355         let buffer = allocate(size, min_align_of::<T>());
356         Buffer {
357             storage: buffer as *const T,
358             log_size: log_size,
359         }
360     }
361
362     fn size(&self) -> int { 1 << self.log_size }
363
364     // Apparently LLVM cannot optimize (foo % (1 << bar)) into this implicitly
365     fn mask(&self) -> int { (1 << self.log_size) - 1 }
366
367     unsafe fn elem(&self, i: int) -> *const T {
368         self.storage.offset(i & self.mask())
369     }
370
371     // This does not protect against loading duplicate values of the same cell,
372     // nor does this clear out the contents contained within. Hence, this is a
373     // very unsafe method which the caller needs to treat specially in case a
374     // race is lost.
375     unsafe fn get(&self, i: int) -> T {
376         ptr::read(self.elem(i))
377     }
378
379     // Unsafe because this unsafely overwrites possibly uninitialized or
380     // initialized data.
381     unsafe fn put(&self, i: int, t: T) {
382         ptr::write(self.elem(i) as *mut T, t);
383     }
384
385     // Again, unsafe because this has incredibly dubious ownership violations.
386     // It is assumed that this buffer is immediately dropped.
387     unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
388         // NB: not entirely obvious, but thanks to 2's complement,
389         // casting delta to uint and then adding gives the desired
390         // effect.
391         let buf = Buffer::new(self.log_size + delta as uint);
392         for i in range(t, b) {
393             buf.put(i, self.get(i));
394         }
395         return buf;
396     }
397 }
398
399 #[unsafe_destructor]
400 impl<T: Send> Drop for Buffer<T> {
401     fn drop(&mut self) {
402         // It is assumed that all buffers are empty on drop.
403         let size = buffer_alloc_size::<T>(self.log_size);
404         unsafe { deallocate(self.storage as *mut u8, size, min_align_of::<T>()) }
405     }
406 }
407
408 #[cfg(test)]
409 mod tests {
410     use std::prelude::*;
411     use super::{Data, BufferPool, Abort, Empty, Worker, Stealer};
412
413     use std::mem;
414     use std::rt::thread::Thread;
415     use std::rand;
416     use std::rand::Rng;
417     use atomics::{AtomicBool, INIT_ATOMIC_BOOL, SeqCst,
418                   AtomicUint, INIT_ATOMIC_UINT};
419     use std::vec;
420
421     #[test]
422     fn smoke() {
423         let pool = BufferPool::new();
424         let (w, s) = pool.deque();
425         assert_eq!(w.pop(), None);
426         assert_eq!(s.steal(), Empty);
427         w.push(1i);
428         assert_eq!(w.pop(), Some(1));
429         w.push(1);
430         assert_eq!(s.steal(), Data(1));
431         w.push(1);
432         assert_eq!(s.clone().steal(), Data(1));
433     }
434
435     #[test]
436     fn stealpush() {
437         static AMT: int = 100000;
438         let pool = BufferPool::<int>::new();
439         let (w, s) = pool.deque();
440         let t = Thread::start(proc() {
441             let mut left = AMT;
442             while left > 0 {
443                 match s.steal() {
444                     Data(i) => {
445                         assert_eq!(i, 1);
446                         left -= 1;
447                     }
448                     Abort | Empty => {}
449                 }
450             }
451         });
452
453         for _ in range(0, AMT) {
454             w.push(1);
455         }
456
457         t.join();
458     }
459
460     #[test]
461     fn stealpush_large() {
462         static AMT: int = 100000;
463         let pool = BufferPool::<(int, int)>::new();
464         let (w, s) = pool.deque();
465         let t = Thread::start(proc() {
466             let mut left = AMT;
467             while left > 0 {
468                 match s.steal() {
469                     Data((1, 10)) => { left -= 1; }
470                     Data(..) => fail!(),
471                     Abort | Empty => {}
472                 }
473             }
474         });
475
476         for _ in range(0, AMT) {
477             w.push((1, 10));
478         }
479
480         t.join();
481     }
482
483     fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>,
484                 nthreads: int, amt: uint) {
485         for _ in range(0, amt) {
486             w.push(box 20);
487         }
488         let mut remaining = AtomicUint::new(amt);
489         let unsafe_remaining: *mut AtomicUint = &mut remaining;
490
491         let threads = range(0, nthreads).map(|_| {
492             let s = s.clone();
493             Thread::start(proc() {
494                 unsafe {
495                     while (*unsafe_remaining).load(SeqCst) > 0 {
496                         match s.steal() {
497                             Data(box 20) => {
498                                 (*unsafe_remaining).fetch_sub(1, SeqCst);
499                             }
500                             Data(..) => fail!(),
501                             Abort | Empty => {}
502                         }
503                     }
504                 }
505             })
506         }).collect::<Vec<Thread<()>>>();
507
508         while remaining.load(SeqCst) > 0 {
509             match w.pop() {
510                 Some(box 20) => { remaining.fetch_sub(1, SeqCst); }
511                 Some(..) => fail!(),
512                 None => {}
513             }
514         }
515
516         for thread in threads.move_iter() {
517             thread.join();
518         }
519     }
520
521     #[test]
522     fn run_stampede() {
523         let pool = BufferPool::<Box<int>>::new();
524         let (w, s) = pool.deque();
525         stampede(w, s, 8, 10000);
526     }
527
528     #[test]
529     fn many_stampede() {
530         static AMT: uint = 4;
531         let pool = BufferPool::<Box<int>>::new();
532         let threads = range(0, AMT).map(|_| {
533             let (w, s) = pool.deque();
534             Thread::start(proc() {
535                 stampede(w, s, 4, 10000);
536             })
537         }).collect::<Vec<Thread<()>>>();
538
539         for thread in threads.move_iter() {
540             thread.join();
541         }
542     }
543
544     #[test]
545     fn stress() {
546         static AMT: int = 100000;
547         static NTHREADS: int = 8;
548         static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
549         static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
550         let pool = BufferPool::<int>::new();
551         let (w, s) = pool.deque();
552
553         let threads = range(0, NTHREADS).map(|_| {
554             let s = s.clone();
555             Thread::start(proc() {
556                 unsafe {
557                     loop {
558                         match s.steal() {
559                             Data(2) => { HITS.fetch_add(1, SeqCst); }
560                             Data(..) => fail!(),
561                             _ if DONE.load(SeqCst) => break,
562                             _ => {}
563                         }
564                     }
565                 }
566             })
567         }).collect::<Vec<Thread<()>>>();
568
569         let mut rng = rand::task_rng();
570         let mut expected = 0;
571         while expected < AMT {
572             if rng.gen_range(0i, 3) == 2 {
573                 match w.pop() {
574                     None => {}
575                     Some(2) => unsafe { HITS.fetch_add(1, SeqCst); },
576                     Some(_) => fail!(),
577                 }
578             } else {
579                 expected += 1;
580                 w.push(2);
581             }
582         }
583
584         unsafe {
585             while HITS.load(SeqCst) < AMT as uint {
586                 match w.pop() {
587                     None => {}
588                     Some(2) => { HITS.fetch_add(1, SeqCst); },
589                     Some(_) => fail!(),
590                 }
591             }
592             DONE.store(true, SeqCst);
593         }
594
595         for thread in threads.move_iter() {
596             thread.join();
597         }
598
599         assert_eq!(unsafe { HITS.load(SeqCst) }, expected as uint);
600     }
601
602     #[test]
603     #[ignore(cfg(windows))] // apparently windows scheduling is weird?
604     fn no_starvation() {
605         static AMT: int = 10000;
606         static NTHREADS: int = 4;
607         static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
608         let pool = BufferPool::<(int, uint)>::new();
609         let (w, s) = pool.deque();
610
611         let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
612             let s = s.clone();
613             let unique_box = box AtomicUint::new(0);
614             let thread_box = unsafe {
615                 *mem::transmute::<&Box<AtomicUint>,
616                                   *const *mut AtomicUint>(&unique_box)
617             };
618             (Thread::start(proc() {
619                 unsafe {
620                     loop {
621                         match s.steal() {
622                             Data((1, 2)) => {
623                                 (*thread_box).fetch_add(1, SeqCst);
624                             }
625                             Data(..) => fail!(),
626                             _ if DONE.load(SeqCst) => break,
627                             _ => {}
628                         }
629                     }
630                 }
631             }), unique_box)
632         }));
633
634         let mut rng = rand::task_rng();
635         let mut myhit = false;
636         'outer: loop {
637             for _ in range(0, rng.gen_range(0, AMT)) {
638                 if !myhit && rng.gen_range(0i, 3) == 2 {
639                     match w.pop() {
640                         None => {}
641                         Some((1, 2)) => myhit = true,
642                         Some(_) => fail!(),
643                     }
644                 } else {
645                     w.push((1, 2));
646                 }
647             }
648
649             for slot in hits.iter() {
650                 let amt = slot.load(SeqCst);
651                 if amt == 0 { continue 'outer; }
652             }
653             if myhit {
654                 break
655             }
656         }
657
658         unsafe { DONE.store(true, SeqCst); }
659
660         for thread in threads.move_iter() {
661             thread.join();
662         }
663     }
664 }