]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/spsc_queue.rs
Auto merge of #28396 - arielb1:misplaced-binding, r=eddyb
[rust.git] / src / libstd / sync / mpsc / spsc_queue.rs
1 /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2  * Redistribution and use in source and binary forms, with or without
3  * modification, are permitted provided that the following conditions are met:
4  *
5  *    1. Redistributions of source code must retain the above copyright notice,
6  *       this list of conditions and the following disclaimer.
7  *
8  *    2. Redistributions in binary form must reproduce the above copyright
9  *       notice, this list of conditions and the following disclaimer in the
10  *       documentation and/or other materials provided with the distribution.
11  *
12  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
15  * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
18  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
20  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22  *
23  * The views and conclusions contained in the software and documentation are
24  * those of the authors and should not be interpreted as representing official
25  * policies, either expressed or implied, of Dmitry Vyukov.
26  */
27
28 // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
29
30 //! A single-producer single-consumer concurrent queue
31 //!
32 //! This module contains the implementation of an SPSC queue which can be used
33 //! concurrently between two threads. This data structure is safe to use and
34 //! enforces the semantics that there is one pusher and one popper.
35
36 use alloc::boxed::Box;
37 use core::ptr;
38 use core::cell::UnsafeCell;
39
40 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
41
42 // Node within the linked list queue of messages to send
43 struct Node<T> {
44     // FIXME: this could be an uninitialized T if we're careful enough, and
45     //      that would reduce memory usage (and be a bit faster).
46     //      is it worth it?
47     value: Option<T>,           // nullable for re-use of nodes
48     next: AtomicPtr<Node<T>>,   // next node in the queue
49 }
50
51 /// The single-producer single-consumer queue. This structure is not cloneable,
52 /// but it can be safely shared in an Arc if it is guaranteed that there
53 /// is only one popper and one pusher touching the queue at any one point in
54 /// time.
55 pub struct Queue<T> {
56     // consumer fields
57     tail: UnsafeCell<*mut Node<T>>, // where to pop from
58     tail_prev: AtomicPtr<Node<T>>, // where to pop from
59
60     // producer fields
61     head: UnsafeCell<*mut Node<T>>,      // where to push to
62     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
63     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
64
65     // Cache maintenance fields. Additions and subtractions are stored
66     // separately in order to allow them to use nonatomic addition/subtraction.
67     cache_bound: usize,
68     cache_additions: AtomicUsize,
69     cache_subtractions: AtomicUsize,
70 }
71
72 unsafe impl<T: Send> Send for Queue<T> { }
73
74 unsafe impl<T: Send> Sync for Queue<T> { }
75
76 impl<T> Node<T> {
77     fn new() -> *mut Node<T> {
78         Box::into_raw(box Node {
79             value: None,
80             next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
81         })
82     }
83 }
84
85 impl<T> Queue<T> {
86     /// Creates a new queue.
87     ///
88     /// This is unsafe as the type system doesn't enforce a single
89     /// consumer-producer relationship. It also allows the consumer to `pop`
90     /// items while there is a `peek` active due to all methods having a
91     /// non-mutable receiver.
92     ///
93     /// # Arguments
94     ///
95     ///   * `bound` - This queue implementation is implemented with a linked
96     ///               list, and this means that a push is always a malloc. In
97     ///               order to amortize this cost, an internal cache of nodes is
98     ///               maintained to prevent a malloc from always being
99     ///               necessary. This bound is the limit on the size of the
100     ///               cache (if desired). If the value is 0, then the cache has
101     ///               no bound. Otherwise, the cache will never grow larger than
102     ///               `bound` (although the queue itself could be much larger.
103     pub unsafe fn new(bound: usize) -> Queue<T> {
104         let n1 = Node::new();
105         let n2 = Node::new();
106         (*n1).next.store(n2, Ordering::Relaxed);
107         Queue {
108             tail: UnsafeCell::new(n2),
109             tail_prev: AtomicPtr::new(n1),
110             head: UnsafeCell::new(n2),
111             first: UnsafeCell::new(n1),
112             tail_copy: UnsafeCell::new(n1),
113             cache_bound: bound,
114             cache_additions: AtomicUsize::new(0),
115             cache_subtractions: AtomicUsize::new(0),
116         }
117     }
118
119     /// Pushes a new value onto this queue. Note that to use this function
120     /// safely, it must be externally guaranteed that there is only one pusher.
121     pub fn push(&self, t: T) {
122         unsafe {
123             // Acquire a node (which either uses a cached one or allocates a new
124             // one), and then append this to the 'head' node.
125             let n = self.alloc();
126             assert!((*n).value.is_none());
127             (*n).value = Some(t);
128             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
129             (**self.head.get()).next.store(n, Ordering::Release);
130             *self.head.get() = n;
131         }
132     }
133
134     unsafe fn alloc(&self) -> *mut Node<T> {
135         // First try to see if we can consume the 'first' node for our uses.
136         // We try to avoid as many atomic instructions as possible here, so
137         // the addition to cache_subtractions is not atomic (plus we're the
138         // only one subtracting from the cache).
139         if *self.first.get() != *self.tail_copy.get() {
140             if self.cache_bound > 0 {
141                 let b = self.cache_subtractions.load(Ordering::Relaxed);
142                 self.cache_subtractions.store(b + 1, Ordering::Relaxed);
143             }
144             let ret = *self.first.get();
145             *self.first.get() = (*ret).next.load(Ordering::Relaxed);
146             return ret;
147         }
148         // If the above fails, then update our copy of the tail and try
149         // again.
150         *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
151         if *self.first.get() != *self.tail_copy.get() {
152             if self.cache_bound > 0 {
153                 let b = self.cache_subtractions.load(Ordering::Relaxed);
154                 self.cache_subtractions.store(b + 1, Ordering::Relaxed);
155             }
156             let ret = *self.first.get();
157             *self.first.get() = (*ret).next.load(Ordering::Relaxed);
158             return ret;
159         }
160         // If all of that fails, then we have to allocate a new node
161         // (there's nothing in the node cache).
162         Node::new()
163     }
164
165     /// Attempts to pop a value from this queue. Remember that to use this type
166     /// safely you must ensure that there is only one popper at a time.
167     pub fn pop(&self) -> Option<T> {
168         unsafe {
169             // The `tail` node is not actually a used node, but rather a
170             // sentinel from where we should start popping from. Hence, look at
171             // tail's next field and see if we can use it. If we do a pop, then
172             // the current tail node is a candidate for going into the cache.
173             let tail = *self.tail.get();
174             let next = (*tail).next.load(Ordering::Acquire);
175             if next.is_null() { return None }
176             assert!((*next).value.is_some());
177             let ret = (*next).value.take();
178
179             *self.tail.get() = next;
180             if self.cache_bound == 0 {
181                 self.tail_prev.store(tail, Ordering::Release);
182             } else {
183                 // FIXME: this is dubious with overflow.
184                 let additions = self.cache_additions.load(Ordering::Relaxed);
185                 let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
186                 let size = additions - subtractions;
187
188                 if size < self.cache_bound {
189                     self.tail_prev.store(tail, Ordering::Release);
190                     self.cache_additions.store(additions + 1, Ordering::Relaxed);
191                 } else {
192                     (*self.tail_prev.load(Ordering::Relaxed))
193                           .next.store(next, Ordering::Relaxed);
194                     // We have successfully erased all references to 'tail', so
195                     // now we can safely drop it.
196                     let _: Box<Node<T>> = Box::from_raw(tail);
197                 }
198             }
199             ret
200         }
201     }
202
203     /// Attempts to peek at the head of the queue, returning `None` if the queue
204     /// has no data currently
205     ///
206     /// # Warning
207     /// The reference returned is invalid if it is not used before the consumer
208     /// pops the value off the queue. If the producer then pushes another value
209     /// onto the queue, it will overwrite the value pointed to by the reference.
210     pub fn peek(&self) -> Option<&mut T> {
211         // This is essentially the same as above with all the popping bits
212         // stripped out.
213         unsafe {
214             let tail = *self.tail.get();
215             let next = (*tail).next.load(Ordering::Acquire);
216             if next.is_null() { None } else { (*next).value.as_mut() }
217         }
218     }
219 }
220
221 impl<T> Drop for Queue<T> {
222     fn drop(&mut self) {
223         unsafe {
224             let mut cur = *self.first.get();
225             while !cur.is_null() {
226                 let next = (*cur).next.load(Ordering::Relaxed);
227                 let _n: Box<Node<T>> = Box::from_raw(cur);
228                 cur = next;
229             }
230         }
231     }
232 }
233
234 #[cfg(test)]
235 mod tests {
236     use prelude::v1::*;
237
238     use sync::Arc;
239     use super::Queue;
240     use thread;
241     use sync::mpsc::channel;
242
243     #[test]
244     fn smoke() {
245         unsafe {
246             let queue = Queue::new(0);
247             queue.push(1);
248             queue.push(2);
249             assert_eq!(queue.pop(), Some(1));
250             assert_eq!(queue.pop(), Some(2));
251             assert_eq!(queue.pop(), None);
252             queue.push(3);
253             queue.push(4);
254             assert_eq!(queue.pop(), Some(3));
255             assert_eq!(queue.pop(), Some(4));
256             assert_eq!(queue.pop(), None);
257         }
258     }
259
260     #[test]
261     fn peek() {
262         unsafe {
263             let queue = Queue::new(0);
264             queue.push(vec![1]);
265
266             // Ensure the borrowchecker works
267             match queue.peek() {
268                 Some(vec) => match &**vec {
269                     // Note that `pop` is not allowed here due to borrow
270                     [1] => {}
271                     _ => return
272                 },
273                 None => unreachable!()
274             }
275
276             queue.pop();
277         }
278     }
279
280     #[test]
281     fn drop_full() {
282         unsafe {
283             let q: Queue<Box<_>> = Queue::new(0);
284             q.push(box 1);
285             q.push(box 2);
286         }
287     }
288
289     #[test]
290     fn smoke_bound() {
291         unsafe {
292             let q = Queue::new(0);
293             q.push(1);
294             q.push(2);
295             assert_eq!(q.pop(), Some(1));
296             assert_eq!(q.pop(), Some(2));
297             assert_eq!(q.pop(), None);
298             q.push(3);
299             q.push(4);
300             assert_eq!(q.pop(), Some(3));
301             assert_eq!(q.pop(), Some(4));
302             assert_eq!(q.pop(), None);
303         }
304     }
305
306     #[test]
307     fn stress() {
308         unsafe {
309             stress_bound(0);
310             stress_bound(1);
311         }
312
313         unsafe fn stress_bound(bound: usize) {
314             let q = Arc::new(Queue::new(bound));
315
316             let (tx, rx) = channel();
317             let q2 = q.clone();
318             let _t = thread::spawn(move|| {
319                 for _ in 0..100000 {
320                     loop {
321                         match q2.pop() {
322                             Some(1) => break,
323                             Some(_) => panic!(),
324                             None => {}
325                         }
326                     }
327                 }
328                 tx.send(()).unwrap();
329             });
330             for _ in 0..100000 {
331                 q.push(1);
332             }
333             rx.recv().unwrap();
334         }
335     }
336 }