1 /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2 * Redistribution and use in source and binary forms, with or without
3 * modification, are permitted provided that the following conditions are met:
5 * 1. Redistributions of source code must retain the above copyright notice,
6 * this list of conditions and the following disclaimer.
8 * 2. Redistributions in binary form must reproduce the above copyright
9 * notice, this list of conditions and the following disclaimer in the
10 * documentation and/or other materials provided with the distribution.
12 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
15 * EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
18 * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
20 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
21 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23 * The views and conclusions contained in the software and documentation are
24 * those of the authors and should not be interpreted as representing official
25 * policies, either expressed or implied, of Dmitry Vyukov.
28 //! A mostly lock-free multi-producer, single-consumer queue.
30 //! This module implements an intrusive MPSC queue. This queue is incredibly
31 //! unsafe (due to use of unsafe pointers for nodes), and hence is not public.
35 // http://www.1024cores.net/home/lock-free-algorithms
36 // /queues/intrusive-mpsc-node-based-queue
44 // NB: all links are done as AtomicUint instead of AtomicPtr to allow for static
// Intrusive link field: the raw `*mut Node<T>` successor encoded as a uint,
// with 0 standing in for null (see `Node::next()` below for the decode).
// NOTE(review): the enclosing `pub struct Node<T>` header (and its payload
// field) is missing from this fragment — confirm against the full file.
48 pub next: atomics::AtomicUint,

// Sentinel node embedded in the queue so it can be statically initialized
// and never becomes fully empty; `push`/`pop` compare against `&self.stub`.
52 pub struct DummyNode {
// Same uint-encoded link convention as `Node::next` (0 == null).
53 pub next: atomics::AtomicUint,

// Queue state. NOTE(review): the `pub struct Queue<T>` header and the
// `stub: DummyNode` field referenced by `push`/`pop` are missing from this
// fragment. Producers swap on `head`; the single consumer owns `tail`.
57 pub head: atomics::AtomicUint,
// Consumer-only cursor; wrapped in `Unsafe` (old-Rust UnsafeCell) because
// `pop` rewrites it through a shared `&self` reference.
58 pub tail: Unsafe<*mut Node<T>>,
62 impl<T: Send> Queue<T> {
    /// Creates an empty queue: null head, null tail cursor, and a stub node
    /// whose link starts at 0 (null).
    /// NOTE(review): the `Queue { ... }` / `stub: DummyNode { ... }` literal
    /// lines are missing from this fragment — only the field initializers
    /// are visible.
63 pub fn new() -> Queue<T> {
65 head: atomics::AtomicUint::new(0),
66 tail: Unsafe::new(0 as *mut Node<T>),
68 next: atomics::AtomicUint::new(0),

    /// Producer-side push; callable from any number of threads concurrently.
    ///
    /// Unsafe contract: `node` must be a valid, uniquely-owned pointer that
    /// stays alive until the consumer pops it.
73 pub unsafe fn push(&self, node: *mut Node<T>) {
        // Terminate the new node, then atomically publish it as the new head.
74 (*node).next.store(0, atomics::Release);
75 let prev = self.head.swap(node as uint, atomics::AcqRel);
77 // Note that this code is slightly modified to allow static
78 // initialization of these queues with rust's flavor of static
        // NOTE(review): the branch distinguishing an empty queue (prev == 0,
        // link from the stub) from a non-empty one appears to be missing
        // from this fragment — the two stores below belong to the two arms.
81 self.stub.next.store(node as uint, atomics::Release);
        // Non-empty case: link the previous head to the new node. Between the
        // swap above and this store the chain is temporarily unlinked — the
        // "inconsistent state" described in pop's docs below.
83 let prev = prev as *mut Node<T>;
84 (*prev).next.store(node as uint, atomics::Release);

88 /// You'll note that the other MPSC queue in std::sync is non-intrusive and
89 /// returns a `PopResult` here to indicate when the queue is inconsistent.
90 /// An "inconsistent state" in the other queue means that a pusher has
91 /// pushed, but it hasn't finished linking the rest of the chain.
93 /// This queue also suffers from this problem, but I currently haven't been
94 /// able to detangle when this actually happens. This code is translated
95 /// verbatim from the website above, and is more complicated than the
96 /// non-intrusive version.
98 /// Right now consumers of this queue must be ready for this fact. Just
99 /// because `pop` returns `None` does not mean that there is not data
101 pub unsafe fn pop(&self) -> Option<*mut Node<T>> {
        // Single-consumer only: reads and rewrites the tail cursor in place.
102 let tail = *self.tail.get();
        // A null cursor means nothing has been popped yet: start at the stub.
103 let mut tail = if !tail.is_null() {tail} else {
104 mem::transmute(&self.stub)
106 let mut next = (*tail).next(atomics::Relaxed);
        // If the cursor is parked on the stub, advance past it first.
107 if tail as uint == &self.stub as *DummyNode as uint {
        // NOTE(review): several control-flow lines of the original algorithm
        // (null checks / early `return`s between the statements below) are
        // missing from this fragment; statement order here is load-bearing,
        // so reconstruct against the 1024cores reference before editing.
111 *self.tail.get() = next;
113 next = (*next).next(atomics::Relaxed);
116 *self.tail.get() = next;
119 let head = self.head.load(atomics::Acquire) as *mut Node<T>;
        // The stub is presumably re-pushed here so the chain never fully
        // empties (the `self.push(stub)` call is missing from this
        // fragment — TODO confirm), then the link is followed once more.
123 let stub = mem::transmute(&self.stub);
125 next = (*tail).next(atomics::Relaxed);
127 *self.tail.get() = next;
134 impl<T: Send> Node<T> {
    /// Wraps `t` in an unlinked node: the `next` link starts at 0 (null).
    /// NOTE(review): the `Node { ... }` struct literal lines (including the
    /// payload field initialized from `t`) are missing from this fragment.
135 pub fn new(t: T) -> Node<T> {
138 next: atomics::AtomicUint::new(0),

    /// Loads this node's successor link with ordering `ord` and reinterprets
    /// the stored uint as a `*mut Node<T>` (0 decodes to a null pointer) —
    /// the read-side counterpart of the uint-encoded stores in `push`.
141 pub unsafe fn next(&self, ord: atomics::Ordering) -> *mut Node<T> {
142 mem::transmute::<uint, *mut Node<T>>(self.next.load(ord))