/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *    1. Redistributions of source code must retain the above copyright notice,
 *       this list of conditions and the following disclaimer.
 *
 *    2. Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in the
 *       documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
 * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and documentation are
 * those of the authors and should not be interpreted as representing official
 * policies, either expressed or implied, of Dmitry Vyukov.
 */
// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
//! A single-producer single-consumer concurrent queue
//!
//! This module contains the implementation of an SPSC queue which can be used
//! concurrently between two tasks. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.
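//!
//! A minimal usage sketch (not part of the original module docs; it assumes
//! one thread only ever pushes and one thread only ever pops):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! // `new` is unsafe because the type system cannot enforce the
//! // single-producer/single-consumer contract. A bound of 0 means the
//! // internal node cache is unbounded.
//! let queue = Arc::new(unsafe { Queue::new(0) });
//!
//! // Producer side (exactly one thread):
//! queue.push(1i);
//! queue.push(2);
//!
//! // Consumer side (exactly one thread):
//! assert_eq!(queue.pop(), Some(1));
//! assert_eq!(queue.pop(), Some(2));
//! assert_eq!(queue.pop(), None);
//! ```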
use core::prelude::*;

use alloc::boxed::Box;
use core::mem;
use core::cell::UnsafeCell;

use sync::atomic::{AtomicPtr, AtomicUint, Ordering};
// Node within the linked list queue of messages to send
struct Node<T> {
    // FIXME: this could be an uninitialized T if we're careful enough, and
    // that would reduce memory usage (and be a bit faster).
    value: Option<T>,         // nullable for re-use of nodes
    next: AtomicPtr<Node<T>>, // next node in the queue
}
/// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
pub struct Queue<T> {
    // consumer fields
    tail: UnsafeCell<*mut Node<T>>, // where to pop from
    tail_prev: AtomicPtr<Node<T>>,  // where the producer looks for reusable nodes

    // producer fields
    head: UnsafeCell<*mut Node<T>>,      // where to push to
    first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
    tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail

    // Cache maintenance fields. Additions and subtractions are stored
    // separately in order to allow them to use nonatomic addition/subtraction.
    cache_bound: uint,
    cache_additions: AtomicUint,
    cache_subtractions: AtomicUint,
}
unsafe impl<T: Send> Send for Queue<T> { }

unsafe impl<T: Send> Sync for Queue<T> { }
impl<T: Send> Node<T> {
    fn new() -> *mut Node<T> {
        unsafe {
            mem::transmute(box Node {
                value: None,
                next: AtomicPtr::new(0 as *mut Node<T>),
            })
        }
    }
}
impl<T: Send> Queue<T> {
    /// Creates a new queue.
    ///
    /// This is unsafe as the type system doesn't enforce a single
    /// consumer-producer relationship. It also allows the consumer to `pop`
    /// items while there is a `peek` active due to all methods having a
    /// non-mutable receiver.
    ///
    /// # Arguments
    ///
    ///   * `bound` - This queue implementation is implemented with a linked
    ///               list, and this means that a push is always a malloc. In
    ///               order to amortize this cost, an internal cache of nodes is
    ///               maintained to prevent a malloc from always being
    ///               necessary. This bound is the limit on the size of the
    ///               cache (if desired). If the value is 0, then the cache has
    ///               no bound. Otherwise, the cache will never grow larger than
    ///               `bound` (although the queue itself could be much larger).
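    ///
    /// A minimal sketch of the two cache modes (illustrative only; the
    /// variable names are not from the original documentation):
    ///
    /// ```ignore
    /// // Cache may grow without limit: every popped node is kept for reuse.
    /// let unbounded = unsafe { Queue::<uint>::new(0) };
    /// // At most 128 nodes are kept; beyond that, popped nodes are freed.
    /// let bounded = unsafe { Queue::<uint>::new(128) };
    /// ```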
    pub unsafe fn new(bound: uint) -> Queue<T> {
        // The queue starts with two nodes: n2 is the empty sentinel acting as
        // both head and tail, and n1 is its already-consumed predecessor, kept
        // so the node-cache pointers (first/tail_copy/tail_prev) are never null.
        let n1 = Node::new();
        let n2 = Node::new();
        (*n1).next.store(n2, Ordering::Relaxed);
        Queue {
            tail: UnsafeCell::new(n2),
            tail_prev: AtomicPtr::new(n1),
            head: UnsafeCell::new(n2),
            first: UnsafeCell::new(n1),
            tail_copy: UnsafeCell::new(n1),
            cache_bound: bound,
            cache_additions: AtomicUint::new(0),
            cache_subtractions: AtomicUint::new(0),
        }
    }
    /// Pushes a new value onto this queue. Note that to use this function
    /// safely, it must be externally guaranteed that there is only one pusher.
    pub fn push(&self, t: T) {
        unsafe {
            // Acquire a node (which either uses a cached one or allocates a new
            // one), and then append this to the 'head' node.
            let n = self.alloc();
            assert!((*n).value.is_none());
            (*n).value = Some(t);
            (*n).next.store(0 as *mut Node<T>, Ordering::Relaxed);

            // The release store publishes the filled node to the consumer.
            (**self.head.get()).next.store(n, Ordering::Release);
            *self.head.get() = n;
        }
    }
    unsafe fn alloc(&self) -> *mut Node<T> {
        // First try to see if we can consume the 'first' node for our uses.
        // We try to avoid as many atomic instructions as possible here, so
        // the addition to cache_subtractions is not atomic (plus we're the
        // only one subtracting from the cache).
        if *self.first.get() != *self.tail_copy.get() {
            if self.cache_bound > 0 {
                let b = self.cache_subtractions.load(Ordering::Relaxed);
                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
            }
            let ret = *self.first.get();
            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If the above fails, then update our copy of the tail and try
        // again.
        *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
        if *self.first.get() != *self.tail_copy.get() {
            if self.cache_bound > 0 {
                let b = self.cache_subtractions.load(Ordering::Relaxed);
                self.cache_subtractions.store(b + 1, Ordering::Relaxed);
            }
            let ret = *self.first.get();
            *self.first.get() = (*ret).next.load(Ordering::Relaxed);
            return ret;
        }
        // If all of that fails, then we have to allocate a new node
        // (there's nothing in the node cache).
        Node::new()
    }
    /// Attempts to pop a value from this queue. Remember that to use this type
    /// safely you must ensure that there is only one popper at a time.
    pub fn pop(&self) -> Option<T> {
        unsafe {
            // The `tail` node is not actually a used node, but rather a
            // sentinel from where we should start popping from. Hence, look at
            // tail's next field and see if we can use it. If we do a pop, then
            // the current tail node is a candidate for going into the cache.
            let tail = *self.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { return None }
            assert!((*next).value.is_some());
            let ret = (*next).value.take();

            *self.tail.get() = next;
            if self.cache_bound == 0 {
                // Unbounded cache: always hand the old sentinel back to the
                // producer for reuse.
                self.tail_prev.store(tail, Ordering::Release);
            } else {
                // FIXME: this is dubious with overflow.
                let additions = self.cache_additions.load(Ordering::Relaxed);
                let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
                let size = additions - subtractions;

                if size < self.cache_bound {
                    self.tail_prev.store(tail, Ordering::Release);
                    self.cache_additions.store(additions + 1, Ordering::Relaxed);
                } else {
                    // The cache is full, so unlink 'tail' from the reuse list
                    // rather than caching it.
                    (*self.tail_prev.load(Ordering::Relaxed))
                        .next.store(next, Ordering::Relaxed);
                    // We have successfully erased all references to 'tail', so
                    // now we can safely drop it.
                    let _: Box<Node<T>> = mem::transmute(tail);
                }
            }
            return ret;
        }
    }
    /// Attempts to peek at the head of the queue, returning `None` if the queue
    /// has no data currently.
    ///
    /// # Warning
    /// The reference returned is invalid if it is not used before the consumer
    /// pops the value off the queue. If the producer then pushes another value
    /// onto the queue, it will overwrite the value pointed to by the reference.
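    ///
    /// A small usage sketch (hypothetical values, not from the original
    /// documentation), assuming the single-consumer guarantee holds:
    ///
    /// ```ignore
    /// queue.push(10i);
    /// if let Some(val) = queue.peek() {
    ///     *val += 1; // mutate the front element in place
    /// }
    /// assert_eq!(queue.pop(), Some(11));
    /// ```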
    pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
        // This is essentially the same as above with all the popping bits
        // stripped out.
        unsafe {
            let tail = *self.tail.get();
            let next = (*tail).next.load(Ordering::Acquire);
            if next.is_null() { return None }
            return (*next).value.as_mut();
        }
    }
}
impl<T: Send> Drop for Queue<T> {
    fn drop(&mut self) {
        unsafe {
            // Free every node still linked into the queue/cache chain,
            // starting from the oldest node the producer owns.
            let mut cur = *self.first.get();
            while !cur.is_null() {
                let next = (*cur).next.load(Ordering::Relaxed);
                let _n: Box<Node<T>> = mem::transmute(cur);
                cur = next;
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use prelude::v1::*;

    use sync::Arc;
    use super::Queue;
    use thread::Thread;
    use sync::mpsc::channel;
    #[test]
    fn smoke() {
        unsafe {
            let queue = Queue::new(0);
            queue.push(1i);
            queue.push(2);
            assert_eq!(queue.pop(), Some(1i));
            assert_eq!(queue.pop(), Some(2));
            assert_eq!(queue.pop(), None);
            queue.push(3);
            queue.push(4);
            assert_eq!(queue.pop(), Some(3));
            assert_eq!(queue.pop(), Some(4));
            assert_eq!(queue.pop(), None);
        }
    }
    #[test]
    fn peek() {
        unsafe {
            let queue = Queue::new(0);
            queue.push(vec![1i]);

            // Ensure the borrowchecker works
            match queue.peek() {
                Some(vec) => match vec.as_slice() {
                    // Note that `pop` is not allowed here due to borrow
                    [1] => {}
                    _ => unreachable!()
                },
                None => unreachable!()
            }

            queue.pop();
        }
    }
    #[test]
    fn drop_full() {
        unsafe {
            let q = Queue::new(0);
            q.push(box 1i);
            q.push(box 2i);
        }
    }
    #[test]
    fn smoke_bound() {
        unsafe {
            let q = Queue::new(0);
            q.push(1i);
            q.push(2);
            assert_eq!(q.pop(), Some(1));
            assert_eq!(q.pop(), Some(2));
            assert_eq!(q.pop(), None);
            q.push(3);
            q.push(4);
            assert_eq!(q.pop(), Some(3));
            assert_eq!(q.pop(), Some(4));
            assert_eq!(q.pop(), None);
        }
    }
    #[test]
    fn stress() {
        unsafe {
            stress_bound(0);
            stress_bound(1);
        }

        unsafe fn stress_bound(bound: uint) {
            let q = Arc::new(Queue::new(bound));

            let (tx, rx) = channel();
            let q2 = q.clone();
            let _t = Thread::spawn(move|| {
                for _ in range(0u, 100000) {
                    loop {
                        match q2.pop() {
                            Some(1i) => break,
                            Some(_) => panic!(),
                            None => {}
                        }
                    }
                }
                tx.send(()).unwrap();
            });
            for _ in range(0i, 100000) {