]> git.lizzy.rs Git - rust.git/blob - src/libstd/sync/mpmc_bounded_queue.rs
Convert most code to new inner attribute syntax.
[rust.git] / src / libstd / sync / mpmc_bounded_queue.rs
1 /* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
2  * Redistribution and use in source and binary forms, with or without
3  * modification, are permitted provided that the following conditions are met:
4  *
5  *    1. Redistributions of source code must retain the above copyright notice,
6  *       this list of conditions and the following disclaimer.
7  *
8  *    2. Redistributions in binary form must reproduce the above copyright
9  *       notice, this list of conditions and the following disclaimer in the
10  *       documentation and/or other materials provided with the distribution.
11  *
12  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
13  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
14  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
15  * SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
16  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
17  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
18  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
19  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
20  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
21  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
22  *
23  * The views and conclusions contained in the software and documentation are
24  * those of the authors and should not be interpreted as representing official
25  * policies, either expressed or implied, of Dmitry Vyukov.
26  */
27
28 #![allow(missing_doc, dead_code)]
29
30 // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
31
32 use clone::Clone;
33 use kinds::Send;
34 use num::next_power_of_two;
35 use option::{Option, Some, None};
36 use sync::arc::UnsafeArc;
37 use sync::atomics::{AtomicUint,Relaxed,Release,Acquire};
38 use slice;
39
40 struct Node<T> {
41     sequence: AtomicUint,
42     value: Option<T>,
43 }
44
45 struct State<T> {
46     pad0: [u8, ..64],
47     buffer: ~[Node<T>],
48     mask: uint,
49     pad1: [u8, ..64],
50     enqueue_pos: AtomicUint,
51     pad2: [u8, ..64],
52     dequeue_pos: AtomicUint,
53     pad3: [u8, ..64],
54 }
55
56 pub struct Queue<T> {
57     priv state: UnsafeArc<State<T>>,
58 }
59
60 impl<T: Send> State<T> {
61     fn with_capacity(capacity: uint) -> State<T> {
62         let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
63             if capacity < 2 {
64                 2u
65             } else {
66                 // use next power of 2 as capacity
67                 next_power_of_two(capacity)
68             }
69         } else {
70             capacity
71         };
72         let buffer = slice::from_fn(capacity, |i| {
73             Node { sequence:AtomicUint::new(i), value: None }
74         });
75         State{
76             pad0: [0, ..64],
77             buffer: buffer,
78             mask: capacity-1,
79             pad1: [0, ..64],
80             enqueue_pos: AtomicUint::new(0),
81             pad2: [0, ..64],
82             dequeue_pos: AtomicUint::new(0),
83             pad3: [0, ..64],
84         }
85     }
86
87     fn push(&mut self, value: T) -> bool {
88         let mask = self.mask;
89         let mut pos = self.enqueue_pos.load(Relaxed);
90         loop {
91             let node = &mut self.buffer[pos & mask];
92             let seq = node.sequence.load(Acquire);
93             let diff: int = seq as int - pos as int;
94
95             if diff == 0 {
96                 let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
97                 if enqueue_pos == pos {
98                     node.value = Some(value);
99                     node.sequence.store(pos+1, Release);
100                     break
101                 } else {
102                     pos = enqueue_pos;
103                 }
104             } else if diff < 0 {
105                 return false
106             } else {
107                 pos = self.enqueue_pos.load(Relaxed);
108             }
109         }
110         true
111     }
112
113     fn pop(&mut self) -> Option<T> {
114         let mask = self.mask;
115         let mut pos = self.dequeue_pos.load(Relaxed);
116         loop {
117             let node = &mut self.buffer[pos & mask];
118             let seq = node.sequence.load(Acquire);
119             let diff: int = seq as int - (pos + 1) as int;
120             if diff == 0 {
121                 let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
122                 if dequeue_pos == pos {
123                     let value = node.value.take();
124                     node.sequence.store(pos + mask + 1, Release);
125                     return value
126                 } else {
127                     pos = dequeue_pos;
128                 }
129             } else if diff < 0 {
130                 return None
131             } else {
132                 pos = self.dequeue_pos.load(Relaxed);
133             }
134         }
135     }
136 }
137
138 impl<T: Send> Queue<T> {
139     pub fn with_capacity(capacity: uint) -> Queue<T> {
140         Queue{
141             state: UnsafeArc::new(State::with_capacity(capacity))
142         }
143     }
144
145     pub fn push(&mut self, value: T) -> bool {
146         unsafe { (*self.state.get()).push(value) }
147     }
148
149     pub fn pop(&mut self) -> Option<T> {
150         unsafe { (*self.state.get()).pop() }
151     }
152 }
153
154 impl<T: Send> Clone for Queue<T> {
155     fn clone(&self) -> Queue<T> {
156         Queue {
157             state: self.state.clone()
158         }
159     }
160 }
161
162 #[cfg(test)]
163 mod tests {
164     use prelude::*;
165     use option::*;
166     use super::Queue;
167     use native;
168
169     #[test]
170     fn test() {
171         let nthreads = 8u;
172         let nmsgs = 1000u;
173         let mut q = Queue::with_capacity(nthreads*nmsgs);
174         assert_eq!(None, q.pop());
175         let (tx, rx) = channel();
176
177         for _ in range(0, nthreads) {
178             let q = q.clone();
179             let tx = tx.clone();
180             native::task::spawn(proc() {
181                 let mut q = q;
182                 for i in range(0, nmsgs) {
183                     assert!(q.push(i));
184                 }
185                 tx.send(());
186             });
187         }
188
189         let mut completion_rxs = ~[];
190         for _ in range(0, nthreads) {
191             let (tx, rx) = channel();
192             completion_rxs.push(rx);
193             let q = q.clone();
194             native::task::spawn(proc() {
195                 let mut q = q;
196                 let mut i = 0u;
197                 loop {
198                     match q.pop() {
199                         None => {},
200                         Some(_) => {
201                             i += 1;
202                             if i == nmsgs { break }
203                         }
204                     }
205                 }
206                 tx.send(i);
207             });
208         }
209
210         for rx in completion_rxs.mut_iter() {
211             assert_eq!(nmsgs, rx.recv());
212         }
213         for _ in range(0, nthreads) {
214             rx.recv();
215         }
216     }
217 }