]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/spsc_queue.rs
Ensure `record_layout_for_printing()` is inlined.
[rust.git] / src / libstd / sync / mpsc / spsc_queue.rs
1 //! A single-producer single-consumer concurrent queue
2 //!
3 //! This module contains the implementation of an SPSC queue which can be used
4 //! concurrently between two threads. This data structure is safe to use and
5 //! enforces the semantics that there is one pusher and one popper.
6
7 // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
8
9 use boxed::Box;
10 use core::ptr;
11 use core::cell::UnsafeCell;
12
13 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
14
15 use super::cache_aligned::CacheAligned;
16
17 // Node within the linked list queue of messages to send
18 struct Node<T> {
19     // FIXME: this could be an uninitialized T if we're careful enough, and
20     //      that would reduce memory usage (and be a bit faster).
21     //      is it worth it?
22     value: Option<T>,           // nullable for re-use of nodes
23     cached: bool,               // This node goes into the node cache
24     next: AtomicPtr<Node<T>>,   // next node in the queue
25 }
26
27 /// The single-producer single-consumer queue. This structure is not cloneable,
28 /// but it can be safely shared in an Arc if it is guaranteed that there
29 /// is only one popper and one pusher touching the queue at any one point in
30 /// time.
31 pub struct Queue<T, ProducerAddition=(), ConsumerAddition=()> {
32     // consumer fields
33     consumer: CacheAligned<Consumer<T, ConsumerAddition>>,
34
35     // producer fields
36     producer: CacheAligned<Producer<T, ProducerAddition>>,
37 }
38
39 struct Consumer<T, Addition> {
40     tail: UnsafeCell<*mut Node<T>>, // where to pop from
41     tail_prev: AtomicPtr<Node<T>>, // where to pop from
42     cache_bound: usize, // maximum cache size
43     cached_nodes: AtomicUsize, // number of nodes marked as cachable
44     addition: Addition,
45 }
46
47 struct Producer<T, Addition> {
48     head: UnsafeCell<*mut Node<T>>,      // where to push to
49     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
50     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
51     addition: Addition,
52 }
53
54 unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Send for Queue<T, P, C> { }
55
56 unsafe impl<T: Send, P: Send + Sync, C: Send + Sync> Sync for Queue<T, P, C> { }
57
58 impl<T> Node<T> {
59     fn new() -> *mut Node<T> {
60         Box::into_raw(box Node {
61             value: None,
62             cached: false,
63             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
64         })
65     }
66 }
67
68 impl<T, ProducerAddition, ConsumerAddition> Queue<T, ProducerAddition, ConsumerAddition> {
69
70     /// Creates a new queue. With given additional elements in the producer and
71     /// consumer portions of the queue.
72     ///
73     /// Due to the performance implications of cache-contention,
74     /// we wish to keep fields used mainly by the producer on a separate cache
75     /// line than those used by the consumer.
76     /// Since cache lines are usually 64 bytes, it is unreasonably expensive to
77     /// allocate one for small fields, so we allow users to insert additional
78     /// fields into the cache lines already allocated by this for the producer
79     /// and consumer.
80     ///
81     /// This is unsafe as the type system doesn't enforce a single
82     /// consumer-producer relationship. It also allows the consumer to `pop`
83     /// items while there is a `peek` active due to all methods having a
84     /// non-mutable receiver.
85     ///
86     /// # Arguments
87     ///
88     ///   * `bound` - This queue implementation is implemented with a linked
89     ///               list, and this means that a push is always a malloc. In
90     ///               order to amortize this cost, an internal cache of nodes is
91     ///               maintained to prevent a malloc from always being
92     ///               necessary. This bound is the limit on the size of the
93     ///               cache (if desired). If the value is 0, then the cache has
94     ///               no bound. Otherwise, the cache will never grow larger than
95     ///               `bound` (although the queue itself could be much larger.
96     pub unsafe fn with_additions(
97         bound: usize,
98         producer_addition: ProducerAddition,
99         consumer_addition: ConsumerAddition,
100     ) -> Self {
101         let n1 = Node::new();
102         let n2 = Node::new();
103         (*n1).next.store(n2, Ordering::Relaxed);
104         Queue {
105             consumer: CacheAligned::new(Consumer {
106                 tail: UnsafeCell::new(n2),
107                 tail_prev: AtomicPtr::new(n1),
108                 cache_bound: bound,
109                 cached_nodes: AtomicUsize::new(0),
110                 addition: consumer_addition
111             }),
112             producer: CacheAligned::new(Producer {
113                 head: UnsafeCell::new(n2),
114                 first: UnsafeCell::new(n1),
115                 tail_copy: UnsafeCell::new(n1),
116                 addition: producer_addition
117             }),
118         }
119     }
120
121     /// Pushes a new value onto this queue. Note that to use this function
122     /// safely, it must be externally guaranteed that there is only one pusher.
123     pub fn push(&self, t: T) {
124         unsafe {
125             // Acquire a node (which either uses a cached one or allocates a new
126             // one), and then append this to the 'head' node.
127             let n = self.alloc();
128             assert!((*n).value.is_none());
129             (*n).value = Some(t);
130             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
131             (**self.producer.head.get()).next.store(n, Ordering::Release);
132             *(&self.producer.head).get() = n;
133         }
134     }
135
136     unsafe fn alloc(&self) -> *mut Node<T> {
137         // First try to see if we can consume the 'first' node for our uses.
138         if *self.producer.first.get() != *self.producer.tail_copy.get() {
139             let ret = *self.producer.first.get();
140             *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
141             return ret;
142         }
143         // If the above fails, then update our copy of the tail and try
144         // again.
145         *self.producer.0.tail_copy.get() =
146             self.consumer.tail_prev.load(Ordering::Acquire);
147         if *self.producer.first.get() != *self.producer.tail_copy.get() {
148             let ret = *self.producer.first.get();
149             *self.producer.0.first.get() = (*ret).next.load(Ordering::Relaxed);
150             return ret;
151         }
152         // If all of that fails, then we have to allocate a new node
153         // (there's nothing in the node cache).
154         Node::new()
155     }
156
157     /// Attempts to pop a value from this queue. Remember that to use this type
158     /// safely you must ensure that there is only one popper at a time.
159     pub fn pop(&self) -> Option<T> {
160         unsafe {
161             // The `tail` node is not actually a used node, but rather a
162             // sentinel from where we should start popping from. Hence, look at
163             // tail's next field and see if we can use it. If we do a pop, then
164             // the current tail node is a candidate for going into the cache.
165             let tail = *self.consumer.tail.get();
166             let next = (*tail).next.load(Ordering::Acquire);
167             if next.is_null() { return None }
168             assert!((*next).value.is_some());
169             let ret = (*next).value.take();
170
171             *self.consumer.0.tail.get() = next;
172             if self.consumer.cache_bound == 0 {
173                 self.consumer.tail_prev.store(tail, Ordering::Release);
174             } else {
175                 let cached_nodes = self.consumer.cached_nodes.load(Ordering::Relaxed);
176                 if cached_nodes < self.consumer.cache_bound && !(*tail).cached {
177                     self.consumer.cached_nodes.store(cached_nodes, Ordering::Relaxed);
178                     (*tail).cached = true;
179                 }
180
181                 if (*tail).cached {
182                     self.consumer.tail_prev.store(tail, Ordering::Release);
183                 } else {
184                     (*self.consumer.tail_prev.load(Ordering::Relaxed))
185                         .next.store(next, Ordering::Relaxed);
186                     // We have successfully erased all references to 'tail', so
187                     // now we can safely drop it.
188                     let _: Box<Node<T>> = Box::from_raw(tail);
189                 }
190             }
191             ret
192         }
193     }
194
195     /// Attempts to peek at the head of the queue, returning `None` if the queue
196     /// has no data currently
197     ///
198     /// # Warning
199     /// The reference returned is invalid if it is not used before the consumer
200     /// pops the value off the queue. If the producer then pushes another value
201     /// onto the queue, it will overwrite the value pointed to by the reference.
202     pub fn peek(&self) -> Option<&mut T> {
203         // This is essentially the same as above with all the popping bits
204         // stripped out.
205         unsafe {
206             let tail = *self.consumer.tail.get();
207             let next = (*tail).next.load(Ordering::Acquire);
208             if next.is_null() { None } else { (*next).value.as_mut() }
209         }
210     }
211
212     pub fn producer_addition(&self) -> &ProducerAddition {
213         &self.producer.addition
214     }
215
216     pub fn consumer_addition(&self) -> &ConsumerAddition {
217         &self.consumer.addition
218     }
219 }
220
221 impl<T, ProducerAddition, ConsumerAddition> Drop for Queue<T, ProducerAddition, ConsumerAddition> {
222     fn drop(&mut self) {
223         unsafe {
224             let mut cur = *self.producer.first.get();
225             while !cur.is_null() {
226                 let next = (*cur).next.load(Ordering::Relaxed);
227                 let _n: Box<Node<T>> = Box::from_raw(cur);
228                 cur = next;
229             }
230         }
231     }
232 }
233
234 #[cfg(all(test, not(target_os = "emscripten")))]
235 mod tests {
236     use sync::Arc;
237     use super::Queue;
238     use thread;
239     use sync::mpsc::channel;
240
241     #[test]
242     fn smoke() {
243         unsafe {
244             let queue = Queue::with_additions(0, (), ());
245             queue.push(1);
246             queue.push(2);
247             assert_eq!(queue.pop(), Some(1));
248             assert_eq!(queue.pop(), Some(2));
249             assert_eq!(queue.pop(), None);
250             queue.push(3);
251             queue.push(4);
252             assert_eq!(queue.pop(), Some(3));
253             assert_eq!(queue.pop(), Some(4));
254             assert_eq!(queue.pop(), None);
255         }
256     }
257
258     #[test]
259     fn peek() {
260         unsafe {
261             let queue = Queue::with_additions(0, (), ());
262             queue.push(vec![1]);
263
264             // Ensure the borrowchecker works
265             match queue.peek() {
266                 Some(vec) => {
267                     assert_eq!(&*vec, &[1]);
268                 },
269                 None => unreachable!()
270             }
271
272             match queue.pop() {
273                 Some(vec) => {
274                     assert_eq!(&*vec, &[1]);
275                 },
276                 None => unreachable!()
277             }
278         }
279     }
280
281     #[test]
282     fn drop_full() {
283         unsafe {
284             let q: Queue<Box<_>> = Queue::with_additions(0, (), ());
285             q.push(box 1);
286             q.push(box 2);
287         }
288     }
289
290     #[test]
291     fn smoke_bound() {
292         unsafe {
293             let q = Queue::with_additions(0, (), ());
294             q.push(1);
295             q.push(2);
296             assert_eq!(q.pop(), Some(1));
297             assert_eq!(q.pop(), Some(2));
298             assert_eq!(q.pop(), None);
299             q.push(3);
300             q.push(4);
301             assert_eq!(q.pop(), Some(3));
302             assert_eq!(q.pop(), Some(4));
303             assert_eq!(q.pop(), None);
304         }
305     }
306
307     #[test]
308     fn stress() {
309         unsafe {
310             stress_bound(0);
311             stress_bound(1);
312         }
313
314         unsafe fn stress_bound(bound: usize) {
315             let q = Arc::new(Queue::with_additions(bound, (), ()));
316
317             let (tx, rx) = channel();
318             let q2 = q.clone();
319             let _t = thread::spawn(move|| {
320                 for _ in 0..100000 {
321                     loop {
322                         match q2.pop() {
323                             Some(1) => break,
324                             Some(_) => panic!(),
325                             None => {}
326                         }
327                     }
328                 }
329                 tx.send(()).unwrap();
330             });
331             for _ in 0..100000 {
332                 q.push(1);
333             }
334             rx.recv().unwrap();
335         }
336     }
337 }