//! A single-producer single-consumer concurrent queue
//!
//! This module contains the implementation of an SPSC queue which can be used
//! concurrently between two threads. This data structure is safe to use and
//! enforces the semantics that there is one pusher and one popper.
// The original implementation is based off:
// https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
//
// Note that back when the code was imported, it was licensed under the BSD-2-Clause license:
// http://web.archive.org/web/20110411011612/https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
//
// The original author of the code agreed to relicense it under `MIT OR Apache-2.0` in 2017, so as
// of today the license of this file is the same as the rest of the codebase:
// https://github.com/rust-lang/rust/pull/42149
#[cfg(all(test, not(target_os = "emscripten")))]
mod tests;

use core::cell::UnsafeCell;
use core::ptr;

use crate::boxed::Box;
use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};

use super::cache_aligned::CacheAligned;
// Node within the linked list queue of messages to send
//
// FIXME: this could be an uninitialized T if we're careful enough, and
// that would reduce memory usage (and be a bit faster).
struct Node<T> {
    value: Option<T>,         // nullable for re-use of nodes
    cached: bool,             // This node goes into the node cache
    next: AtomicPtr<Node<T>>, // next node in the queue
}
38 /// The single-producer single-consumer queue. This structure is not cloneable,
39 /// but it can be safely shared in an Arc if it is guaranteed that there
40 /// is only one popper and one pusher touching the queue at any one point in
42 pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
44 consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
47 producer: CacheAligned<Producer<T, ProducerAddition>>,
50 struct Consumer<T, Addition> {
51 tail: UnsafeCell<*mut Node<T>>, // where to pop from
52 tail_prev: AtomicPtr<Node<T>>, // where to pop from
53 cache_bound: usize, // maximum cache size
54 cached_nodes: AtomicUsize, // number of nodes marked as cacheable
58 struct Producer<T, Addition> {
59 head: UnsafeCell<*mut Node<T>>, // where to push to
60 first: UnsafeCell<*mut Node<T>>, // where to get new nodes from
61 tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
65 unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}
67 unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}
70 fn new() -> *mut Node<T> {
71 Box::into_raw(box Node {
74 next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
79 impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
80 /// Creates a new queue. With given additional elements in the producer and
81 /// consumer portions of the queue.
83 /// Due to the performance implications of cache-contention,
84 /// we wish to keep fields used mainly by the producer on a separate cache
85 /// line than those used by the consumer.
86 /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
87 /// allocate one for small fields, so we allow users to insert additional
88 /// fields into the cache lines already allocated by this for the producer
91 /// This is unsafe as the type system doesn't enforce a single
92 /// consumer-producer relationship. It also allows the consumer to `pop`
93 /// items while there is a `peek` active due to all methods having a
94 /// non-mutable receiver.
98 /// * `bound` - This queue implementation is implemented with a linked
99 /// list, and this means that a push is always a malloc. In
100 /// order to amortize this cost, an internal cache of nodes is
101 /// maintained to prevent a malloc from always being
102 /// necessary. This bound is the limit on the size of the
103 /// cache (if desired). If the value is 0, then the cache has
104 /// no bound. Otherwise, the cache will never grow larger than
105 /// `bound` (although the queue itself could be much larger.
106 pub unsafe fn with_additions(
108 producer_addition: ProducerAddition,
109 consumer_addition: ConsumerAddition,
111 let n1 = Node::new();
112 let n2 = Node::new();
113 (*n1).next.store(n2, Ordering::Relaxed);
115 consumer: CacheAligned::new(Consumer {
116 tail: UnsafeCell::new(n2),
117 tail_prev: AtomicPtr::new(n1),
119 cached_nodes: AtomicUsize::new(0),
120 addition: consumer_addition,
122 producer: CacheAligned::new(Producer {
123 head: UnsafeCell::new(n2),
124 first: UnsafeCell::new(n1),
125 tail_copy: UnsafeCell::new(n1),
126 addition: producer_addition,
131 /// Pushes a new value onto this queue. Note that to use this function
132 /// safely, it must be externally guaranteed that there is only one pusher.
133 pub fn push(&self, t: T) {
135 // Acquire a node (which either uses a cached one or allocates a new
136 // one), and then append this to the 'head' node.
137 let n = self.alloc();
138 assert!((*n).value.is_none());
139 (*n).value = Some(t);
140 (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
141 (**self.producer.head.get()).next.store(n, Ordering::Release);
142 *(&self.producer.head).get() = n;
146 unsafe fn alloc(&self) -> *mut Node<T> {
147 // First try to see if we can consume the 'first' node for our uses.
148 if *self.producer.first.get() != *self.producer.tail_copy.get() {
149 let ret = *self.producer.first.get();
150 *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
153 // If the above fails, then update our copy of the tail and try
155 *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire);
156 if *self.producer.first.get() != *self.producer.tail_copy.get() {
157 let ret = *self.producer.first.get();
158 *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
161 // If all of that fails, then we have to allocate a new node
162 // (there's nothing in the node cache).
166 /// Attempts to pop a value from this queue. Remember that to use this type
167 /// safely you must ensure that there is only one popper at a time.
168 pub fn pop(&self) -> Option<T> {
170 // The `tail` node is not actually a used node, but rather a
171 // sentinel from where we should start popping from. Hence, look at
172 // tail's next field and see if we can use it. If we do a pop, then
173 // the current tail node is a candidate for going into the cache.
174 let tail = *self.consumer.tail.get();
175 let next = (*tail).next.load(Ordering::Acquire);
179 assert!((*next).value.is_some());
180 let ret = (*next).value.take();
182 *self.consumer.0.tail.get() = next;
183 if self.consumer.cache_bound == 0 {
184 self.consumer.tail_prev.store(tail, Ordering::Release);
186 let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
187 if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
188 self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
189 (*tail).cached = true;
193 self.consumer.tail_prev.store(tail, Ordering::Release);
195 (*self.consumer.tail_prev.load(Ordering::Relaxed))
197 .store(next, Ordering::Relaxed);
198 // We have successfully erased all references to 'tail', so
199 // now we can safely drop it.
200 let _: Box<Node<T>> = Box::from_raw(tail);
207 /// Attempts to peek at the head of the queue, returning `None` if the queue
208 /// has no data currently
211 /// The reference returned is invalid if it is not used before the consumer
212 /// pops the value off the queue. If the producer then pushes another value
213 /// onto the queue, it will overwrite the value pointed to by the reference.
214 pub fn peek(&self) -> Option<&mut T> {
215 // This is essentially the same as above with all the popping bits
218 let tail = *self.consumer.tail.get();
219 let next = (*tail).next.load(Ordering::Acquire);
220 if next.is_null() { None } else { (*next).value.as_mut() }
224 pub fn producer_addition(&self) -> &ProducerAddition {
225 &self.producer.addition
228 pub fn consumer_addition(&self) -> &ConsumerAddition {
229 &self.consumer.addition
233 impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
236 let mut cur = *self.producer.first.get();
237 while !cur.is_null() {
238 let next = (*cur).next.load(Ordering::Relaxed);
239 let _n: Box<Node<T>> = Box::from_raw(cur);