]> git.lizzy.rs Git - rust.git/blob - src/libsync/spsc_queue.rs
55d2f3062baea48a6ef393f624e5a6106ada997f
[rust.git] / src / libsync / spsc_queue.rs
1 /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2  * Redistribution and use in source and binary forms, with or without
3  * modification, are permitted provided that the following conditions are met:
4  *
5  *    1. Redistributions of source code must retain the above copyright notice,
6  *       this list of conditions and the following disclaimer.
7  *
8  *    2. Redistributions in binary form must reproduce the above copyright
9  *       notice, this list of conditions and the following disclaimer in the
10  *       documentation and/or other materials provided with the distribution.
11  *
12  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
15  * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
18  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
20  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22  *
23  * The views and conclusions contained in the software and documentation are
24  * those of the authors and should not be interpreted as representing official
25  * policies, either expressed or implied, of Dmitry Vyukov.
26  */
27
28 // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
29
30 //! A single-producer single-consumer concurrent queue
31 //!
32 //! This module contains the implementation of an SPSC queue which can be used
33 //! concurrently between two tasks. This data structure is safe to use and
34 //! enforces the semantics that there is one pusher and one popper.
35
36 #![experimental]
37
38 use core::prelude::*;
39
40 use alloc::owned::Box;
41 use core::mem;
42 use core::ty::Unsafe;
43
44 use atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
45
46 // Node within the linked list queue of messages to send
47 struct Node<T> {
48     // FIXME: this could be an uninitialized T if we're careful enough, and
49     //      that would reduce memory usage (and be a bit faster).
50     //      is it worth it?
51     value: Option<T>,           // nullable for re-use of nodes
52     next: AtomicPtr<Node<T>>,   // next node in the queue
53 }
54
55 /// The single-producer single-consumer queue. This structure is not cloneable,
56 /// but it can be safely shared in an Arc if it is guaranteed that there
57 /// is only one popper and one pusher touching the queue at any one point in
58 /// time.
59 pub struct Queue<T> {
60     // consumer fields
61     tail: Unsafe<*mut Node<T>>, // where to pop from
62     tail_prev: AtomicPtr<Node<T>>, // where to pop from
63
64     // producer fields
65     head: Unsafe<*mut Node<T>>,      // where to push to
66     first: Unsafe<*mut Node<T>>,     // where to get new nodes from
67     tail_copy: Unsafe<*mut Node<T>>, // between first/tail
68
69     // Cache maintenance fields. Additions and subtractions are stored
70     // separately in order to allow them to use nonatomic addition/subtraction.
71     cache_bound: uint,
72     cache_additions: AtomicUint,
73     cache_subtractions: AtomicUint,
74 }
75
76 impl<T: Send> Node<T> {
77     fn new() -> *mut Node<T> {
78         unsafe {
79             mem::transmute(box Node {
80                 value: None,
81                 next: AtomicPtr::new(0 as *mut Node<T>),
82             })
83         }
84     }
85 }
86
87 impl<T: Send> Queue<T> {
88     /// Creates a new queue. The producer returned is connected to the consumer
89     /// to push all data to the consumer.
90     ///
91     /// # Arguments
92     ///
93     ///   * `bound` - This queue implementation is implemented with a linked
94     ///               list, and this means that a push is always a malloc. In
95     ///               order to amortize this cost, an internal cache of nodes is
96     ///               maintained to prevent a malloc from always being
97     ///               necessary. This bound is the limit on the size of the
98     ///               cache (if desired). If the value is 0, then the cache has
99     ///               no bound. Otherwise, the cache will never grow larger than
100     ///               `bound` (although the queue itself could be much larger.
101     pub fn new(bound: uint) -> Queue<T> {
102         let n1 = Node::new();
103         let n2 = Node::new();
104         unsafe { (*n1).next.store(n2, Relaxed) }
105         Queue {
106             tail: Unsafe::new(n2),
107             tail_prev: AtomicPtr::new(n1),
108             head: Unsafe::new(n2),
109             first: Unsafe::new(n1),
110             tail_copy: Unsafe::new(n1),
111             cache_bound: bound,
112             cache_additions: AtomicUint::new(0),
113             cache_subtractions: AtomicUint::new(0),
114         }
115     }
116
117     /// Pushes a new value onto this queue. Note that to use this function
118     /// safely, it must be externally guaranteed that there is only one pusher.
119     pub fn push(&self, t: T) {
120         unsafe {
121             // Acquire a node (which either uses a cached one or allocates a new
122             // one), and then append this to the 'head' node.
123             let n = self.alloc();
124             assert!((*n).value.is_none());
125             (*n).value = Some(t);
126             (*n).next.store(0 as *mut Node<T>, Relaxed);
127             (**self.head.get()).next.store(n, Release);
128             *self.head.get() = n;
129         }
130     }
131
132     unsafe fn alloc(&self) -> *mut Node<T> {
133         // First try to see if we can consume the 'first' node for our uses.
134         // We try to avoid as many atomic instructions as possible here, so
135         // the addition to cache_subtractions is not atomic (plus we're the
136         // only one subtracting from the cache).
137         if *self.first.get() != *self.tail_copy.get() {
138             if self.cache_bound > 0 {
139                 let b = self.cache_subtractions.load(Relaxed);
140                 self.cache_subtractions.store(b + 1, Relaxed);
141             }
142             let ret = *self.first.get();
143             *self.first.get() = (*ret).next.load(Relaxed);
144             return ret;
145         }
146         // If the above fails, then update our copy of the tail and try
147         // again.
148         *self.tail_copy.get() = self.tail_prev.load(Acquire);
149         if *self.first.get() != *self.tail_copy.get() {
150             if self.cache_bound > 0 {
151                 let b = self.cache_subtractions.load(Relaxed);
152                 self.cache_subtractions.store(b + 1, Relaxed);
153             }
154             let ret = *self.first.get();
155             *self.first.get() = (*ret).next.load(Relaxed);
156             return ret;
157         }
158         // If all of that fails, then we have to allocate a new node
159         // (there's nothing in the node cache).
160         Node::new()
161     }
162
163     /// Attempts to pop a value from this queue. Remember that to use this type
164     /// safely you must ensure that there is only one popper at a time.
165     pub fn pop(&self) -> Option<T> {
166         unsafe {
167             // The `tail` node is not actually a used node, but rather a
168             // sentinel from where we should start popping from. Hence, look at
169             // tail's next field and see if we can use it. If we do a pop, then
170             // the current tail node is a candidate for going into the cache.
171             let tail = *self.tail.get();
172             let next = (*tail).next.load(Acquire);
173             if next.is_null() { return None }
174             assert!((*next).value.is_some());
175             let ret = (*next).value.take();
176
177             *self.tail.get() = next;
178             if self.cache_bound == 0 {
179                 self.tail_prev.store(tail, Release);
180             } else {
181                 // FIXME: this is dubious with overflow.
182                 let additions = self.cache_additions.load(Relaxed);
183                 let subtractions = self.cache_subtractions.load(Relaxed);
184                 let size = additions - subtractions;
185
186                 if size < self.cache_bound {
187                     self.tail_prev.store(tail, Release);
188                     self.cache_additions.store(additions + 1, Relaxed);
189                 } else {
190                     (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
191                     // We have successfully erased all references to 'tail', so
192                     // now we can safely drop it.
193                     let _: Box<Node<T>> = mem::transmute(tail);
194                 }
195             }
196             return ret;
197         }
198     }
199
200     /// Attempts to peek at the head of the queue, returning `None` if the queue
201     /// has no data currently
202     pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
203         // This is essentially the same as above with all the popping bits
204         // stripped out.
205         unsafe {
206             let tail = *self.tail.get();
207             let next = (*tail).next.load(Acquire);
208             if next.is_null() { return None }
209             return (*next).value.as_mut();
210         }
211     }
212 }
213
214 #[unsafe_destructor]
215 impl<T: Send> Drop for Queue<T> {
216     fn drop(&mut self) {
217         unsafe {
218             let mut cur = *self.first.get();
219             while !cur.is_null() {
220                 let next = (*cur).next.load(Relaxed);
221                 let _n: Box<Node<T>> = mem::transmute(cur);
222                 cur = next;
223             }
224         }
225     }
226 }
227
228 #[cfg(test)]
229 mod test {
230     use std::prelude::*;
231
232     use alloc::arc::Arc;
233     use native;
234
235     use super::Queue;
236
237     #[test]
238     fn smoke() {
239         let q = Queue::new(0);
240         q.push(1);
241         q.push(2);
242         assert_eq!(q.pop(), Some(1));
243         assert_eq!(q.pop(), Some(2));
244         assert_eq!(q.pop(), None);
245         q.push(3);
246         q.push(4);
247         assert_eq!(q.pop(), Some(3));
248         assert_eq!(q.pop(), Some(4));
249         assert_eq!(q.pop(), None);
250     }
251
252     #[test]
253     fn drop_full() {
254         let q = Queue::new(0);
255         q.push(box 1);
256         q.push(box 2);
257     }
258
259     #[test]
260     fn smoke_bound() {
261         let q = Queue::new(1);
262         q.push(1);
263         q.push(2);
264         assert_eq!(q.pop(), Some(1));
265         assert_eq!(q.pop(), Some(2));
266         assert_eq!(q.pop(), None);
267         q.push(3);
268         q.push(4);
269         assert_eq!(q.pop(), Some(3));
270         assert_eq!(q.pop(), Some(4));
271         assert_eq!(q.pop(), None);
272     }
273
274     #[test]
275     fn stress() {
276         stress_bound(0);
277         stress_bound(1);
278
279         fn stress_bound(bound: uint) {
280             let a = Arc::new(Queue::new(bound));
281             let b = a.clone();
282             let (tx, rx) = channel();
283             native::task::spawn(proc() {
284                 for _ in range(0, 100000) {
285                     loop {
286                         match b.pop() {
287                             Some(1) => break,
288                             Some(_) => fail!(),
289                             None => {}
290                         }
291                     }
292                 }
293                 tx.send(());
294             });
295             for _ in range(0, 100000) {
296                 a.push(1);
297             }
298             rx.recv();
299         }
300     }
301 }