Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement BufQueue type #371

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions src/buf/buf_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use alloc::collections::VecDeque;
use crate::Buf;
use core::cmp;
#[cfg(feature = "std")]
use std::io::IoSlice;

/// Ring buffer of buffers.
///
/// `push` operation appends a buffer to the tail of the buffer,
/// and read operations (`bytes`, `bytes_vectored`, `advance` etc)
/// pop elements from the head of the buffer.
///
/// This type can be used to implement an outgoing network buffer,
/// when the front of the queue is written to the network and the back
/// of the queue gets new messages.
///
/// # Note
///
/// This type caches the remaining size (sum of all remaining sizes of all buffers).
/// If buffers owned by this `BufQueue` get their remaining size modified
/// not through this type, the behavior is undefined:
/// operations may hang forever, panic or produce otherwise unexpected results
/// (but not violate memory access).
#[derive(Debug)]
pub struct BufQueue<B: Buf> {
deque: VecDeque<B>,
remaining: usize,
}

impl<B: Buf> BufQueue<B> {
/// Create an empty queue.
pub fn new() -> Self {
BufQueue::default()
}

/// Push a buf to the back of the deque.
///
/// This operation is no-op if the buf has no remaining.
///
/// # Panics
///
/// This struct tracks the total remaining, and panics if
/// the total overflows `usize`.
pub fn push(&mut self, buf: B) {
let rem = buf.remaining();
if rem != 0 {
self.deque.push_back(buf);
self.remaining = self.remaining.checked_add(rem).expect("remaining overflow");
}
}
}

impl<B: Buf> Default for BufQueue<B> {
fn default() -> Self {
BufQueue {
deque: VecDeque::default(),
remaining: 0,
}
}
}

impl<B: Buf> Buf for BufQueue<B> {
fn remaining(&self) -> usize {
self.remaining
}

fn bytes(&self) -> &[u8] {
match self.deque.front() {
Some(b) => b.bytes(),
None => &[],
}
}

#[cfg(feature = "std")]
fn bytes_vectored<'a>(&'a self, mut dst: &mut [IoSlice<'a>]) -> usize {
let mut n = 0;
for b in &self.deque {
if dst.is_empty() {
break;
}
let next = b.bytes_vectored(dst);
dst = &mut dst[next..];
n += next;
}
n
}

fn advance(&mut self, mut cnt: usize) {
while cnt != 0 {
let front = self.deque.front_mut().expect("must not be empty");
let rem = front.remaining();
let advance = cmp::min(cnt, rem);
front.advance(advance);
if rem == advance {
self.deque.pop_front().unwrap();
}
cnt -= advance;
self.remaining -= advance;
}
}
}
3 changes: 2 additions & 1 deletion src/buf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ mod buf_mut;
pub mod ext;
mod iter;
mod vec_deque;
mod buf_queue;

pub use self::buf_impl::Buf;
pub use self::buf_mut::BufMut;
pub use self::ext::{BufExt, BufMutExt};
#[cfg(feature = "std")]
pub use self::buf_mut::IoSliceMut;
pub use self::iter::IntoIter;

pub use self::buf_queue::BufQueue;
131 changes: 131 additions & 0 deletions tests/test_buf_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
#![deny(warnings, rust_2018_idioms)]

use bytes::buf::BufQueue;
use bytes::Bytes;
use bytes::Buf;
use std::collections::VecDeque;
use std::cmp;
use std::io::IoSlice;


#[test]
fn simple() {
let mut queue = BufQueue::new();
queue.push(Bytes::copy_from_slice(b"abc"));
queue.push(Bytes::copy_from_slice(b"de"));
assert_eq!(5, queue.remaining());
assert_eq!(b"abc", queue.bytes());
queue.advance(1);
assert_eq!(4, queue.remaining());
assert_eq!(b"bc", queue.bytes());
queue.advance(2);
assert_eq!(2, queue.remaining());
assert_eq!(b"de", queue.bytes());
queue.push(Bytes::copy_from_slice(b"fgh"));
assert_eq!(5, queue.remaining());
assert_eq!(b"de", queue.bytes());
// advance past front bytes
queue.advance(4);
assert_eq!(1, queue.remaining());
assert_eq!(b"h", queue.bytes());
queue.advance(1);
assert_eq!(0, queue.remaining());
assert_eq!(b"", queue.bytes());
}

struct Rng {
state: u32,
}

impl Rng {
// copy-paste from https://en.wikipedia.org/wiki/Xorshift
fn next(&mut self) -> u32 {
let mut x = self.state;
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
self.state = x;
x
}
}

#[test]
fn random() {
let mut rng = Rng { state: 1 };

// Update these two synchronously
let mut correct: VecDeque<u8> = Default::default();
let mut testing: BufQueue<BufQueue<Bytes>> = Default::default();

for _ in 0..10000 {
// uncomment to have a look at what is tested
//println!("{:?}", testing);

assert_eq!(correct.remaining(), testing.remaining());

let bytes = testing.bytes();
assert!(correct.len() == 0 || bytes.len() != 0);
assert_eq!(bytes, &correct.iter().cloned().take(bytes.len()).collect::<Vec<_>>()[..]);

if correct.len() >= 1000 || rng.next() % 2 == 0 {
let take = cmp::min(rng.next() as usize % 10, correct.len());
testing.advance(take);
correct.advance(take);
} else {
let mut inner = BufQueue::new();

let inner_len = rng.next() % 3;
for _ in 0..inner_len {
let bytes_len = rng.next() % 5;
let v: Vec<u8> = (0..bytes_len).map(|_| rng.next() as u8).collect();
correct.extend(&v);
inner.push(Bytes::from(v));
}

testing.push(inner);

assert_eq!(correct.len(), testing.remaining());
}
}
}

#[test]
fn vectored() {
let mut v: BufQueue<BufQueue<Bytes>> = Default::default();
v.push({
let mut i = BufQueue::new();
i.push(Bytes::copy_from_slice(b"ab"));
i.push(Bytes::copy_from_slice(b"cde"));
i
});
v.push({
let mut i = BufQueue::new();
i.push(Bytes::copy_from_slice(b"fg"));
i
});

let zero = &mut [];
assert_eq!(0, v.bytes_vectored(zero));

let mut one = [IoSlice::new(&[])];
assert_eq!(1, v.bytes_vectored(&mut one));
assert_eq!(b"ab", &*one[0]);

let mut two = [IoSlice::new(&[]), IoSlice::new(&[])];
assert_eq!(2, v.bytes_vectored(&mut two));
assert_eq!(b"ab", &*two[0]);
assert_eq!(b"cde", &*two[1]);

let mut three = [IoSlice::new(&[]), IoSlice::new(&[]), IoSlice::new(&[])];
assert_eq!(3, v.bytes_vectored(&mut three));
assert_eq!(b"ab", &*three[0]);
assert_eq!(b"cde", &*three[1]);
assert_eq!(b"fg", &*three[2]);

let mut four = [IoSlice::new(&[]), IoSlice::new(&[]), IoSlice::new(&[]), IoSlice::new(&[])];
assert_eq!(3, v.bytes_vectored(&mut four));
assert_eq!(b"ab", &*four[0]);
assert_eq!(b"cde", &*four[1]);
assert_eq!(b"fg", &*four[2]);
assert_eq!(b"", &*four[3]);
}