Skip to content

Commit

Permalink
Simplified API for buffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Dec 19, 2020
1 parent afabc5e commit 666ac94
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 94 deletions.
9 changes: 1 addition & 8 deletions rust/arrow/src/array/array_boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::{convert::From, sync::Arc};
use super::*;
use super::{array::print_long_array, raw_pointer::RawPtrBox};
use crate::buffer::{Buffer, MutableBuffer};
use crate::memory;
use crate::util::bit_util;

/// Array of bools
Expand Down Expand Up @@ -148,10 +147,6 @@ impl From<ArrayDataRef> for BooleanArray {
"BooleanArray data should contain a single buffer only (values buffer)"
);
let raw_values = data.buffers()[0].raw_data();
assert!(
memory::is_aligned::<u8>(raw_values, mem::align_of::<bool>()),
"memory is not aligned"
);
Self {
data,
raw_values: RawPtrBox::new(raw_values as *const u8),
Expand Down Expand Up @@ -185,9 +180,7 @@ impl<Ptr: Borrow<Option<bool>>> FromIterator<Ptr> for BooleanArray {
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
let mut val_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);

let data = unsafe {
std::slice::from_raw_parts_mut(val_buf.raw_data_mut(), val_buf.capacity())
};
let data = val_buf.data_mut();

let null_slice = null_buf.data_mut();
iter.enumerate().for_each(|(i, item)| {
Expand Down
5 changes: 1 addition & 4 deletions rust/arrow/src/array/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ pub trait BufferBuilderTrait<T: ArrowPrimitiveType> {
/// use arrow::array::{UInt8BufferBuilder, BufferBuilderTrait};
///
/// let mut builder = UInt8BufferBuilder::new(10);
///
/// assert!(builder.capacity() >= 10);
/// ```
fn new(capacity: usize) -> Self;

Expand Down Expand Up @@ -178,8 +176,6 @@ pub trait BufferBuilderTrait<T: ArrowPrimitiveType> {
///
/// let mut builder = UInt8BufferBuilder::new(10);
/// builder.reserve(10);
///
/// assert!(builder.capacity() >= 20);
/// ```
fn reserve(&mut self, n: usize);

Expand Down Expand Up @@ -364,6 +360,7 @@ impl BooleanBufferBuilder {
self.reserve(n);
if n != 0 && v {
let data = self.buffer.data_mut();
(self.len..self.len + n).for_each(|i| bit_util::set_bit(data, i))
}
self.len += n;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion rust/arrow/src/array/transform/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub(super) fn build_extend<T: OffsetSizeTrait>(array: &ArrayData) -> Extend {

let buffer = &mut mutable.buffers[0];
let delta_len = array.len() - array.null_count();
buffer.reserve(buffer.len() + delta_len * std::mem::size_of::<T>());
buffer.reserve(delta_len * std::mem::size_of::<T>());

let child = &mut mutable.child_data[0];
(start..start + len).for_each(|i| {
Expand Down
2 changes: 1 addition & 1 deletion rust/arrow/src/array/transform/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub(super) fn extend_offsets<T: OffsetSizeTrait>(
mut last_offset: T,
offsets: &[T],
) {
buffer.reserve(buffer.len() + offsets.len() * std::mem::size_of::<T>());
buffer.reserve(offsets.len() * std::mem::size_of::<T>());
offsets.windows(2).for_each(|offsets| {
// compute the new offset
let length = offsets[1] - offsets[0];
Expand Down
4 changes: 1 addition & 3 deletions rust/arrow/src/array/transform/variable_size.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ pub(super) fn build_extend<T: OffsetSizeTrait>(array: &ArrayData) -> Extend {
let (offset_buffer, values_buffer) = mutable.buffers.split_at_mut(1);
let offset_buffer = &mut offset_buffer[0];
let values_buffer = &mut values_buffer[0];
offset_buffer.reserve(
offset_buffer.len() + array.len() * std::mem::size_of::<T>(),
);
offset_buffer.reserve(array.len() * std::mem::size_of::<T>());

(start..start + len).for_each(|i| {
if array.is_valid(i) {
Expand Down
66 changes: 36 additions & 30 deletions rust/arrow/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@
#[cfg(feature = "simd")]
use packed_simd::u8x64;

use crate::{
bytes::{Bytes, Deallocation},
ffi,
};
use crate::bytes::{Bytes, Deallocation};

use std::cmp;
use std::convert::AsRef;
use std::fmt::Debug;
use std::mem;
Expand All @@ -44,34 +40,44 @@ use crate::util::bit_util::ceil;
#[cfg(any(feature = "simd", feature = "avx512"))]
use std::borrow::BorrowMut;

/// Buffer is a contiguous memory region of fixed size and is aligned at a 64-byte
/// boundary. Buffer is immutable.
/// Buffer is a shared contiguous memory region.
#[derive(Clone, PartialEq, Debug)]
pub struct Buffer {
/// Reference-counted pointer to the internal byte buffer.
data: Arc<Bytes>,

/// The offset into the buffer.
/// The offset into contiguous memory region
offset: usize,
}

impl Buffer {
pub(crate) fn new(data: Bytes) -> Self {
Self {
data: Arc::new(data),
offset: 0,
}
}

/// Returns the number of bytes in the buffer
#[inline]
pub fn len(&self) -> usize {
self.data.len() - self.offset
}

/// Returns whether the buffer is empty.
#[inline]
pub fn is_empty(&self) -> bool {
self.data.len() - self.offset == 0
self.len() == 0
}

/// Returns the byte slice stored in this buffer
#[inline]
pub fn data(&self) -> &[u8] {
&self.data.as_slice()[self.offset..]
}

/// Returns a slice of this buffer, starting from `offset`.
#[inline]
pub fn slice(&self, offset: usize) -> Self {
assert!(
offset <= self.len(),
Expand All @@ -87,8 +93,9 @@ impl Buffer {
///
/// Note that this should be used cautiously, and the returned pointer should not be
/// stored anywhere, to avoid dangling pointers.
#[inline]
pub fn raw_data(&self) -> *const u8 {
unsafe { self.data.raw_data().add(self.offset) }
self.data.as_slice()[self.offset..].as_ptr()
}

/// View buffer as typed slice.
Expand All @@ -101,6 +108,7 @@ impl Buffer {
///
/// Also `typed_data::<bool>` is unsafe as `0x00` and `0x01` are the only valid values for
/// `bool` in Rust. However, `bool` arrays in Arrow are bit-packed which breaks this condition.
#[inline]
pub unsafe fn typed_data<T: ArrowNativeType + num::Num>(&self) -> &[T] {
assert_eq!(self.len() % mem::size_of::<T>(), 0);
from_raw_parts(
Expand All @@ -112,6 +120,7 @@ impl Buffer {
/// Returns a slice of this buffer starting at a certain bit offset.
/// If the offset is byte-aligned the returned buffer is a shallow clone,
/// otherwise a new buffer is allocated and filled with a copy of the bits in the range.
#[inline]
pub fn bit_slice(&self, offset: usize, len: usize) -> Self {
if offset % 8 == 0 && len % 8 == 0 {
return self.slice(offset / 8);
Expand Down Expand Up @@ -149,11 +158,8 @@ impl Buffer {
/// allocated memory region.
impl<T: AsRef<[u8]>> From<T> for Buffer {
fn from(p: T) -> Self {
let a = p.as_ref();
Self {
data: Arc::new(a.to_vec()),
offset: 0,
}
let bytes = unsafe { Bytes::new(p.as_ref().to_vec(), Deallocation::Native) };
Buffer::new(bytes)
}
}

Expand Down Expand Up @@ -597,9 +603,6 @@ impl Not for &Buffer {
}
}

unsafe impl Sync for Buffer {}
unsafe impl Send for Buffer {}

/// Similar to `Buffer`, but is growable and can be mutated. A mutable buffer can be
/// converted into a immutable buffer via the `freeze` method.
#[derive(Debug)]
Expand All @@ -609,13 +612,15 @@ pub struct MutableBuffer {

impl MutableBuffer {
/// Allocate a new mutable buffer with initial capacity to be `capacity`.
#[inline]
pub fn new(capacity: usize) -> Self {
Self {
data: Vec::with_capacity(capacity),
}
}

/// creates a new [MutableBuffer] where every bit is initialized to `0`
#[inline]
pub fn new_null(len: usize) -> Self {
let num_bytes = bit_util::ceil(len, 8);
MutableBuffer::new(num_bytes).with_bitset(num_bytes, false)
Expand All @@ -627,6 +632,7 @@ impl MutableBuffer {
/// This is useful when one wants to clear (or set) the bits and then manipulate
/// the buffer directly (e.g., modifying the buffer by holding a mutable reference
/// from `data_mut()`).
#[inline]
pub fn with_bitset(mut self, end: usize, val: bool) -> Self {
assert!(end <= self.data.capacity());
let v = if val { 255 } else { 0 };
Expand All @@ -642,21 +648,18 @@ impl MutableBuffer {
/// This is used to initialize the bits in a buffer, however, it has no impact on the
/// `len` of the buffer and so can be used to initialize the memory region from
/// `len` to `capacity`.
#[inline]
pub fn set_null_bits(&mut self, start: usize, count: usize) {
assert!(start + count <= self.data.capacity());
unsafe {
std::ptr::write_bytes(self.data.as_mut_ptr().add(start), 0, count);
}
}

/// Ensures that this buffer has at least `capacity` slots in this buffer. This will
/// also ensure the new capacity will be a multiple of 64 bytes.
///
/// Returns the new capacity for this buffer.
pub fn reserve(&mut self, capacity: usize) {
if capacity < self.data.capacity() {
self.data.reserve(self.data.capacity() - capacity);
}
/// Reserves capacity for at least `additional` more bytes
#[inline]
pub fn reserve(&mut self, additional: usize) {
self.data.reserve(additional);
}

/// Resizes the buffer so that the `len` will equal to the `new_len`.
Expand All @@ -666,6 +669,7 @@ impl MutableBuffer {
/// `new_len` will be zeroed out.
///
/// If `new_len` is less than `len`, the buffer will be truncated.
#[inline]
pub fn resize(&mut self, new_len: usize) {
self.data.resize(new_len, 0);
}
Expand All @@ -683,26 +687,28 @@ impl MutableBuffer {
}

/// Clear all existing data from this buffer.
#[inline]
pub fn clear(&mut self) {
self.data.clear()
}

/// Returns the data stored in this buffer as a slice.
#[inline]
pub fn data(&self) -> &[u8] {
&self.data
}

/// Returns the data stored in this buffer as a mutable slice.
#[inline]
pub fn data_mut(&mut self) -> &mut [u8] {
&mut self.data
}

/// Freezes this buffer and return an immutable version of it.
#[inline]
pub fn freeze(self) -> Buffer {
Buffer {
data: Arc::new(self.data),
offset: 0,
}
let data = unsafe { Bytes::new(self.data.to_vec(), Deallocation::Native) };
Buffer::new(data)
}

/// View buffer as typed slice.
Expand Down
Loading

0 comments on commit 666ac94

Please sign in to comment.