1 /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2 * Redistribution and use in source and binary forms, with or without
3 * modification, are permitted provided that the following conditions are met:
5 * 1. Redistributions of source code must retain the above copyright notice,
6 * this list of conditions and the following disclaimer.
8 * 2. Redistributions in binary form must reproduce the above copyright
9 * notice, this list of conditions and the following disclaimer in the
10 * documentation and/or other materials provided with the distribution.
12 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
15 * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
18 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
20 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23 * The views and conclusions contained in the software and documentation are
24 * those of the authors and should not be interpreted as representing official
25 * policies, either expressed or implied, of Dmitry Vyukov.
28 #![allow(missing_doc, dead_code)]
30 // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
34 use num::next_power_of_two;
35 use option::{Option, Some, None};
36 use sync::arc::UnsafeArc;
37 use sync::atomics::{AtomicUint,Relaxed,Release,Acquire};
// Shared position counter that `push` loads and then advances by one with
// `compare_and_swap`.
50 enqueue_pos: AtomicUint,
// Shared position counter that `pop` loads and then advances by one with
// `compare_and_swap`.
52 dequeue_pos: AtomicUint,
// Handle to the queue's `State`. `Queue::push` and `Queue::pop` dereference it
// through `get()`, and `Clone` copies it with `UnsafeArc::clone`.
// NOTE(review): presumably every clone refers to the same `State` -- confirm
// against the `UnsafeArc` documentation.
57 priv state: UnsafeArc<State<T>>,
60 impl<T: Send> State<T> {
// Builds a `State` for the requested `capacity`.
// A request below 2, or one that is not a power of two, takes the adjusting
// branch; the arm visible here rounds up with `next_power_of_two`.
// NOTE(review): the `capacity < 2` arm, the pass-through arm and the rest of
// the returned value are on lines not visible here -- confirm them there.
61 fn with_capacity(capacity: uint) -> State<T> {
// `capacity < 2` is tested first, so `capacity - 1` is never evaluated for 0.
// `x & (x - 1)` is non-zero exactly when `x` has more than one bit set.
62 let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
66 // use next power of 2 as capacity
67 next_power_of_two(capacity)
// Every slot starts with no value and with `sequence` equal to its own index;
// `push` compares that sequence against the enqueue position.
72 let buffer = slice::from_fn(capacity, |i| {
73 Node { sequence:AtomicUint::new(i), value: None }
// Both position counters start at 0.
80 enqueue_pos: AtomicUint::new(0),
82 dequeue_pos: AtomicUint::new(0),
// Attempts to place `value` in the slot addressed by the current enqueue
// position.
// NOTE(review): the retry loop, the branches on `diff` and the `bool` result
// are on lines not visible here; presumably `false` reports a full queue --
// confirm against the complete function.
87 fn push(&mut self, value: T) -> bool {
89 let mut pos = self.enqueue_pos.load(Relaxed);
// NOTE(review): `mask` is bound on a line not visible here; `pop` takes it
// from `self.mask`.
91 let node = &mut self.buffer[pos & mask];
92 let seq = node.sequence.load(Acquire);
// Signed distance between the slot's sequence and the position being tried.
// Slots are created with `sequence == index`, so a fresh slot at its own
// position gives 0.
93 let diff: int = seq as int - pos as int;
// Try to claim `pos` by advancing the shared counter. `compare_and_swap`
// yields the previous value, so a result equal to `pos` means this caller won.
96 let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
97 if enqueue_pos == pos {
// Write the payload first, then publish it: the Release store of `pos + 1`
// is what `pop` looks for when it compares the sequence with `pos + 1`
// after its Acquire load.
98 node.value = Some(value);
99 node.sequence.store(pos+1, Release);
// Re-read the shared position before another attempt.
// NOTE(review): the branch guarding this reload is not visible here.
107 pos = self.enqueue_pos.load(Relaxed);
// Attempts to take the value from the slot addressed by the current dequeue
// position.
// NOTE(review): the retry loop, the branches on `diff` and the returned
// `Option` are on lines not visible here; presumably `None` reports an empty
// queue -- confirm against the complete function.
113 fn pop(&mut self) -> Option<T> {
114 let mask = self.mask;
115 let mut pos = self.dequeue_pos.load(Relaxed);
117 let node = &mut self.buffer[pos & mask];
118 let seq = node.sequence.load(Acquire);
// Signed distance between the slot's sequence and `pos + 1`, the value `push`
// stores after writing a payload at position `pos`; 0 means that store has
// been observed.
119 let diff: int = seq as int - (pos + 1) as int;
// Try to claim `pos` by advancing the shared counter. `compare_and_swap`
// yields the previous value, so a result equal to `pos` means this caller won.
121 let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
122 if dequeue_pos == pos {
// Move the payload out, leaving `None` in the slot.
123 let value = node.value.take();
// Hand the slot back to producers: `push` computes `seq - pos`, so this slot
// reads as free again once the enqueue position reaches `pos + mask + 1`.
// NOTE(review): presumably `mask` is capacity - 1, making that exactly one
// trip around the buffer -- confirm where `mask` is initialised.
124 node.sequence.store(pos + mask + 1, Release);
// Re-read the shared position before another attempt.
// NOTE(review): the branch guarding this reload is not visible here.
132 pos = self.dequeue_pos.load(Relaxed);
// Public handle API: each method below forwards to the `State` held in
// `self.state`.
138 impl<T: Send> Queue<T> {
// Creates a queue whose state comes from `State::with_capacity(capacity)`,
// wrapped in a new `UnsafeArc`.
139 pub fn with_capacity(capacity: uint) -> Queue<T> {
141 state: UnsafeArc::new(State::with_capacity(capacity))
// Forwards `value` to `State::push` and yields its `bool` result.
145 pub fn push(&mut self, value: T) -> bool {
// NOTE(review): this dereferences the raw pointer from `get()`; soundness
// presumably rests on `State::push` tolerating concurrent callers -- confirm.
146 unsafe { (*self.state.get()).push(value) }
// Forwards to `State::pop` and yields its `Option<T>` result.
149 pub fn pop(&mut self) -> Option<T> {
// NOTE(review): same raw dereference as in `push`; the same concurrency
// assumption applies -- confirm.
150 unsafe { (*self.state.get()).pop() }
// Cloning a `Queue` produces another `Queue` whose `state` is a clone of this
// one's `UnsafeArc`.
// NOTE(review): presumably both handles then operate on the same `State` --
// confirm against `UnsafeArc::clone`.
154 impl<T: Send> Clone for Queue<T> {
155 fn clone(&self) -> Queue<T> {
157 state: self.state.clone()
// NOTE(review): the enclosing function header, the bindings of `nthreads` and
// `nmsgs`, and most of the spawned task bodies are on lines not visible here.
// The queue is created with a requested capacity of `nthreads*nmsgs`.
173 let mut q = Queue::with_capacity(nthreads*nmsgs);
// A freshly created queue must yield nothing.
174 assert_eq!(None, q.pop());
175 let (tx, rx) = channel();
// First group: one native task per thread, each looping over `0..nmsgs`.
// NOTE(review): presumably these are the producers -- the loop body is not
// visible here.
177 for _ in range(0, nthreads) {
180 native::task::spawn(proc() {
182 for i in range(0, nmsgs) {
// Second group: one native task per thread, each paired with its own channel
// whose receiving end is kept so the results can be checked below.
189 let mut completion_rxs = ~[];
190 for _ in range(0, nthreads) {
191 let (tx, rx) = channel();
192 completion_rxs.push(rx);
194 native::task::spawn(proc() {
// The task's loop stops once its counter `i` reaches `nmsgs`.
202 if i == nmsgs { break }
// Each of the second-group channels is expected to deliver exactly `nmsgs`.
210 for rx in completion_rxs.mut_iter() {
211 assert_eq!(nmsgs, rx.recv());
// NOTE(review): the body of this final per-thread loop is not visible here.
213 for _ in range(0, nthreads) {