//! A single-producer single-consumer concurrent queue
//!
//! This module contains the implementation of an SPSC queue which can be used
//! concurrently between two threads. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.

// http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
use core::cell::UnsafeCell;
use core::ptr;

use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

use super::cache_aligned::CacheAligned;
// Node within the linked list queue of messages to send
//
// FIXME: this could be an uninitialized T if we're careful enough, and
// that would reduce memory usage (and be a bit faster).
struct Node<T> {
    value: Option<T>,         // nullable for re-use of nodes
    cached: bool,             // This node goes into the node cache
    next: AtomicPtr<Node<T>>, // next node in the queue
}
27 /// The single-producer single-consumer queue. This structure is not cloneable,
28 /// but it can be safely shared in an Arc if it is guaranteed that there
29 /// is only one popper and one pusher touching the queue at any one point in
31 pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
33 consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
36 producer: CacheAligned<Producer<T, ProducerAddition>>,
39 struct Consumer<T, Addition> {
40 tail: UnsafeCell<*mut Node<T>>, // where to pop from
41 tail_prev: AtomicPtr<Node<T>>, // where to pop from
42 cache_bound: usize, // maximum cache size
43 cached_nodes: AtomicUsize, // number of nodes marked as cachable
47 struct Producer<T, Addition> {
48 head: UnsafeCell<*mut Node<T>>, // where to push to
49 first: UnsafeCell<*mut Node<T>>, // where to get new nodes from
50 tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
54 unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
56 unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
59 fn new() -> *mut Node<T> {
60 Box::into_raw(box Node {
63 next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {

    /// Creates a new queue. With given additional elements in the producer and
    /// consumer portions of the queue.
    ///
    /// Due to the performance implications of cache-contention,
    /// we wish to keep fields used mainly by the producer on a separate cache
    /// line than those used by the consumer.
    /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
    /// allocate one for small fields, so we allow users to insert additional
    /// fields into the cache lines already allocated by this for the producer
    /// and consumer.
    ///
    /// This is unsafe as the type system doesn't enforce a single
    /// consumer-producer relationship. It also allows the consumer to `pop`
    /// items while there is a `peek` active due to all methods having a
    /// non-mutable receiver.
    ///
    /// # Arguments
    ///
    /// * `bound` - This queue implementation is implemented with a linked
    ///             list, and this means that a push is always a malloc. In
    ///             order to amortize this cost, an internal cache of nodes is
    ///             maintained to prevent a malloc from always being
    ///             necessary. This bound is the limit on the size of the
    ///             cache (if desired). If the value is 0, then the cache has
    ///             no bound. Otherwise, the cache will never grow larger than
    ///             `bound` (although the queue itself could be much larger).
    pub unsafe fn with_additions(
        bound: usize,
        producer_addition: ProducerAddition,
        consumer_addition: ConsumerAddition,
    ) -> Self {
        // Two sentinel nodes: n1 is the already-consumed predecessor, n2 is
        // the shared sentinel that both halves initially point at.
        let n1 = Node::new();
        let n2 = Node::new();
        (*n1).next.store(n2, Ordering::Relaxed);
        Queue {
            consumer: CacheAligned::new(Consumer {
                tail: UnsafeCell::new(n2),
                tail_prev: AtomicPtr::new(n1),
                cache_bound: bound,
                cached_nodes: AtomicUsize::new(0),
                addition: consumer_addition,
            }),
            producer: CacheAligned::new(Producer {
                head: UnsafeCell::new(n2),
                first: UnsafeCell::new(n1),
                tail_copy: UnsafeCell::new(n1),
                addition: producer_addition,
            }),
        }
    }
121 /// Pushes a new value onto this queue. Note that to use this function
122 /// safely, it must be externally guaranteed that there is only one pusher.
123 pub fn push(&self, t: T) {
125 // Acquire a node (which either uses a cached one or allocates a new
126 // one), and then append this to the 'head' node.
127 let n = self.alloc();
128 assert!((*n).value.is_none());
129 (*n).value = Some(t);
130 (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
131 (**self.producer.head.get()).next.store(n, Ordering::Release);
132 *(&self.producer.head).get() = n;
136 unsafe fn alloc(&self) -> *mut Node<T> {
137 // First try to see if we can consume the 'first' node for our uses.
138 if *self.producer.first.get() != *self.producer.tail_copy.get() {
139 let ret = *self.producer.first.get();
140 *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
143 // If the above fails, then update our copy of the tail and try
145 *self.producer.0.tail_copy.get() =
146 self.consumer.tail_prev.load(Ordering::Acquire);
147 if *self.producer.first.get() != *self.producer.tail_copy.get() {
148 let ret = *self.producer.first.get();
149 *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
152 // If all of that fails, then we have to allocate a new node
153 // (there's nothing in the node cache).
157 /// Attempts to pop a value from this queue. Remember that to use this type
158 /// safely you must ensure that there is only one popper at a time.
159 pub fn pop(&self) -> Option<T> {
161 // The `tail` node is not actually a used node, but rather a
162 // sentinel from where we should start popping from. Hence, look at
163 // tail's next field and see if we can use it. If we do a pop, then
164 // the current tail node is a candidate for going into the cache.
165 let tail = *self.consumer.tail.get();
166 let next = (*tail).next.load(Ordering::Acquire);
167 if next.is_null() { return None }
168 assert!((*next).value.is_some());
169 let ret = (*next).value.take();
171 *self.consumer.0.tail.get() = next;
172 if self.consumer.cache_bound == 0 {
173 self.consumer.tail_prev.store(tail, Ordering::Release);
175 let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
176 if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
177 self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
178 (*tail).cached = true;
182 self.consumer.tail_prev.store(tail, Ordering::Release);
184 (*self.consumer.tail_prev.load(Ordering::Relaxed))
185 .next.store(next, Ordering::Relaxed);
186 // We have successfully erased all references to 'tail', so
187 // now we can safely drop it.
188 let _: Box<Node<T>> = Box::from_raw(tail);
195 /// Attempts to peek at the head of the queue, returning `None` if the queue
196 /// has no data currently
199 /// The reference returned is invalid if it is not used before the consumer
200 /// pops the value off the queue. If the producer then pushes another value
201 /// onto the queue, it will overwrite the value pointed to by the reference.
202 pub fn peek(&self) -> Option<&mut T> {
203 // This is essentially the same as above with all the popping bits
206 let tail = *self.consumer.tail.get();
207 let next = (*tail).next.load(Ordering::Acquire);
208 if next.is_null() { None } else { (*next).value.as_mut() }
212 pub fn producer_addition(&self) -> &ProducerAddition {
213 &self.producer.addition
216 pub fn consumer_addition(&self) -> &ConsumerAddition {
217 &self.consumer.addition
221 impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
224 let mut cur = *self.producer.first.get();
225 while !cur.is_null() {
226 let next = (*cur).next.load(Ordering::Relaxed);
227 let _n: Box<Node<T>> = Box::from_raw(cur);
234 #[cfg(all(test, not(target_os = "emscripten")))]
239 use sync::mpsc::channel;
244 let queue = Queue::with_additions(0, (), ());
247 assert_eq!(queue.pop(), Some(1));
248 assert_eq!(queue.pop(), Some(2));
249 assert_eq!(queue.pop(), None);
252 assert_eq!(queue.pop(), Some(3));
253 assert_eq!(queue.pop(), Some(4));
254 assert_eq!(queue.pop(), None);
261 let queue = Queue::with_additions(0, (), ());
264 // Ensure the borrowchecker works
267 assert_eq!(&*vec, &[1]);
269 None => unreachable!()
274 assert_eq!(&*vec, &[1]);
276 None => unreachable!()
284 let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
293 let q = Queue::with_additions(0, (), ());
296 assert_eq!(q.pop(), Some(1));
297 assert_eq!(q.pop(), Some(2));
298 assert_eq!(q.pop(), None);
301 assert_eq!(q.pop(), Some(3));
302 assert_eq!(q.pop(), Some(4));
303 assert_eq!(q.pop(), None);
314 unsafe fn stress_bound(bound: usize) {
315 let q = Arc::new(Queue::with_additions(bound, (), ()));
317 let (tx, rx) = channel();
319 let _t = thread::spawn(move|| {
329 tx.send(()).unwrap();