]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpsc/spsc_queue.rs
doc: remove incomplete sentence
[rust.git] / src / libstd / sync / mpsc / spsc_queue.rs
1 /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2  * Redistribution and use in source and binary forms, with or without
3  * modification, are permitted provided that the following conditions are met:
4  *
5  *    1. Redistributions of source code must retain the above copyright notice,
6  *       this list of conditions and the following disclaimer.
7  *
8  *    2. Redistributions in binary form must reproduce the above copyright
9  *       notice, this list of conditions and the following disclaimer in the
10  *       documentation and/or other materials provided with the distribution.
11  *
12  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
15  * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
18  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
20  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22  *
23  * The views and conclusions contained in the software and documentation are
24  * those of the authors and should not be interpreted as representing official
25  * policies, either expressed or implied, of Dmitry Vyukov.
26  */
27
28 // http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
29
30 //! A single-producer single-consumer concurrent queue
31 //!
32 //! This module contains the implementation of an SPSC queue which can be used
33 //! concurrently between two tasks. This data structure is safe to use and
34 //! enforces the semantics that there is one pusher and one popper.
35
36 #![experimental]
37
38 use core::prelude::*;
39
40 use alloc::boxed::Box;
41 use core::mem;
42 use core::cell::UnsafeCell;
43
44 use sync::atomic::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
45
46 // Node within the linked list queue of messages to send
47 struct Node<T> {
48     // FIXME: this could be an uninitialized T if we're careful enough, and
49     //      that would reduce memory usage (and be a bit faster).
50     //      is it worth it?
51     value: Option<T>,           // nullable for re-use of nodes
52     next: AtomicPtr<Node<T>>,   // next node in the queue
53 }
54
55 /// The single-producer single-consumer queue. This structure is not cloneable,
56 /// but it can be safely shared in an Arc if it is guaranteed that there
57 /// is only one popper and one pusher touching the queue at any one point in
58 /// time.
59 pub struct Queue<T> {
60     // consumer fields
61     tail: UnsafeCell<*mut Node<T>>, // where to pop from
62     tail_prev: AtomicPtr<Node<T>>, // where to pop from
63
64     // producer fields
65     head: UnsafeCell<*mut Node<T>>,      // where to push to
66     first: UnsafeCell<*mut Node<T>>,     // where to get new nodes from
67     tail_copy: UnsafeCell<*mut Node<T>>, // between first/tail
68
69     // Cache maintenance fields. Additions and subtractions are stored
70     // separately in order to allow them to use nonatomic addition/subtraction.
71     cache_bound: uint,
72     cache_additions: AtomicUint,
73     cache_subtractions: AtomicUint,
74 }
75
76 unsafe impl<T: Send> Send for Queue<T> { }
77
78 unsafe impl<T: Send> Sync for Queue<T> { }
79
80 impl<T: Send> Node<T> {
81     fn new() -> *mut Node<T> {
82         unsafe {
83             mem::transmute(box Node {
84                 value: None,
85                 next: AtomicPtr::new(0 as *mut Node<T>),
86             })
87         }
88     }
89 }
90
91 impl<T: Send> Queue<T> {
92     /// Creates a new queue.
93     ///
94     /// This is unsafe as the type system doesn't enforce a single
95     /// consumer-producer relationship. It also allows the consumer to `pop`
96     /// items while there is a `peek` active due to all methods having a
97     /// non-mutable receiver.
98     ///
99     /// # Arguments
100     ///
101     ///   * `bound` - This queue implementation is implemented with a linked
102     ///               list, and this means that a push is always a malloc. In
103     ///               order to amortize this cost, an internal cache of nodes is
104     ///               maintained to prevent a malloc from always being
105     ///               necessary. This bound is the limit on the size of the
106     ///               cache (if desired). If the value is 0, then the cache has
107     ///               no bound. Otherwise, the cache will never grow larger than
108     ///               `bound` (although the queue itself could be much larger.
109     pub unsafe fn new(bound: uint) -> Queue<T> {
110         let n1 = Node::new();
111         let n2 = Node::new();
112         (*n1).next.store(n2, Relaxed);
113         Queue {
114             tail: UnsafeCell::new(n2),
115             tail_prev: AtomicPtr::new(n1),
116             head: UnsafeCell::new(n2),
117             first: UnsafeCell::new(n1),
118             tail_copy: UnsafeCell::new(n1),
119             cache_bound: bound,
120             cache_additions: AtomicUint::new(0),
121             cache_subtractions: AtomicUint::new(0),
122         }
123     }
124
125     /// Pushes a new value onto this queue. Note that to use this function
126     /// safely, it must be externally guaranteed that there is only one pusher.
127     pub fn push(&self, t: T) {
128         unsafe {
129             // Acquire a node (which either uses a cached one or allocates a new
130             // one), and then append this to the 'head' node.
131             let n = self.alloc();
132             assert!((*n).value.is_none());
133             (*n).value = Some(t);
134             (*n).next.store(0 as *mut Node<T>, Relaxed);
135             (**self.head.get()).next.store(n, Release);
136             *self.head.get() = n;
137         }
138     }
139
140     unsafe fn alloc(&self) -> *mut Node<T> {
141         // First try to see if we can consume the 'first' node for our uses.
142         // We try to avoid as many atomic instructions as possible here, so
143         // the addition to cache_subtractions is not atomic (plus we're the
144         // only one subtracting from the cache).
145         if *self.first.get() != *self.tail_copy.get() {
146             if self.cache_bound > 0 {
147                 let b = self.cache_subtractions.load(Relaxed);
148                 self.cache_subtractions.store(b + 1, Relaxed);
149             }
150             let ret = *self.first.get();
151             *self.first.get() = (*ret).next.load(Relaxed);
152             return ret;
153         }
154         // If the above fails, then update our copy of the tail and try
155         // again.
156         *self.tail_copy.get() = self.tail_prev.load(Acquire);
157         if *self.first.get() != *self.tail_copy.get() {
158             if self.cache_bound > 0 {
159                 let b = self.cache_subtractions.load(Relaxed);
160                 self.cache_subtractions.store(b + 1, Relaxed);
161             }
162             let ret = *self.first.get();
163             *self.first.get() = (*ret).next.load(Relaxed);
164             return ret;
165         }
166         // If all of that fails, then we have to allocate a new node
167         // (there's nothing in the node cache).
168         Node::new()
169     }
170
171     /// Attempts to pop a value from this queue. Remember that to use this type
172     /// safely you must ensure that there is only one popper at a time.
173     pub fn pop(&self) -> Option<T> {
174         unsafe {
175             // The `tail` node is not actually a used node, but rather a
176             // sentinel from where we should start popping from. Hence, look at
177             // tail's next field and see if we can use it. If we do a pop, then
178             // the current tail node is a candidate for going into the cache.
179             let tail = *self.tail.get();
180             let next = (*tail).next.load(Acquire);
181             if next.is_null() { return None }
182             assert!((*next).value.is_some());
183             let ret = (*next).value.take();
184
185             *self.tail.get() = next;
186             if self.cache_bound == 0 {
187                 self.tail_prev.store(tail, Release);
188             } else {
189                 // FIXME: this is dubious with overflow.
190                 let additions = self.cache_additions.load(Relaxed);
191                 let subtractions = self.cache_subtractions.load(Relaxed);
192                 let size = additions - subtractions;
193
194                 if size < self.cache_bound {
195                     self.tail_prev.store(tail, Release);
196                     self.cache_additions.store(additions + 1, Relaxed);
197                 } else {
198                     (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
199                     // We have successfully erased all references to 'tail', so
200                     // now we can safely drop it.
201                     let _: Box<Node<T>> = mem::transmute(tail);
202                 }
203             }
204             return ret;
205         }
206     }
207
208     /// Attempts to peek at the head of the queue, returning `None` if the queue
209     /// has no data currently
210     ///
211     /// # Warning
212     /// The reference returned is invalid if it is not used before the consumer
213     /// pops the value off the queue. If the producer then pushes another value
214     /// onto the queue, it will overwrite the value pointed to by the reference.
215     pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
216         // This is essentially the same as above with all the popping bits
217         // stripped out.
218         unsafe {
219             let tail = *self.tail.get();
220             let next = (*tail).next.load(Acquire);
221             if next.is_null() { return None }
222             return (*next).value.as_mut();
223         }
224     }
225 }
226
227 #[unsafe_destructor]
228 impl<T: Send> Drop for Queue<T> {
229     fn drop(&mut self) {
230         unsafe {
231             let mut cur = *self.first.get();
232             while !cur.is_null() {
233                 let next = (*cur).next.load(Relaxed);
234                 let _n: Box<Node<T>> = mem::transmute(cur);
235                 cur = next;
236             }
237         }
238     }
239 }
240
241 #[cfg(test)]
242 mod test {
243     use prelude::v1::*;
244
245     use sync::Arc;
246     use super::Queue;
247     use thread::Thread;
248     use sync::mpsc::channel;
249
250     #[test]
251     fn smoke() {
252         unsafe {
253             let queue = Queue::new(0);
254             queue.push(1i);
255             queue.push(2);
256             assert_eq!(queue.pop(), Some(1i));
257             assert_eq!(queue.pop(), Some(2));
258             assert_eq!(queue.pop(), None);
259             queue.push(3);
260             queue.push(4);
261             assert_eq!(queue.pop(), Some(3));
262             assert_eq!(queue.pop(), Some(4));
263             assert_eq!(queue.pop(), None);
264         }
265     }
266
267     #[test]
268     fn peek() {
269         unsafe {
270             let queue = Queue::new(0);
271             queue.push(vec![1i]);
272
273             // Ensure the borrowchecker works
274             match queue.peek() {
275                 Some(vec) => match vec.as_slice() {
276                     // Note that `pop` is not allowed here due to borrow
277                     [1] => {}
278                     _ => return
279                 },
280                 None => unreachable!()
281             }
282
283             queue.pop();
284         }
285     }
286
287     #[test]
288     fn drop_full() {
289         unsafe {
290             let q = Queue::new(0);
291             q.push(box 1i);
292             q.push(box 2i);
293         }
294     }
295
296     #[test]
297     fn smoke_bound() {
298         unsafe {
299             let q = Queue::new(0);
300             q.push(1i);
301             q.push(2);
302             assert_eq!(q.pop(), Some(1));
303             assert_eq!(q.pop(), Some(2));
304             assert_eq!(q.pop(), None);
305             q.push(3);
306             q.push(4);
307             assert_eq!(q.pop(), Some(3));
308             assert_eq!(q.pop(), Some(4));
309             assert_eq!(q.pop(), None);
310         }
311     }
312
313     #[test]
314     fn stress() {
315         unsafe {
316             stress_bound(0);
317             stress_bound(1);
318         }
319
320         unsafe fn stress_bound(bound: uint) {
321             let q = Arc::new(Queue::new(bound));
322
323             let (tx, rx) = channel();
324             let q2 = q.clone();
325             let _t = Thread::spawn(move|| {
326                 for _ in range(0u, 100000) {
327                     loop {
328                         match q2.pop() {
329                             Some(1i) => break,
330                             Some(_) => panic!(),
331                             None => {}
332                         }
333                     }
334                 }
335                 tx.send(()).unwrap();
336             });
337             for _ in range(0i, 100000) {
338                 q.push(1);
339             }
340             rx.recv().unwrap();
341         }
342     }
343 }