]> git.lizzy.rs Git - rust.git/blob - library/std/src/sync/mpsc/spsc_queue.rs
Merge commit '266e96785ab71834b917bf474f130a6d8fdecd4b' into sync_cg_clif-2022-10-23
[rust.git] / library / std / src / sync / mpsc / spsc_queue.rs
1 //! A single-producer single-consumer concurrent queue
2 //!
3 //! This module contains the implementation of an SPSC queue which can be used
4 //! concurrently between two threads. This data structure is safe to use and
5 //! enforces the semantics that there is one pusher and one popper.
6
7 // https://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
8
9 #[cfg(all(test, not(target_os = "emscripten")))]
10 mod tests;
11
12 use core::cell::UnsafeCell;
13 use core::ptr;
14
15 use crate::boxed::Box;
16 use crate::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
17
18 use super::cache_aligned::CacheAligned;
19
20 // Node within the linked list queue of messages to send
21 struct Node<T> {
22     // FIXME: this could be an uninitialized T if we're careful enough, and
23     //      that would reduce memory usage (and be a bit faster).
24     //      is it worth it?
25     value: Option<T>,         // nullable for re-use of nodes
26     cached: bool,             // This node goes into the node cache
27     next: AtomicPtr<Node<T>>, // next node in the queue
28 }
29
30 /// The single-producer single-consumer queue. This structure is not cloneable,
31 /// but it can be safely shared in an Arc if it is guaranteed that there
32 /// is only one popper and one pusher touching the queue at any one point in
33 /// time.
34 pub struct Queue<T, ProducerAddition = (), ConsumerAddition = ()> {
35     // consumer fields
36     consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
37
38     // producer fields
39     producer: CacheAligned<Producer<T, ProducerAddition>>,
40 }
41
42 struct Consumer<T, Addition> {
43     tail: UnsafeCell<*mut Node<T>>, // where to pop from
44     tail_prev: AtomicPtr<Node<T>>,  // where to pop from
45     cache_bound: usize,             // maximum cache size
46     cached_nodes: AtomicUsize,      // number of nodes marked as cacheable
47     addition: Addition,
48 }
49
50 struct Producer<T, Addition> {
51     head: UnsafeCell<*mut Node<T>>,      // where to push to
52     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
53     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
54     addition: Addition,
55 }
56
57 unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> {}
58
59 unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> {}
60
61 impl<T> Node<T> {
62     fn new() -> *mut Node<T> {
63         Box::into_raw(box Node {
64             value: None,
65             cached: false,
66             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
67         })
68     }
69 }
70
71 impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
72     /// Creates a new queue. With given additional elements in the producer and
73     /// consumer portions of the queue.
74     ///
75     /// Due to the performance implications of cache-contention,
76     /// we wish to keep fields used mainly by the producer on a separate cache
77     /// line than those used by the consumer.
78     /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
79     /// allocate one for small fields, so we allow users to insert additional
80     /// fields into the cache lines already allocated by this for the producer
81     /// and consumer.
82     ///
83     /// This is unsafe as the type system doesn't enforce a single
84     /// consumer-producer relationship. It also allows the consumer to `pop`
85     /// items while there is a `peek` active due to all methods having a
86     /// non-mutable receiver.
87     ///
88     /// # Arguments
89     ///
90     ///   * `bound` - This queue implementation is implemented with a linked
91     ///               list, and this means that a push is always a malloc. In
92     ///               order to amortize this cost, an internal cache of nodes is
93     ///               maintained to prevent a malloc from always being
94     ///               necessary. This bound is the limit on the size of the
95     ///               cache (if desired). If the value is 0, then the cache has
96     ///               no bound. Otherwise, the cache will never grow larger than
97     ///               `bound` (although the queue itself could be much larger.
98     pub unsafe fn with_additions(
99         bound: usize,
100         producer_addition: ProducerAddition,
101         consumer_addition: ConsumerAddition,
102     ) -> Self {
103         let n1 = Node::new();
104         let n2 = Node::new();
105         (*n1).next.store(n2, Ordering::Relaxed);
106         Queue {
107             consumer: CacheAligned::new(Consumer {
108                 tail: UnsafeCell::new(n2),
109                 tail_prev: AtomicPtr::new(n1),
110                 cache_bound: bound,
111                 cached_nodes: AtomicUsize::new(0),
112                 addition: consumer_addition,
113             }),
114             producer: CacheAligned::new(Producer {
115                 head: UnsafeCell::new(n2),
116                 first: UnsafeCell::new(n1),
117                 tail_copy: UnsafeCell::new(n1),
118                 addition: producer_addition,
119             }),
120         }
121     }
122
123     /// Pushes a new value onto this queue. Note that to use this function
124     /// safely, it must be externally guaranteed that there is only one pusher.
125     pub fn push(&self, t: T) {
126         unsafe {
127             // Acquire a node (which either uses a cached one or allocates a new
128             // one), and then append this to the 'head' node.
129             let n = self.alloc();
130             assert!((*n).value.is_none());
131             (*n).value = Some(t);
132             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
133             (**self.producer.head.get()).next.store(n, Ordering::Release);
134             *(&self.producer.head).get() = n;
135         }
136     }
137
138     unsafe fn alloc(&self) -> *mut Node<T> {
139         // First try to see if we can consume the 'first' node for our uses.
140         if *self.producer.first.get() != *self.producer.tail_copy.get() {
141             let ret = *self.producer.first.get();
142             *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
143             return ret;
144         }
145         // If the above fails, then update our copy of the tail and try
146         // again.
147         *self.producer.0.tail_copy.get() = self.consumer.tail_prev.load(Ordering::Acquire);
148         if *self.producer.first.get() != *self.producer.tail_copy.get() {
149             let ret = *self.producer.first.get();
150             *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
151             return ret;
152         }
153         // If all of that fails, then we have to allocate a new node
154         // (there's nothing in the node cache).
155         Node::new()
156     }
157
158     /// Attempts to pop a value from this queue. Remember that to use this type
159     /// safely you must ensure that there is only one popper at a time.
160     pub fn pop(&self) -> Option<T> {
161         unsafe {
162             // The `tail` node is not actually a used node, but rather a
163             // sentinel from where we should start popping from. Hence, look at
164             // tail's next field and see if we can use it. If we do a pop, then
165             // the current tail node is a candidate for going into the cache.
166             let tail = *self.consumer.tail.get();
167             let next = (*tail).next.load(Ordering::Acquire);
168             if next.is_null() {
169                 return None;
170             }
171             assert!((*next).value.is_some());
172             let ret = (*next).value.take();
173
174             *self.consumer.0.tail.get() = next;
175             if self.consumer.cache_bound == 0 {
176                 self.consumer.tail_prev.store(tail, Ordering::Release);
177             } else {
178                 let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
179                 if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
180                     self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
181                     (*tail).cached = true;
182                 }
183
184                 if (*tail).cached {
185                     self.consumer.tail_prev.store(tail, Ordering::Release);
186                 } else {
187                     (*self.consumer.tail_prev.load(Ordering::Relaxed))
188                         .next
189                         .store(next, Ordering::Relaxed);
190                     // We have successfully erased all references to 'tail', so
191                     // now we can safely drop it.
192                     let _: Box<Node<T>> = Box::from_raw(tail);
193                 }
194             }
195             ret
196         }
197     }
198
199     /// Attempts to peek at the head of the queue, returning `None` if the queue
200     /// has no data currently
201     ///
202     /// # Warning
203     /// The reference returned is invalid if it is not used before the consumer
204     /// pops the value off the queue. If the producer then pushes another value
205     /// onto the queue, it will overwrite the value pointed to by the reference.
206     pub fn peek(&self) -> Option<&mut T> {
207         // This is essentially the same as above with all the popping bits
208         // stripped out.
209         unsafe {
210             let tail = *self.consumer.tail.get();
211             let next = (*tail).next.load(Ordering::Acquire);
212             if next.is_null() { None } else { (*next).value.as_mut() }
213         }
214     }
215
216     pub fn producer_addition(&self) -> &ProducerAddition {
217         &self.producer.addition
218     }
219
220     pub fn consumer_addition(&self) -> &ConsumerAddition {
221         &self.consumer.addition
222     }
223 }
224
225 impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
226     fn drop(&mut self) {
227         unsafe {
228             let mut cur = *self.producer.first.get();
229             while !cur.is_null() {
230                 let next = (*cur).next.load(Ordering::Relaxed);
231                 let _n: Box<Node<T>> = Box::from_raw(cur);
232                 cur = next;
233             }
234         }
235     }
236 }