]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/spsc_queue.rs
core: Fix size_hint for signed integer Range<T> iterators
[rust.git] / src / libstd / sync / mpsc / spsc_queue.rs
1 /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2  * Redistribution and use in source and binary forms, with or without
3  * modification, are permitted provided that the following conditions are met:
4  *
5  *    1. Redistributions of source code must retain the above copyright notice,
6  *       this list of conditions and the following disclaimer.
7  *
8  *    2. Redistributions in binary form must reproduce the above copyright
9  *       notice, this list of conditions and the following disclaimer in the
10  *       documentation and/or other materials provided with the distribution.
11  *
12  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
15  * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
18  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
20  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22  *
23  * The views and conclusions contained in the software and documentation are
24  * those of the authors and should not be interpreted as representing official
25  * policies, either expressed or implied, of Dmitry Vyukov.
26  */
27
28 // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
29
30 //! A single-producer single-consumer concurrent queue
31 //!
32 //! This module contains the implementation of an SPSC queue which can be used
33 //! concurrently between two tasks. This data structure is safe to use and
34 //! enforces the semantics that there is one pusher and one popper.
35
36 #![unstable(feature = "std_misc")]
37
38 use core::prelude::*;
39
40 use alloc::boxed;
41 use alloc::boxed::Box;
42 use core::ptr;
43 use core::cell::UnsafeCell;
44
45 use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
46
47 // Node within the linked list queue of messages to send
48 struct Node<T> {
49     // FIXME: this could be an uninitialized T if we're careful enough, and
50     //      that would reduce memory usage (and be a bit faster).
51     //      is it worth it?
52     value: Option<T>,           // nullable for re-use of nodes
53     next: AtomicPtr<Node<T>>,   // next node in the queue
54 }
55
56 /// The single-producer single-consumer queue. This structure is not cloneable,
57 /// but it can be safely shared in an Arc if it is guaranteed that there
58 /// is only one popper and one pusher touching the queue at any one point in
59 /// time.
60 pub struct Queue<T> {
61     // consumer fields
62     tail: UnsafeCell<*mut Node<T>>, // where to pop from
63     tail_prev: AtomicPtr<Node<T>>, // where to pop from
64
65     // producer fields
66     head: UnsafeCell<*mut Node<T>>,      // where to push to
67     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
68     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
69
70     // Cache maintenance fields. Additions and subtractions are stored
71     // separately in order to allow them to use nonatomic addition/subtraction.
72     cache_bound: usize,
73     cache_additions: AtomicUsize,
74     cache_subtractions: AtomicUsize,
75 }
76
77 unsafe impl<T: Send> Send for Queue<T> { }
78
79 unsafe impl<T: Send> Sync for Queue<T> { }
80
81 impl<T> Node<T> {
82     fn new() -> *mut Node<T> {
83         unsafe {
84             boxed::into_raw(box Node {
85                 value: None,
86                 next: AtomicPtr::new(ptr::null_mut::<Node<T>>()),
87             })
88         }
89     }
90 }
91
92 impl<T> Queue<T> {
93     /// Creates a new queue.
94     ///
95     /// This is unsafe as the type system doesn't enforce a single
96     /// consumer-producer relationship. It also allows the consumer to `pop`
97     /// items while there is a `peek` active due to all methods having a
98     /// non-mutable receiver.
99     ///
100     /// # Arguments
101     ///
102     ///   * `bound` - This queue implementation is implemented with a linked
103     ///               list, and this means that a push is always a malloc. In
104     ///               order to amortize this cost, an internal cache of nodes is
105     ///               maintained to prevent a malloc from always being
106     ///               necessary. This bound is the limit on the size of the
107     ///               cache (if desired). If the value is 0, then the cache has
108     ///               no bound. Otherwise, the cache will never grow larger than
109     ///               `bound` (although the queue itself could be much larger.
110     pub unsafe fn new(bound: usize) -> Queue<T> {
111         let n1 = Node::new();
112         let n2 = Node::new();
113         (*n1).next.store(n2, Ordering::Relaxed);
114         Queue {
115             tail: UnsafeCell::new(n2),
116             tail_prev: AtomicPtr::new(n1),
117             head: UnsafeCell::new(n2),
118             first: UnsafeCell::new(n1),
119             tail_copy: UnsafeCell::new(n1),
120             cache_bound: bound,
121             cache_additions: AtomicUsize::new(0),
122             cache_subtractions: AtomicUsize::new(0),
123         }
124     }
125
126     /// Pushes a new value onto this queue. Note that to use this function
127     /// safely, it must be externally guaranteed that there is only one pusher.
128     pub fn push(&self, t: T) {
129         unsafe {
130             // Acquire a node (which either uses a cached one or allocates a new
131             // one), and then append this to the 'head' node.
132             let n = self.alloc();
133             assert!((*n).value.is_none());
134             (*n).value = Some(t);
135             (*n).next.store(ptr::null_mut(), Ordering::Relaxed);
136             (**self.head.get()).next.store(n, Ordering::Release);
137             *self.head.get() = n;
138         }
139     }
140
141     unsafe fn alloc(&self) -> *mut Node<T> {
142         // First try to see if we can consume the 'first' node for our uses.
143         // We try to avoid as many atomic instructions as possible here, so
144         // the addition to cache_subtractions is not atomic (plus we're the
145         // only one subtracting from the cache).
146         if *self.first.get() != *self.tail_copy.get() {
147             if self.cache_bound > 0 {
148                 let b = self.cache_subtractions.load(Ordering::Relaxed);
149                 self.cache_subtractions.store(b + 1, Ordering::Relaxed);
150             }
151             let ret = *self.first.get();
152             *self.first.get() = (*ret).next.load(Ordering::Relaxed);
153             return ret;
154         }
155         // If the above fails, then update our copy of the tail and try
156         // again.
157         *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
158         if *self.first.get() != *self.tail_copy.get() {
159             if self.cache_bound > 0 {
160                 let b = self.cache_subtractions.load(Ordering::Relaxed);
161                 self.cache_subtractions.store(b + 1, Ordering::Relaxed);
162             }
163             let ret = *self.first.get();
164             *self.first.get() = (*ret).next.load(Ordering::Relaxed);
165             return ret;
166         }
167         // If all of that fails, then we have to allocate a new node
168         // (there's nothing in the node cache).
169         Node::new()
170     }
171
172     /// Attempts to pop a value from this queue. Remember that to use this type
173     /// safely you must ensure that there is only one popper at a time.
174     pub fn pop(&self) -> Option<T> {
175         unsafe {
176             // The `tail` node is not actually a used node, but rather a
177             // sentinel from where we should start popping from. Hence, look at
178             // tail's next field and see if we can use it. If we do a pop, then
179             // the current tail node is a candidate for going into the cache.
180             let tail = *self.tail.get();
181             let next = (*tail).next.load(Ordering::Acquire);
182             if next.is_null() { return None }
183             assert!((*next).value.is_some());
184             let ret = (*next).value.take();
185
186             *self.tail.get() = next;
187             if self.cache_bound == 0 {
188                 self.tail_prev.store(tail, Ordering::Release);
189             } else {
190                 // FIXME: this is dubious with overflow.
191                 let additions = self.cache_additions.load(Ordering::Relaxed);
192                 let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
193                 let size = additions - subtractions;
194
195                 if size < self.cache_bound {
196                     self.tail_prev.store(tail, Ordering::Release);
197                     self.cache_additions.store(additions + 1, Ordering::Relaxed);
198                 } else {
199                     (*self.tail_prev.load(Ordering::Relaxed))
200                           .next.store(next, Ordering::Relaxed);
201                     // We have successfully erased all references to 'tail', so
202                     // now we can safely drop it.
203                     let _: Box<Node<T>> = Box::from_raw(tail);
204                 }
205             }
206             return ret;
207         }
208     }
209
210     /// Attempts to peek at the head of the queue, returning `None` if the queue
211     /// has no data currently
212     ///
213     /// # Warning
214     /// The reference returned is invalid if it is not used before the consumer
215     /// pops the value off the queue. If the producer then pushes another value
216     /// onto the queue, it will overwrite the value pointed to by the reference.
217     pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
218         // This is essentially the same as above with all the popping bits
219         // stripped out.
220         unsafe {
221             let tail = *self.tail.get();
222             let next = (*tail).next.load(Ordering::Acquire);
223             if next.is_null() { return None }
224             return (*next).value.as_mut();
225         }
226     }
227 }
228
229 #[unsafe_destructor]
230 impl<T> Drop for Queue<T> {
231     fn drop(&mut self) {
232         unsafe {
233             let mut cur = *self.first.get();
234             while !cur.is_null() {
235                 let next = (*cur).next.load(Ordering::Relaxed);
236                 let _n: Box<Node<T>> = Box::from_raw(cur);
237                 cur = next;
238             }
239         }
240     }
241 }
242
243 #[cfg(test)]
244 mod test {
245     use prelude::v1::*;
246
247     use sync::Arc;
248     use super::Queue;
249     use thread;
250     use sync::mpsc::channel;
251
252     #[test]
253     fn smoke() {
254         unsafe {
255             let queue = Queue::new(0);
256             queue.push(1);
257             queue.push(2);
258             assert_eq!(queue.pop(), Some(1));
259             assert_eq!(queue.pop(), Some(2));
260             assert_eq!(queue.pop(), None);
261             queue.push(3);
262             queue.push(4);
263             assert_eq!(queue.pop(), Some(3));
264             assert_eq!(queue.pop(), Some(4));
265             assert_eq!(queue.pop(), None);
266         }
267     }
268
269     #[test]
270     fn peek() {
271         unsafe {
272             let queue = Queue::new(0);
273             queue.push(vec![1]);
274
275             // Ensure the borrowchecker works
276             match queue.peek() {
277                 Some(vec) => match &**vec {
278                     // Note that `pop` is not allowed here due to borrow
279                     [1] => {}
280                     _ => return
281                 },
282                 None => unreachable!()
283             }
284
285             queue.pop();
286         }
287     }
288
289     #[test]
290     fn drop_full() {
291         unsafe {
292             let q: Queue<Box<_>> = Queue::new(0);
293             q.push(box 1);
294             q.push(box 2);
295         }
296     }
297
298     #[test]
299     fn smoke_bound() {
300         unsafe {
301             let q = Queue::new(0);
302             q.push(1);
303             q.push(2);
304             assert_eq!(q.pop(), Some(1));
305             assert_eq!(q.pop(), Some(2));
306             assert_eq!(q.pop(), None);
307             q.push(3);
308             q.push(4);
309             assert_eq!(q.pop(), Some(3));
310             assert_eq!(q.pop(), Some(4));
311             assert_eq!(q.pop(), None);
312         }
313     }
314
315     #[test]
316     fn stress() {
317         unsafe {
318             stress_bound(0);
319             stress_bound(1);
320         }
321
322         unsafe fn stress_bound(bound: usize) {
323             let q = Arc::new(Queue::new(bound));
324
325             let (tx, rx) = channel();
326             let q2 = q.clone();
327             let _t = thread::spawn(move|| {
328                 for _ in 0..100000 {
329                     loop {
330                         match q2.pop() {
331                             Some(1) => break,
332                             Some(_) => panic!(),
333                             None => {}
334                         }
335                     }
336                 }
337                 tx.send(()).unwrap();
338             });
339             for _ in 0..100000 {
340                 q.push(1);
341             }
342             rx.recv().unwrap();
343         }
344     }
345 }