]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/spsc_queue.rs
Merge pull request #20510 from tshepang/patch-6
[rust.git] / src / libstd / sync / mpsc / spsc_queue.rs
1 /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2  * Redistribution and use in source and binary forms, with or without
3  * modification, are permitted provided that the following conditions are met:
4  *
5  *    1. Redistributions of source code must retain the above copyright notice,
6  *       this list of conditions and the following disclaimer.
7  *
8  *    2. Redistributions in binary form must reproduce the above copyright
9  *       notice, this list of conditions and the following disclaimer in the
10  *       documentation and/or other materials provided with the distribution.
11  *
12  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
15  * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
18  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
20  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22  *
23  * The views and conclusions contained in the software and documentation are
24  * those of the authors and should not be interpreted as representing official
25  * policies, either expressed or implied, of Dmitry Vyukov.
26  */
27
28 // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
29
30 //! A single-producer single-consumer concurrent queue
31 //!
32 //! This module contains the implementation of an SPSC queue which can be used
33 //! concurrently between two tasks. This data structure is safe to use and
34 //! enforces the semantics that there is one pusher and one popper.
35
36 #![experimental]
37
38 use core::prelude::*;
39
40 use alloc::boxed::Box;
41 use core::mem;
42 use core::cell::UnsafeCell;
43
44 use sync::atomic::{AtomicPtr, AtomicUint, Ordering};
45
46 // Node within the linked list queue of messages to send
47 struct Node<T> {
48     // FIXME: this could be an uninitialized T if we're careful enough, and
49     //      that would reduce memory usage (and be a bit faster).
50     //      is it worth it?
51     value: Option<T>,           // nullable for re-use of nodes
52     next: AtomicPtr<Node<T>>,   // next node in the queue
53 }
54
55 /// The single-producer single-consumer queue. This structure is not cloneable,
56 /// but it can be safely shared in an Arc if it is guaranteed that there
57 /// is only one popper and one pusher touching the queue at any one point in
58 /// time.
59 pub struct Queue<T> {
60     // consumer fields
61     tail: UnsafeCell<*mut Node<T>>, // where to pop from
62     tail_prev: AtomicPtr<Node<T>>, // where to pop from
63
64     // producer fields
65     head: UnsafeCell<*mut Node<T>>,      // where to push to
66     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
67     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
68
69     // Cache maintenance fields. Additions and subtractions are stored
70     // separately in order to allow them to use nonatomic addition/subtraction.
71     cache_bound: uint,
72     cache_additions: AtomicUint,
73     cache_subtractions: AtomicUint,
74 }
75
76 unsafe impl<T: Send> Send for Queue<T> { }
77
78 unsafe impl<T: Send> Sync for Queue<T> { }
79
80 impl<T: Send> Node<T> {
81     fn new() -> *mut Node<T> {
82         unsafe {
83             mem::transmute(box Node {
84                 value: None,
85                 next: AtomicPtr::new(0 as *mut Node<T>),
86             })
87         }
88     }
89 }
90
91 impl<T: Send> Queue<T> {
92     /// Creates a new queue.
93     ///
94     /// This is unsafe as the type system doesn't enforce a single
95     /// consumer-producer relationship. It also allows the consumer to `pop`
96     /// items while there is a `peek` active due to all methods having a
97     /// non-mutable receiver.
98     ///
99     /// # Arguments
100     ///
101     ///   * `bound` - This queue implementation is implemented with a linked
102     ///               list, and this means that a push is always a malloc. In
103     ///               order to amortize this cost, an internal cache of nodes is
104     ///               maintained to prevent a malloc from always being
105     ///               necessary. This bound is the limit on the size of the
106     ///               cache (if desired). If the value is 0, then the cache has
107     ///               no bound. Otherwise, the cache will never grow larger than
108     ///               `bound` (although the queue itself could be much larger.
109     pub unsafe fn new(bound: uint) -> Queue<T> {
110         let n1 = Node::new();
111         let n2 = Node::new();
112         (*n1).next.store(n2, Ordering::Relaxed);
113         Queue {
114             tail: UnsafeCell::new(n2),
115             tail_prev: AtomicPtr::new(n1),
116             head: UnsafeCell::new(n2),
117             first: UnsafeCell::new(n1),
118             tail_copy: UnsafeCell::new(n1),
119             cache_bound: bound,
120             cache_additions: AtomicUint::new(0),
121             cache_subtractions: AtomicUint::new(0),
122         }
123     }
124
125     /// Pushes a new value onto this queue. Note that to use this function
126     /// safely, it must be externally guaranteed that there is only one pusher.
127     pub fn push(&self, t: T) {
128         unsafe {
129             // Acquire a node (which either uses a cached one or allocates a new
130             // one), and then append this to the 'head' node.
131             let n = self.alloc();
132             assert!((*n).value.is_none());
133             (*n).value = Some(t);
134             (*n).next.store(0 as *mut Node<T>, Ordering::Relaxed);
135             (**self.head.get()).next.store(n, Ordering::Release);
136             *self.head.get() = n;
137         }
138     }
139
140     unsafe fn alloc(&self) -> *mut Node<T> {
141         // First try to see if we can consume the 'first' node for our uses.
142         // We try to avoid as many atomic instructions as possible here, so
143         // the addition to cache_subtractions is not atomic (plus we're the
144         // only one subtracting from the cache).
145         if *self.first.get() != *self.tail_copy.get() {
146             if self.cache_bound > 0 {
147                 let b = self.cache_subtractions.load(Ordering::Relaxed);
148                 self.cache_subtractions.store(b + 1, Ordering::Relaxed);
149             }
150             let ret = *self.first.get();
151             *self.first.get() = (*ret).next.load(Ordering::Relaxed);
152             return ret;
153         }
154         // If the above fails, then update our copy of the tail and try
155         // again.
156         *self.tail_copy.get() = self.tail_prev.load(Ordering::Acquire);
157         if *self.first.get() != *self.tail_copy.get() {
158             if self.cache_bound > 0 {
159                 let b = self.cache_subtractions.load(Ordering::Relaxed);
160                 self.cache_subtractions.store(b + 1, Ordering::Relaxed);
161             }
162             let ret = *self.first.get();
163             *self.first.get() = (*ret).next.load(Ordering::Relaxed);
164             return ret;
165         }
166         // If all of that fails, then we have to allocate a new node
167         // (there's nothing in the node cache).
168         Node::new()
169     }
170
171     /// Attempts to pop a value from this queue. Remember that to use this type
172     /// safely you must ensure that there is only one popper at a time.
173     pub fn pop(&self) -> Option<T> {
174         unsafe {
175             // The `tail` node is not actually a used node, but rather a
176             // sentinel from where we should start popping from. Hence, look at
177             // tail's next field and see if we can use it. If we do a pop, then
178             // the current tail node is a candidate for going into the cache.
179             let tail = *self.tail.get();
180             let next = (*tail).next.load(Ordering::Acquire);
181             if next.is_null() { return None }
182             assert!((*next).value.is_some());
183             let ret = (*next).value.take();
184
185             *self.tail.get() = next;
186             if self.cache_bound == 0 {
187                 self.tail_prev.store(tail, Ordering::Release);
188             } else {
189                 // FIXME: this is dubious with overflow.
190                 let additions = self.cache_additions.load(Ordering::Relaxed);
191                 let subtractions = self.cache_subtractions.load(Ordering::Relaxed);
192                 let size = additions - subtractions;
193
194                 if size < self.cache_bound {
195                     self.tail_prev.store(tail, Ordering::Release);
196                     self.cache_additions.store(additions + 1, Ordering::Relaxed);
197                 } else {
198                     (*self.tail_prev.load(Ordering::Relaxed))
199                           .next.store(next, Ordering::Relaxed);
200                     // We have successfully erased all references to 'tail', so
201                     // now we can safely drop it.
202                     let _: Box<Node<T>> = mem::transmute(tail);
203                 }
204             }
205             return ret;
206         }
207     }
208
209     /// Attempts to peek at the head of the queue, returning `None` if the queue
210     /// has no data currently
211     ///
212     /// # Warning
213     /// The reference returned is invalid if it is not used before the consumer
214     /// pops the value off the queue. If the producer then pushes another value
215     /// onto the queue, it will overwrite the value pointed to by the reference.
216     pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
217         // This is essentially the same as above with all the popping bits
218         // stripped out.
219         unsafe {
220             let tail = *self.tail.get();
221             let next = (*tail).next.load(Ordering::Acquire);
222             if next.is_null() { return None }
223             return (*next).value.as_mut();
224         }
225     }
226 }
227
228 #[unsafe_destructor]
229 impl<T: Send> Drop for Queue<T> {
230     fn drop(&mut self) {
231         unsafe {
232             let mut cur = *self.first.get();
233             while !cur.is_null() {
234                 let next = (*cur).next.load(Ordering::Relaxed);
235                 let _n: Box<Node<T>> = mem::transmute(cur);
236                 cur = next;
237             }
238         }
239     }
240 }
241
242 #[cfg(test)]
243 mod test {
244     use prelude::v1::*;
245
246     use sync::Arc;
247     use super::Queue;
248     use thread::Thread;
249     use sync::mpsc::channel;
250
251     #[test]
252     fn smoke() {
253         unsafe {
254             let queue = Queue::new(0);
255             queue.push(1i);
256             queue.push(2);
257             assert_eq!(queue.pop(), Some(1i));
258             assert_eq!(queue.pop(), Some(2));
259             assert_eq!(queue.pop(), None);
260             queue.push(3);
261             queue.push(4);
262             assert_eq!(queue.pop(), Some(3));
263             assert_eq!(queue.pop(), Some(4));
264             assert_eq!(queue.pop(), None);
265         }
266     }
267
268     #[test]
269     fn peek() {
270         unsafe {
271             let queue = Queue::new(0);
272             queue.push(vec![1i]);
273
274             // Ensure the borrowchecker works
275             match queue.peek() {
276                 Some(vec) => match vec.as_slice() {
277                     // Note that `pop` is not allowed here due to borrow
278                     [1] => {}
279                     _ => return
280                 },
281                 None => unreachable!()
282             }
283
284             queue.pop();
285         }
286     }
287
288     #[test]
289     fn drop_full() {
290         unsafe {
291             let q = Queue::new(0);
292             q.push(box 1i);
293             q.push(box 2i);
294         }
295     }
296
297     #[test]
298     fn smoke_bound() {
299         unsafe {
300             let q = Queue::new(0);
301             q.push(1i);
302             q.push(2);
303             assert_eq!(q.pop(), Some(1));
304             assert_eq!(q.pop(), Some(2));
305             assert_eq!(q.pop(), None);
306             q.push(3);
307             q.push(4);
308             assert_eq!(q.pop(), Some(3));
309             assert_eq!(q.pop(), Some(4));
310             assert_eq!(q.pop(), None);
311         }
312     }
313
314     #[test]
315     fn stress() {
316         unsafe {
317             stress_bound(0);
318             stress_bound(1);
319         }
320
321         unsafe fn stress_bound(bound: uint) {
322             let q = Arc::new(Queue::new(bound));
323
324             let (tx, rx) = channel();
325             let q2 = q.clone();
326             let _t = Thread::spawn(move|| {
327                 for _ in range(0u, 100000) {
328                     loop {
329                         match q2.pop() {
330                             Some(1i) => break,
331                             Some(_) => panic!(),
332                             None => {}
333                         }
334                     }
335                 }
336                 tx.send(()).unwrap();
337             });
338             for _ in range(0i, 100000) {
339                 q.push(1);
340             }
341             rx.recv().unwrap();
342         }
343     }
344 }