]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpsc/spsc_queue.rs
61f91313ea96d79f645c4cfee272826ee5947de3
[rust.git] / library / std / src / sync / mpsc / spsc_queue.rs
1 //! A single-producer single-consumer concurrent queue
2 //!
3 //! This module contains the implementation of an SPSC queue which can be used
4 //! concurrently between two threads. This data structure is safe to use and
5 //! enforces the semantics that there is one pusher and one popper.
6
7 // The original implementation is based off:
8 // https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
9 //
10 // Note that back when the code was imported, it was licensed under the BSD-2-Clause license:
11 // http://web.archive.org/web/20110411011612/https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
12 //
13 // The original author of the code agreed to relicense it under `MIT OR Apache-2.0` in 2017, so as
14 // of today the license of this file is the same as the rest of the codebase:
15 // https://github.com/rust-lang/rust/pull/42149
16
17 #[cfg(all(test, not(target_os = "emscripten")))]
18 mod tests;
19
20 use core::cell::UnsafeCell;
21 use core::ptr;
22
23 use crate::boxed::Box;
24 use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
25
26 use super::cache_aligned::CacheAligned;
27
28 // Node within the linked list queue of messages to send
29 struct Node<T> {
30     // FIXME: this could be an uninitialized T if we're careful enough, and
31     //      that would reduce memory usage (and be a bit faster).
32     //      is it worth it?
33     value: Option<T>,         // nullable for re-use of nodes
34     cached: bool,             // This node goes into the node cache
35     next: AtomicPtr<Node<T>>, // next node in the queue
36 }
37
38 /// The single-producer single-consumer queue. This structure is not cloneable,
39 /// but it can be safely shared in an Arc if it is guaranteed that there
40 /// is only one popper and one pusher touching the queue at any one point in
41 /// time.
42 pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
43     // consumer fields
44     consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
45
46     // producer fields
47     producer: CacheAligned<Producer<T, ProducerAddition>>,
48 }
49
50 struct Consumer<T, Addition> {
51     tail: UnsafeCell<*mut Node<T>>, // where to pop from
52     tail_prev: AtomicPtr<Node<T>>,  // where to pop from
53     cache_bound: usize,             // maximum cache size
54     cached_nodes: AtomicUsize,      // number of nodes marked as cacheable
55     addition: Addition,
56 }
57
58 struct Producer<T, Addition> {
59     head: UnsafeCell<*mut Node<T>>,      // where to push to
60     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
61     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
62     addition: Addition,
63 }
64
65 unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}
66
67 unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}
68
69 impl<T> Node<T> {
70     fn new() -> *mut Node<T> {
71         Box::into_raw(box Node {
72             value: None,
73             cached: false,
74             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
75         })
76     }
77 }
78
79 impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
80     /// Creates a new queue. With given additional elements in the producer and
81     /// consumer portions of the queue.
82     ///
83     /// Due to the performance implications of cache-contention,
84     /// we wish to keep fields used mainly by the producer on a separate cache
85     /// line than those used by the consumer.
86     /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
87     /// allocate one for small fields, so we allow users to insert additional
88     /// fields into the cache lines already allocated by this for the producer
89     /// and consumer.
90     ///
91     /// This is unsafe as the type system doesn't enforce a single
92     /// consumer-producer relationship. It also allows the consumer to `pop`
93     /// items while there is a `peek` active due to all methods having a
94     /// non-mutable receiver.
95     ///
96     /// # Arguments
97     ///
98     ///   * `bound` - This queue implementation is implemented with a linked
99     ///               list, and this means that a push is always a malloc. In
100     ///               order to amortize this cost, an internal cache of nodes is
101     ///               maintained to prevent a malloc from always being
102     ///               necessary. This bound is the limit on the size of the
103     ///               cache (if desired). If the value is 0, then the cache has
104     ///               no bound. Otherwise, the cache will never grow larger than
105     ///               `bound` (although the queue itself could be much larger.
106     pub unsafe fn with_additions(
107         bound: usize,
108         producer_addition: ProducerAddition,
109         consumer_addition: ConsumerAddition,
110     ) -> Self {
111         let n1 = Node::new();
112         let n2 = Node::new();
113         (*n1).next.store(n2, Ordering::Relaxed);
114         Queue {
115             consumer: CacheAligned::new(Consumer {
116                 tail: UnsafeCell::new(n2),
117                 tail_prev: AtomicPtr::new(n1),
118                 cache_bound: bound,
119                 cached_nodes: AtomicUsize::new(0),
120                 addition: consumer_addition,
121             }),
122             producer: CacheAligned::new(Producer {
123                 head: UnsafeCell::new(n2),
124                 first: UnsafeCell::new(n1),
125                 tail_copy: UnsafeCell::new(n1),
126                 addition: producer_addition,
127             }),
128         }
129     }
130
131     /// Pushes a new value onto this queue. Note that to use this function
132     /// safely, it must be externally guaranteed that there is only one pusher.
133     pub fn push(&self, t: T) {
134         unsafe {
135             // Acquire a node (which either uses a cached one or allocates a new
136             // one), and then append this to the 'head' node.
137             let n = self.alloc();
138             assert!((*n).value.is_none());
139             (*n).value = Some(t);
140             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
141             (**self.producer.head.get()).next.store(n, Ordering::Release);
142             *(&self.producer.head).get() = n;
143         }
144     }
145
146     unsafe fn alloc(&self) -> *mut Node<T> {
147         // First try to see if we can consume the 'first' node for our uses.
148         if *self.producer.first.get() != *self.producer.tail_copy.get() {
149             let ret = *self.producer.first.get();
150             *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
151             return ret;
152         }
153         // If the above fails, then update our copy of the tail and try
154         // again.
155         *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire);
156         if *self.producer.first.get() != *self.producer.tail_copy.get() {
157             let ret = *self.producer.first.get();
158             *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
159             return ret;
160         }
161         // If all of that fails, then we have to allocate a new node
162         // (there's nothing in the node cache).
163         Node::new()
164     }
165
166     /// Attempts to pop a value from this queue. Remember that to use this type
167     /// safely you must ensure that there is only one popper at a time.
168     pub fn pop(&self) -> Option<T> {
169         unsafe {
170             // The `tail` node is not actually a used node, but rather a
171             // sentinel from where we should start popping from. Hence, look at
172             // tail's next field and see if we can use it. If we do a pop, then
173             // the current tail node is a candidate for going into the cache.
174             let tail = *self.consumer.tail.get();
175             let next = (*tail).next.load(Ordering::Acquire);
176             if next.is_null() {
177                 return None;
178             }
179             assert!((*next).value.is_some());
180             let ret = (*next).value.take();
181
182             *self.consumer.0.tail.get() = next;
183             if self.consumer.cache_bound == 0 {
184                 self.consumer.tail_prev.store(tail, Ordering::Release);
185             } else {
186                 let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
187                 if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
188                     self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
189                     (*tail).cached = true;
190                 }
191
192                 if (*tail).cached {
193                     self.consumer.tail_prev.store(tail, Ordering::Release);
194                 } else {
195                     (*self.consumer.tail_prev.load(Ordering::Relaxed))
196                         .next
197                         .store(next, Ordering::Relaxed);
198                     // We have successfully erased all references to 'tail', so
199                     // now we can safely drop it.
200                     let _: Box<Node<T>> = Box::from_raw(tail);
201                 }
202             }
203             ret
204         }
205     }
206
207     /// Attempts to peek at the head of the queue, returning `None` if the queue
208     /// has no data currently
209     ///
210     /// # Warning
211     /// The reference returned is invalid if it is not used before the consumer
212     /// pops the value off the queue. If the producer then pushes another value
213     /// onto the queue, it will overwrite the value pointed to by the reference.
214     pub fn peek(&self) -> Option<&mut T> {
215         // This is essentially the same as above with all the popping bits
216         // stripped out.
217         unsafe {
218             let tail = *self.consumer.tail.get();
219             let next = (*tail).next.load(Ordering::Acquire);
220             if next.is_null() { None } else { (*next).value.as_mut() }
221         }
222     }
223
224     pub fn producer_addition(&self) -> &ProducerAddition {
225         &self.producer.addition
226     }
227
228     pub fn consumer_addition(&self) -> &ConsumerAddition {
229         &self.consumer.addition
230     }
231 }
232
233 impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
234     fn drop(&mut self) {
235         unsafe {
236             let mut cur = *self.producer.first.get();
237             while !cur.is_null() {
238                 let next = (*cur).next.load(Ordering::Relaxed);
239                 let _n: Box<Node<T>> = Box::from_raw(cur);
240                 cur = next;
241             }
242         }
243     }
244 }