use alloc::boxed::Box;
use core::mem;
use core::cell::UnsafeCell;
+use alloc::arc::Arc;
use atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
cache_subtractions: AtomicUint,
}
+/// A safe abstraction for the consumer in a single-producer single-consumer
+/// queue.
+pub struct Consumer<T> {
+ inner: Arc<Queue<T>>
+}
+
+impl<T: Send> Consumer<T> {
+ /// Attempts to pop the value from the head of the queue, returning `None`
+ /// if the queue is empty.
+ pub fn pop(&mut self) -> Option<T> {
+ self.inner.pop()
+ }
+
+ /// Attempts to peek at the head of the queue, returning `None` if the queue
+ /// is empty.
+ pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
+ self.inner.peek()
+ }
+}
+
+/// A safe abstraction for the producer in a single-producer single-consumer
+/// queue.
+pub struct Producer<T> {
+ inner: Arc<Queue<T>>
+}
+
+impl<T: Send> Producer<T> {
+ /// Pushes a new value onto the queue.
+ pub fn push(&mut self, t: T) {
+ self.inner.push(t)
+ }
+}
+
impl<T: Send> Node<T> {
fn new() -> *mut Node<T> {
unsafe {
}
}
+/// Creates a new queue with a consumer-producer pair.
+///
+/// The producer returned is connected to the consumer to push all data to
+/// the consumer.
+///
+/// # Arguments
+///
+/// * `bound` - This queue implementation is implemented with a linked
+/// list, and this means that a push is always a malloc. In
+/// order to amortize this cost, an internal cache of nodes is
+/// maintained to prevent a malloc from always being
+/// necessary. This bound is the limit on the size of the
+/// cache (if desired). If the value is 0, then the cache has
+/// no bound. Otherwise, the cache will never grow larger than
+/// `bound` (although the queue itself could be much larger.
+pub fn queue<T: Send>(bound: uint) -> (Consumer<T>, Producer<T>) {
+ let q = unsafe { Queue::new(bound) };
+ let arc = Arc::new(q);
+ let consumer = Consumer { inner: arc.clone() };
+ let producer = Producer { inner: arc };
+
+ (consumer, producer)
+}
+
impl<T: Send> Queue<T> {
- /// Creates a new queue. The producer returned is connected to the consumer
- /// to push all data to the consumer.
+ /// Creates a new queue.
+ ///
+ /// This is unsafe as the type system doesn't enforce a single
+ /// consumer-producer relationship. It also allows the consumer to `pop`
+ /// items while there is a `peek` active due to all methods having a
+ /// non-mutable receiver.
///
/// # Arguments
///
/// cache (if desired). If the value is 0, then the cache has
/// no bound. Otherwise, the cache will never grow larger than
/// `bound` (although the queue itself could be much larger.
- pub fn new(bound: uint) -> Queue<T> {
+ pub unsafe fn new(bound: uint) -> Queue<T> {
let n1 = Node::new();
let n2 = Node::new();
- unsafe { (*n1).next.store(n2, Relaxed) }
+ (*n1).next.store(n2, Relaxed);
Queue {
tail: UnsafeCell::new(n2),
tail_prev: AtomicPtr::new(n1),
/// Attempts to peek at the head of the queue, returning `None` if the queue
/// has no data currently
+ ///
+ /// # Warning
+ /// The reference returned is invalid if it is not used before the consumer
+ /// pops the value off the queue. If the producer then pushes another value
+ /// onto the queue, it will overwrite the value pointed to by the reference.
pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
// This is essentially the same as above with all the popping bits
// stripped out.
mod test {
use std::prelude::*;
- use alloc::arc::Arc;
use native;
- use super::Queue;
+ use super::{queue, Queue};
#[test]
fn smoke() {
- let q = Queue::new(0);
- q.push(1i);
- q.push(2);
- assert_eq!(q.pop(), Some(1));
- assert_eq!(q.pop(), Some(2));
- assert_eq!(q.pop(), None);
- q.push(3);
- q.push(4);
- assert_eq!(q.pop(), Some(3));
- assert_eq!(q.pop(), Some(4));
- assert_eq!(q.pop(), None);
+ let (mut consumer, mut producer) = queue(0);
+ producer.push(1i);
+ producer.push(2);
+ assert_eq!(consumer.pop(), Some(1i));
+ assert_eq!(consumer.pop(), Some(2));
+ assert_eq!(consumer.pop(), None);
+ producer.push(3);
+ producer.push(4);
+ assert_eq!(consumer.pop(), Some(3));
+ assert_eq!(consumer.pop(), Some(4));
+ assert_eq!(consumer.pop(), None);
+ }
+
+ // This behaviour is blocked by the type system if using the safe constructor
+ #[test]
+ fn pop_peeked_unchecked() {
+ let q = unsafe { Queue::new(0) };
+ q.push(vec![1i]);
+ q.push(vec![2]);
+ let peeked = q.peek().unwrap();
+
+ assert_eq!(*peeked, vec![1]);
+ assert_eq!(q.pop(), Some(vec![1]));
+
+ assert_eq!(*peeked, vec![1]);
+ q.push(vec![7]);
+
+ // Note: This should actually expect 1, but this test is to highlight
+ // the unsafety allowed by the unchecked usage. A Rust user would not
+ // expect their peeked value to mutate like this without the type system
+ // complaining.
+ assert_eq!(*peeked, vec![7]);
+ }
+
+ #[test]
+ fn peek() {
+ let (mut consumer, mut producer) = queue(0);
+ producer.push(vec![1i]);
+
+ // Ensure the borrowchecker works
+ match consumer.peek() {
+ Some(vec) => match vec.as_slice() {
+ // Note that `pop` is not allowed here due to borrow
+ [1] => {}
+ _ => return
+ },
+ None => unreachable!()
+ }
+
+ consumer.pop();
}
#[test]
fn drop_full() {
- let q = Queue::new(0);
- q.push(box 1i);
- q.push(box 2i);
+ let (_, mut producer) = queue(0);
+ producer.push(box 1i);
+ producer.push(box 2i);
}
#[test]
fn smoke_bound() {
- let q = Queue::new(1);
- q.push(1i);
- q.push(2);
- assert_eq!(q.pop(), Some(1));
- assert_eq!(q.pop(), Some(2));
- assert_eq!(q.pop(), None);
- q.push(3);
- q.push(4);
- assert_eq!(q.pop(), Some(3));
- assert_eq!(q.pop(), Some(4));
- assert_eq!(q.pop(), None);
+ let (mut consumer, mut producer) = queue(1);
+ producer.push(1i);
+ producer.push(2);
+ assert_eq!(consumer.pop(), Some(1));
+ assert_eq!(consumer.pop(), Some(2));
+ assert_eq!(consumer.pop(), None);
+ producer.push(3);
+ producer.push(4);
+ assert_eq!(consumer.pop(), Some(3));
+ assert_eq!(consumer.pop(), Some(4));
+ assert_eq!(consumer.pop(), None);
}
#[test]
stress_bound(1);
fn stress_bound(bound: uint) {
- let a = Arc::new(Queue::new(bound));
- let b = a.clone();
+ let (consumer, mut producer) = queue(bound);
+
let (tx, rx) = channel();
native::task::spawn(proc() {
+ // Move the consumer to a local mutable slot
+ let mut consumer = consumer;
for _ in range(0u, 100000) {
loop {
- match b.pop() {
+ match consumer.pop() {
Some(1i) => break,
Some(_) => fail!(),
None => {}
tx.send(());
});
for _ in range(0i, 100000) {
- a.push(1);
+ producer.push(1);
}
rx.recv();
}