Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libsync: Add safer abstraction for SPSC queue. #15995

Merged
merged 1 commit into from
Aug 1, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/libsync/comm/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ enum Message<T> {
impl<T: Send> Packet<T> {
pub fn new() -> Packet<T> {
Packet {
queue: spsc::Queue::new(128),
queue: unsafe { spsc::Queue::new(128) },

cnt: atomics::AtomicInt::new(0),
steals: 0,
Expand Down
177 changes: 142 additions & 35 deletions src/libsync/spsc_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use core::prelude::*;
use alloc::boxed::Box;
use core::mem;
use core::cell::UnsafeCell;
use alloc::arc::Arc;

use atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};

Expand Down Expand Up @@ -73,6 +74,39 @@ pub struct Queue<T> {
cache_subtractions: AtomicUint,
}

/// A safe abstraction for the consumer in a single-producer single-consumer
/// queue.
pub struct Consumer<T> {
inner: Arc<Queue<T>>
}

impl<T: Send> Consumer<T> {
/// Attempts to pop the value from the head of the queue, returning `None`
/// if the queue is empty.
pub fn pop(&mut self) -> Option<T> {
self.inner.pop()
}

/// Attempts to peek at the head of the queue, returning `None` if the queue
/// is empty.
pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
self.inner.peek()
}
}

/// A safe abstraction for the producer in a single-producer single-consumer
/// queue.
pub struct Producer<T> {
inner: Arc<Queue<T>>
}

impl<T: Send> Producer<T> {
/// Pushes a new value onto the queue.
pub fn push(&mut self, t: T) {
self.inner.push(t)
}
}

impl<T: Send> Node<T> {
fn new() -> *mut Node<T> {
unsafe {
Expand All @@ -84,9 +118,37 @@ impl<T: Send> Node<T> {
}
}

/// Creates a new queue with a consumer-producer pair.
///
/// The producer returned is connected to the consumer to push all data to
/// the consumer.
///
/// # Arguments
///
/// * `bound` - This queue implementation is implemented with a linked
/// list, and this means that a push is always a malloc. In
/// order to amortize this cost, an internal cache of nodes is
/// maintained to prevent a malloc from always being
/// necessary. This bound is the limit on the size of the
/// cache (if desired). If the value is 0, then the cache has
/// no bound. Otherwise, the cache will never grow larger than
/// `bound` (although the queue itself could be much larger.
pub fn queue<T: Send>(bound: uint) -> (Consumer<T>, Producer<T>) {
let q = unsafe { Queue::new(bound) };
let arc = Arc::new(q);
let consumer = Consumer { inner: arc.clone() };
let producer = Producer { inner: arc };

(consumer, producer)
}

impl<T: Send> Queue<T> {
/// Creates a new queue. The producer returned is connected to the consumer
/// to push all data to the consumer.
/// Creates a new queue.
///
/// This is unsafe as the type system doesn't enforce a single
/// consumer-producer relationship. It also allows the consumer to `pop`
/// items while there is a `peek` active due to all methods having a
/// non-mutable receiver.
///
/// # Arguments
///
Expand All @@ -98,10 +160,10 @@ impl<T: Send> Queue<T> {
/// cache (if desired). If the value is 0, then the cache has
/// no bound. Otherwise, the cache will never grow larger than
/// `bound` (although the queue itself could be much larger.
pub fn new(bound: uint) -> Queue<T> {
pub unsafe fn new(bound: uint) -> Queue<T> {
let n1 = Node::new();
let n2 = Node::new();
unsafe { (*n1).next.store(n2, Relaxed) }
(*n1).next.store(n2, Relaxed);
Queue {
tail: UnsafeCell::new(n2),
tail_prev: AtomicPtr::new(n1),
Expand Down Expand Up @@ -199,6 +261,11 @@ impl<T: Send> Queue<T> {

/// Attempts to peek at the head of the queue, returning `None` if the queue
/// has no data currently
///
/// # Warning
/// The reference returned is invalid if it is not used before the consumer
/// pops the value off the queue. If the producer then pushes another value
/// onto the queue, it will overwrite the value pointed to by the reference.
pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
// This is essentially the same as above with all the popping bits
// stripped out.
Expand Down Expand Up @@ -229,46 +296,84 @@ impl<T: Send> Drop for Queue<T> {
mod test {
use std::prelude::*;

use alloc::arc::Arc;
use native;

use super::Queue;
use super::{queue, Queue};

#[test]
fn smoke() {
let q = Queue::new(0);
q.push(1i);
q.push(2);
assert_eq!(q.pop(), Some(1));
assert_eq!(q.pop(), Some(2));
assert_eq!(q.pop(), None);
q.push(3);
q.push(4);
assert_eq!(q.pop(), Some(3));
assert_eq!(q.pop(), Some(4));
assert_eq!(q.pop(), None);
let (mut consumer, mut producer) = queue(0);
producer.push(1i);
producer.push(2);
assert_eq!(consumer.pop(), Some(1i));
assert_eq!(consumer.pop(), Some(2));
assert_eq!(consumer.pop(), None);
producer.push(3);
producer.push(4);
assert_eq!(consumer.pop(), Some(3));
assert_eq!(consumer.pop(), Some(4));
assert_eq!(consumer.pop(), None);
}

// This behaviour is blocked by the type system if using the safe constructor
#[test]
fn pop_peeked_unchecked() {
let q = unsafe { Queue::new(0) };
q.push(vec![1i]);
q.push(vec![2]);
let peeked = q.peek().unwrap();

assert_eq!(*peeked, vec![1]);
assert_eq!(q.pop(), Some(vec![1]));

assert_eq!(*peeked, vec![1]);
q.push(vec![7]);

// Note: This should actually expect 1, but this test is to highlight
// the unsafety allowed by the unchecked usage. A Rust user would not
// expect their peeked value to mutate like this without the type system
// complaining.
assert_eq!(*peeked, vec![7]);
}

#[test]
fn peek() {
let (mut consumer, mut producer) = queue(0);
producer.push(vec![1i]);

// Ensure the borrowchecker works
match consumer.peek() {
Some(vec) => match vec.as_slice() {
// Note that `pop` is not allowed here due to borrow
[1] => {}
_ => return
},
None => unreachable!()
}

consumer.pop();
}

#[test]
fn drop_full() {
let q = Queue::new(0);
q.push(box 1i);
q.push(box 2i);
let (_, mut producer) = queue(0);
producer.push(box 1i);
producer.push(box 2i);
}

#[test]
fn smoke_bound() {
let q = Queue::new(1);
q.push(1i);
q.push(2);
assert_eq!(q.pop(), Some(1));
assert_eq!(q.pop(), Some(2));
assert_eq!(q.pop(), None);
q.push(3);
q.push(4);
assert_eq!(q.pop(), Some(3));
assert_eq!(q.pop(), Some(4));
assert_eq!(q.pop(), None);
let (mut consumer, mut producer) = queue(1);
producer.push(1i);
producer.push(2);
assert_eq!(consumer.pop(), Some(1));
assert_eq!(consumer.pop(), Some(2));
assert_eq!(consumer.pop(), None);
producer.push(3);
producer.push(4);
assert_eq!(consumer.pop(), Some(3));
assert_eq!(consumer.pop(), Some(4));
assert_eq!(consumer.pop(), None);
}

#[test]
Expand All @@ -277,13 +382,15 @@ mod test {
stress_bound(1);

fn stress_bound(bound: uint) {
let a = Arc::new(Queue::new(bound));
let b = a.clone();
let (consumer, mut producer) = queue(bound);

let (tx, rx) = channel();
native::task::spawn(proc() {
// Move the consumer to a local mutable slot
let mut consumer = consumer;
for _ in range(0u, 100000) {
loop {
match b.pop() {
match consumer.pop() {
Some(1i) => break,
Some(_) => fail!(),
None => {}
Expand All @@ -293,7 +400,7 @@ mod test {
tx.send(());
});
for _ in range(0i, 100000) {
a.push(1);
producer.push(1);
}
rx.recv();
}
Expand Down