]> git.lizzy.rs Git - rust.git/commitdiff
libsync: Add safer abstraction for SPSC queue.
authorKevin Butler <haqkrs@gmail.com>
Fri, 25 Jul 2014 19:58:27 +0000 (20:58 +0100)
committerKevin Butler <haqkrs@gmail.com>
Fri, 1 Aug 2014 19:09:03 +0000 (20:09 +0100)
The current spsc implementation doesn't enforce single-producer
single-consumer usage and also allows unsafe memory use through
peek & pop.

For safer usage, `spsc_queue::queue` now returns a pair of owned objects which
only allow consumer or producer behaviours through an `Arc`.
Through restricting the mutability of the receiver to `mut` the
peek and pop behaviour becomes safe again, with the compiler
complaining about usage which could lead to problems.

To fix code broken from this, update:
Queue::new(x) -> unsafe { Queue::new(x) }

[breaking-change]

src/libsync/comm/stream.rs
src/libsync/spsc_queue.rs

index 9747c207a22612fdce2a6fa71f8b6f8dc23389a3..f8a28b7600f316b5fb7e74b3f1cae77276a60192 100644 (file)
@@ -74,7 +74,7 @@ enum Message<T> {
 impl<T: Send> Packet<T> {
     pub fn new() -> Packet<T> {
         Packet {
-            queue: spsc::Queue::new(128),
+            queue: unsafe { spsc::Queue::new(128) },
 
             cnt: atomics::AtomicInt::new(0),
             steals: 0,
index 0cda1098ab447a695f707c481101b0d1cfdf967e..d8cd44f993594006d981f38e925507aa72cec413 100644 (file)
@@ -40,6 +40,7 @@
 use alloc::boxed::Box;
 use core::mem;
 use core::cell::UnsafeCell;
+use alloc::arc::Arc;
 
 use atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
 
@@ -73,6 +74,39 @@ pub struct Queue<T> {
     cache_subtractions: AtomicUint,
 }
 
+/// A safe abstraction for the consumer in a single-producer single-consumer
+/// queue.
+pub struct Consumer<T> {
+    inner: Arc<Queue<T>>
+}
+
+impl<T: Send> Consumer<T> {
+    /// Attempts to pop the value from the head of the queue, returning `None`
+    /// if the queue is empty.
+    pub fn pop(&mut self) -> Option<T> {
+        self.inner.pop()
+    }
+
+    /// Attempts to peek at the head of the queue, returning `None` if the queue
+    /// is empty.
+    pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
+        self.inner.peek()
+    }
+}
+
+/// A safe abstraction for the producer in a single-producer single-consumer
+/// queue.
+pub struct Producer<T> {
+    inner: Arc<Queue<T>>
+}
+
+impl<T: Send> Producer<T> {
+    /// Pushes a new value onto the queue.
+    pub fn push(&mut self, t: T) {
+        self.inner.push(t)
+    }
+}
+
 impl<T: Send> Node<T> {
     fn new() -> *mut Node<T> {
         unsafe {
@@ -84,9 +118,37 @@ fn new() -> *mut Node<T> {
     }
 }
 
+/// Creates a new queue with a consumer-producer pair.
+///
+/// The producer returned is connected to the consumer to push all data to
+/// the consumer.
+///
+/// # Arguments
+///
+///   * `bound` - This queue implementation is implemented with a linked
+///               list, and this means that a push is always a malloc. In
+///               order to amortize this cost, an internal cache of nodes is
+///               maintained to prevent a malloc from always being
+///               necessary. This bound is the limit on the size of the
+///               cache (if desired). If the value is 0, then the cache has
+///               no bound. Otherwise, the cache will never grow larger than
+///               `bound` (although the queue itself could be much larger.
+pub fn queue<T: Send>(bound: uint) -> (Consumer<T>, Producer<T>) {
+    let q = unsafe { Queue::new(bound) };
+    let arc = Arc::new(q);
+    let consumer = Consumer { inner: arc.clone() };
+    let producer = Producer { inner: arc };
+
+    (consumer, producer)
+}
+
 impl<T: Send> Queue<T> {
-    /// Creates a new queue. The producer returned is connected to the consumer
-    /// to push all data to the consumer.
+    /// Creates a new queue.
+    ///
+    /// This is unsafe as the type system doesn't enforce a single
+    /// consumer-producer relationship. It also allows the consumer to `pop`
+    /// items while there is a `peek` active due to all methods having a
+    /// non-mutable receiver.
     ///
     /// # Arguments
     ///
@@ -98,10 +160,10 @@ impl<T: Send> Queue<T> {
     ///               cache (if desired). If the value is 0, then the cache has
     ///               no bound. Otherwise, the cache will never grow larger than
     ///               `bound` (although the queue itself could be much larger.
-    pub fn new(bound: uint) -> Queue<T> {
+    pub unsafe fn new(bound: uint) -> Queue<T> {
         let n1 = Node::new();
         let n2 = Node::new();
-        unsafe { (*n1).next.store(n2, Relaxed) }
+        (*n1).next.store(n2, Relaxed);
         Queue {
             tail: UnsafeCell::new(n2),
             tail_prev: AtomicPtr::new(n1),
@@ -199,6 +261,11 @@ pub fn pop(&self) -> Option<T> {
 
     /// Attempts to peek at the head of the queue, returning `None` if the queue
     /// has no data currently
+    ///
+    /// # Warning
+    /// The reference returned is invalid if it is not used before the consumer
+    /// pops the value off the queue. If the producer then pushes another value
+    /// onto the queue, it will overwrite the value pointed to by the reference.
     pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
         // This is essentially the same as above with all the popping bits
         // stripped out.
@@ -229,46 +296,84 @@ fn drop(&mut self) {
 mod test {
     use std::prelude::*;
 
-    use alloc::arc::Arc;
     use native;
 
-    use super::Queue;
+    use super::{queue, Queue};
 
     #[test]
     fn smoke() {
-        let q = Queue::new(0);
-        q.push(1i);
-        q.push(2);
-        assert_eq!(q.pop(), Some(1));
-        assert_eq!(q.pop(), Some(2));
-        assert_eq!(q.pop(), None);
-        q.push(3);
-        q.push(4);
-        assert_eq!(q.pop(), Some(3));
-        assert_eq!(q.pop(), Some(4));
-        assert_eq!(q.pop(), None);
+        let (mut consumer, mut producer) = queue(0);
+        producer.push(1i);
+        producer.push(2);
+        assert_eq!(consumer.pop(), Some(1i));
+        assert_eq!(consumer.pop(), Some(2));
+        assert_eq!(consumer.pop(), None);
+        producer.push(3);
+        producer.push(4);
+        assert_eq!(consumer.pop(), Some(3));
+        assert_eq!(consumer.pop(), Some(4));
+        assert_eq!(consumer.pop(), None);
+    }
+
+    // This behaviour is blocked by the type system if using the safe constructor
+    #[test]
+    fn pop_peeked_unchecked() {
+        let q = unsafe { Queue::new(0) };
+        q.push(vec![1i]);
+        q.push(vec![2]);
+        let peeked = q.peek().unwrap();
+
+        assert_eq!(*peeked, vec![1]);
+        assert_eq!(q.pop(), Some(vec![1]));
+
+        assert_eq!(*peeked, vec![1]);
+        q.push(vec![7]);
+
+        // Note: This should actually expect 1, but this test is to highlight
+        // the unsafety allowed by the unchecked usage. A Rust user would not
+        // expect their peeked value to mutate like this without the type system
+        // complaining.
+        assert_eq!(*peeked, vec![7]);
+    }
+
+    #[test]
+    fn peek() {
+        let (mut consumer, mut producer) = queue(0);
+        producer.push(vec![1i]);
+
+        // Ensure the borrowchecker works
+        match consumer.peek() {
+            Some(vec) => match vec.as_slice() {
+                // Note that `pop` is not allowed here due to borrow
+                [1] => {}
+                _ => return
+            },
+            None => unreachable!()
+        }
+
+        consumer.pop();
     }
 
     #[test]
     fn drop_full() {
-        let q = Queue::new(0);
-        q.push(box 1i);
-        q.push(box 2i);
+        let (_, mut producer) = queue(0);
+        producer.push(box 1i);
+        producer.push(box 2i);
     }
 
     #[test]
     fn smoke_bound() {
-        let q = Queue::new(1);
-        q.push(1i);
-        q.push(2);
-        assert_eq!(q.pop(), Some(1));
-        assert_eq!(q.pop(), Some(2));
-        assert_eq!(q.pop(), None);
-        q.push(3);
-        q.push(4);
-        assert_eq!(q.pop(), Some(3));
-        assert_eq!(q.pop(), Some(4));
-        assert_eq!(q.pop(), None);
+        let (mut consumer, mut producer) = queue(1);
+        producer.push(1i);
+        producer.push(2);
+        assert_eq!(consumer.pop(), Some(1));
+        assert_eq!(consumer.pop(), Some(2));
+        assert_eq!(consumer.pop(), None);
+        producer.push(3);
+        producer.push(4);
+        assert_eq!(consumer.pop(), Some(3));
+        assert_eq!(consumer.pop(), Some(4));
+        assert_eq!(consumer.pop(), None);
     }
 
     #[test]
@@ -277,13 +382,15 @@ fn stress() {
         stress_bound(1);
 
         fn stress_bound(bound: uint) {
-            let a = Arc::new(Queue::new(bound));
-            let b = a.clone();
+            let (consumer, mut producer) = queue(bound);
+
             let (tx, rx) = channel();
             native::task::spawn(proc() {
+                // Move the consumer to a local mutable slot
+                let mut consumer = consumer;
                 for _ in range(0u, 100000) {
                     loop {
-                        match b.pop() {
+                        match consumer.pop() {
                             Some(1i) => break,
                             Some(_) => fail!(),
                             None => {}
@@ -293,7 +400,7 @@ fn stress_bound(bound: uint) {
                 tx.send(());
             });
             for _ in range(0i, 100000) {
-                a.push(1);
+                producer.push(1);
             }
             rx.recv();
         }