diff --git a/src/libsync/comm/stream.rs b/src/libsync/comm/stream.rs index 9747c207a2261..f8a28b7600f31 100644 --- a/src/libsync/comm/stream.rs +++ b/src/libsync/comm/stream.rs @@ -74,7 +74,7 @@ enum Message { impl Packet { pub fn new() -> Packet { Packet { - queue: spsc::Queue::new(128), + queue: unsafe { spsc::Queue::new(128) }, cnt: atomics::AtomicInt::new(0), steals: 0, diff --git a/src/libsync/spsc_queue.rs b/src/libsync/spsc_queue.rs index 0cda1098ab447..d8cd44f993594 100644 --- a/src/libsync/spsc_queue.rs +++ b/src/libsync/spsc_queue.rs @@ -40,6 +40,7 @@ use core::prelude::*; use alloc::boxed::Box; use core::mem; use core::cell::UnsafeCell; +use alloc::arc::Arc; use atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; @@ -73,6 +74,39 @@ pub struct Queue { cache_subtractions: AtomicUint, } +/// A safe abstraction for the consumer in a single-producer single-consumer +/// queue. +pub struct Consumer { + inner: Arc> +} + +impl Consumer { + /// Attempts to pop the value from the head of the queue, returning `None` + /// if the queue is empty. + pub fn pop(&mut self) -> Option { + self.inner.pop() + } + + /// Attempts to peek at the head of the queue, returning `None` if the queue + /// is empty. + pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> { + self.inner.peek() + } +} + +/// A safe abstraction for the producer in a single-producer single-consumer +/// queue. +pub struct Producer { + inner: Arc> +} + +impl Producer { + /// Pushes a new value onto the queue. + pub fn push(&mut self, t: T) { + self.inner.push(t) + } +} + impl Node { fn new() -> *mut Node { unsafe { @@ -84,9 +118,37 @@ impl Node { } } +/// Creates a new queue with a consumer-producer pair. +/// +/// The producer returned is connected to the consumer to push all data to +/// the consumer. +/// +/// # Arguments +/// +/// * `bound` - This queue implementation is implemented with a linked +/// list, and this means that a push is always a malloc. In +/// order to amortize this cost, an internal cache of nodes is +/// maintained to prevent a malloc from always being +/// necessary. This bound is the limit on the size of the +/// cache (if desired). If the value is 0, then the cache has +/// no bound. Otherwise, the cache will never grow larger than +/// `bound` (although the queue itself could be much larger. +pub fn queue(bound: uint) -> (Consumer, Producer) { + let q = unsafe { Queue::new(bound) }; + let arc = Arc::new(q); + let consumer = Consumer { inner: arc.clone() }; + let producer = Producer { inner: arc }; + + (consumer, producer) +} + impl Queue { - /// Creates a new queue. The producer returned is connected to the consumer - /// to push all data to the consumer. + /// Creates a new queue. + /// + /// This is unsafe as the type system doesn't enforce a single + /// consumer-producer relationship. It also allows the consumer to `pop` + /// items while there is a `peek` active due to all methods having a + /// non-mutable receiver. /// /// # Arguments /// @@ -98,10 +160,10 @@ impl Queue { /// cache (if desired). If the value is 0, then the cache has /// no bound. Otherwise, the cache will never grow larger than /// `bound` (although the queue itself could be much larger. - pub fn new(bound: uint) -> Queue { + pub unsafe fn new(bound: uint) -> Queue { let n1 = Node::new(); let n2 = Node::new(); - unsafe { (*n1).next.store(n2, Relaxed) } + (*n1).next.store(n2, Relaxed); Queue { tail: UnsafeCell::new(n2), tail_prev: AtomicPtr::new(n1), @@ -199,6 +261,11 @@ impl Queue { /// Attempts to peek at the head of the queue, returning `None` if the queue /// has no data currently + /// + /// # Warning + /// The reference returned is invalid if it is not used before the consumer + /// pops the value off the queue. If the producer then pushes another value + /// onto the queue, it will overwrite the value pointed to by the reference. pub fn peek<'a>(&'a self) -> Option<&'a mut T> { // This is essentially the same as above with all the popping bits // stripped out. @@ -229,46 +296,84 @@ impl Drop for Queue { mod test { use std::prelude::*; - use alloc::arc::Arc; use native; - use super::Queue; + use super::{queue, Queue}; #[test] fn smoke() { - let q = Queue::new(0); - q.push(1i); - q.push(2); - assert_eq!(q.pop(), Some(1)); - assert_eq!(q.pop(), Some(2)); - assert_eq!(q.pop(), None); - q.push(3); - q.push(4); - assert_eq!(q.pop(), Some(3)); - assert_eq!(q.pop(), Some(4)); - assert_eq!(q.pop(), None); + let (mut consumer, mut producer) = queue(0); + producer.push(1i); + producer.push(2); + assert_eq!(consumer.pop(), Some(1i)); + assert_eq!(consumer.pop(), Some(2)); + assert_eq!(consumer.pop(), None); + producer.push(3); + producer.push(4); + assert_eq!(consumer.pop(), Some(3)); + assert_eq!(consumer.pop(), Some(4)); + assert_eq!(consumer.pop(), None); + } + + // This behaviour is blocked by the type system if using the safe constructor + #[test] + fn pop_peeked_unchecked() { + let q = unsafe { Queue::new(0) }; + q.push(vec![1i]); + q.push(vec![2]); + let peeked = q.peek().unwrap(); + + assert_eq!(*peeked, vec![1]); + assert_eq!(q.pop(), Some(vec![1])); + + assert_eq!(*peeked, vec![1]); + q.push(vec![7]); + + // Note: This should actually expect 1, but this test is to highlight + // the unsafety allowed by the unchecked usage. A Rust user would not + // expect their peeked value to mutate like this without the type system + // complaining. + assert_eq!(*peeked, vec![7]); + } + + #[test] + fn peek() { + let (mut consumer, mut producer) = queue(0); + producer.push(vec![1i]); + + // Ensure the borrowchecker works + match consumer.peek() { + Some(vec) => match vec.as_slice() { + // Note that `pop` is not allowed here due to borrow + [1] => {} + _ => return + }, + None => unreachable!() + } + + consumer.pop(); } #[test] fn drop_full() { - let q = Queue::new(0); - q.push(box 1i); - q.push(box 2i); + let (_, mut producer) = queue(0); + producer.push(box 1i); + producer.push(box 2i); } #[test] fn smoke_bound() { - let q = Queue::new(1); - q.push(1i); - q.push(2); - assert_eq!(q.pop(), Some(1)); - assert_eq!(q.pop(), Some(2)); - assert_eq!(q.pop(), None); - q.push(3); - q.push(4); - assert_eq!(q.pop(), Some(3)); - assert_eq!(q.pop(), Some(4)); - assert_eq!(q.pop(), None); + let (mut consumer, mut producer) = queue(1); + producer.push(1i); + producer.push(2); + assert_eq!(consumer.pop(), Some(1)); + assert_eq!(consumer.pop(), Some(2)); + assert_eq!(consumer.pop(), None); + producer.push(3); + producer.push(4); + assert_eq!(consumer.pop(), Some(3)); + assert_eq!(consumer.pop(), Some(4)); + assert_eq!(consumer.pop(), None); } #[test] @@ -277,13 +382,15 @@ mod test { stress_bound(1); fn stress_bound(bound: uint) { - let a = Arc::new(Queue::new(bound)); - let b = a.clone(); + let (consumer, mut producer) = queue(bound); + let (tx, rx) = channel(); native::task::spawn(proc() { + // Move the consumer to a local mutable slot + let mut consumer = consumer; for _ in range(0u, 100000) { loop { - match b.pop() { + match consumer.pop() { Some(1i) => break, Some(_) => fail!(), None => {} @@ -293,7 +400,7 @@ mod test { tx.send(()); }); for _ in range(0i, 100000) { - a.push(1); + producer.push(1); } rx.recv(); }