From 5647a30e980b5ec9888283d3892b53afa9618153 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Tue, 19 Jan 2021 18:41:18 +0100 Subject: [PATCH] ARROW-11108: [Rust] Fixed performance issue in mutableBuffer. This PR refactors `MutableBuffer::extend_from_slice` to remove the need to use `to_byte_slice` on every call, thereby removing a level of indirection that prevented the compiler from optimizing out some code. This is the second performance improvement originally presented in #8796 and, together with #9027 , brings the performance of "MutableBuffer" to the same level as `Vec`, in particular to building buffers on the fly. Basically, when converting to a byte slice `&[u8]`, the compiler loses the type size information, and thus needs to perform extra checks and can't just optimize out the code. This PR adopts the same API as `Vec::extend_from_slice`, but since our buffers are in `u8` (i.e. a la `Vec<u8>`), I made the signature ``` pub fn extend_from_slice(&mut self, items: &[T]) pub fn push(&mut self, item: &T) ``` i.e. it consumes something that can be converted to a byte slice, but internally makes the conversion to bytes (as `to_byte_slice` was doing). Credits for the root cause analysis that led to this PR go to @Dandandan, [originally fielded here](https://github.com/apache/arrow/pull/9016#discussion_r549110164). > [...] current conversion to a byte slice may add some overhead? - @Dandandan Benches (against master, so, both this PR and #9044 ): ``` Switched to branch 'perf_buffer' Your branch and 'origin/perf_buffer' have diverged, and have 6 and 1 different commits each, respectively.
(use "git pull" to merge the remote branch into yours) Compiling arrow v3.0.0-SNAPSHOT (/Users/jorgecarleitao/projects/arrow/rust/arrow) Finished bench [optimized] target(s) in 1m 00s Running /Users/jorgecarleitao/projects/arrow/rust/target/release/deps/buffer_create-915da5f1abaf0471 Gnuplot not found, using plotters backend mutable time: [463.11 us 463.57 us 464.07 us] change: [-19.508% -18.571% -17.526%] (p = 0.00 < 0.05) Performance has improved. Found 10 outliers among 100 measurements (10.00%) 1 (1.00%) high mild 9 (9.00%) high severe mutable prepared time: [527.84 us 528.46 us 529.14 us] change: [-13.356% -12.522% -11.790%] (p = 0.00 < 0.05) Performance has improved. Found 12 outliers among 100 measurements (12.00%) 5 (5.00%) high mild 7 (7.00%) high severe Benchmarking from_slice: Warming up for 3.0000 s Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60. from_slice time: [1.1968 ms 1.1979 ms 1.1991 ms] change: [-6.8697% -6.2029% -5.5812%] (p = 0.00 < 0.05) Performance has improved. Found 10 outliers among 100 measurements (10.00%) 3 (3.00%) high mild 7 (7.00%) high severe from_slice prepared time: [917.49 us 918.89 us 920.60 us] change: [-6.5111% -5.9102% -5.3038%] (p = 0.00 < 0.05) Performance has improved. Found 10 outliers among 100 measurements (10.00%) 4 (4.00%) high mild 6 (6.00%) high severe ``` Closes #9076 from jorgecarleitao/perf_buffer Authored-by: Jorge C. Leitao Signed-off-by: Jorge C. 
Leitao --- rust/arrow/benches/buffer_create.rs | 3 +- rust/arrow/src/array/array_binary.rs | 54 ++-- rust/arrow/src/array/array_boolean.rs | 4 +- rust/arrow/src/array/array_list.rs | 75 +---- rust/arrow/src/array/array_primitive.rs | 24 +- rust/arrow/src/array/array_string.rs | 14 +- rust/arrow/src/array/array_union.rs | 35 +-- rust/arrow/src/array/builder.rs | 169 ++++------ rust/arrow/src/array/data.rs | 4 +- rust/arrow/src/array/equal/utils.rs | 7 +- rust/arrow/src/array/raw_pointer.rs | 17 +- rust/arrow/src/array/transform/boolean.rs | 6 +- rust/arrow/src/array/transform/list.rs | 13 +- rust/arrow/src/array/transform/mod.rs | 26 +- rust/arrow/src/array/transform/primitive.rs | 9 +- rust/arrow/src/array/transform/utils.rs | 12 +- .../src/array/transform/variable_size.rs | 9 +- rust/arrow/src/buffer.rs | 288 +++++++++++------- rust/arrow/src/bytes.rs | 13 - rust/arrow/src/compute/kernels/aggregate.rs | 1 - rust/arrow/src/compute/kernels/arithmetic.rs | 8 +- rust/arrow/src/compute/kernels/boolean.rs | 4 +- rust/arrow/src/compute/kernels/cast.rs | 18 +- rust/arrow/src/compute/kernels/comparison.rs | 40 ++- rust/arrow/src/compute/kernels/filter.rs | 9 +- rust/arrow/src/compute/kernels/length.rs | 3 +- rust/arrow/src/compute/kernels/limit.rs | 11 +- rust/arrow/src/compute/kernels/sort.rs | 4 +- rust/arrow/src/compute/kernels/substring.rs | 6 +- rust/arrow/src/compute/kernels/take.rs | 25 +- rust/arrow/src/compute/util.rs | 13 +- rust/arrow/src/datatypes.rs | 2 + rust/arrow/src/json/reader.rs | 25 +- rust/arrow/src/lib.rs | 4 +- rust/arrow/src/memory.rs | 59 ++-- rust/arrow/src/util/integration_util.rs | 2 +- rust/parquet/src/arrow/array_reader.rs | 2 +- rust/parquet/src/arrow/record_reader.rs | 26 +- 38 files changed, 492 insertions(+), 552 deletions(-) diff --git a/rust/arrow/benches/buffer_create.rs b/rust/arrow/benches/buffer_create.rs index 59cf0eb5d65e6..c5b9a4e69a9b2 100644 --- a/rust/arrow/benches/buffer_create.rs +++ b/rust/arrow/benches/buffer_create.rs @@ 
-33,8 +33,7 @@ fn mutable_buffer(data: &[Vec], capacity: usize) -> Buffer { criterion::black_box({ let mut result = MutableBuffer::new(capacity); - data.iter() - .for_each(|vec| result.extend_from_slice(vec.to_byte_slice())); + data.iter().for_each(|vec| result.extend_from_slice(vec)); result.into() }) diff --git a/rust/arrow/src/array/array_binary.rs b/rust/arrow/src/array/array_binary.rs index 4c4d85d5bbd01..c50af60fefdcf 100644 --- a/rust/arrow/src/array/array_binary.rs +++ b/rust/arrow/src/array/array_binary.rs @@ -27,8 +27,8 @@ use super::{ array::print_long_array, raw_pointer::RawPtrBox, Array, ArrayData, ArrayDataRef, FixedSizeListArray, GenericBinaryIter, GenericListArray, OffsetSizeTrait, }; +use crate::buffer::Buffer; use crate::util::bit_util; -use crate::{buffer::Buffer, datatypes::ToByteSlice}; use crate::{buffer::MutableBuffer, datatypes::DataType}; /// Like OffsetSizeTrait, but specialized for Binary @@ -110,8 +110,8 @@ impl GenericBinaryArray { } let array_data = ArrayData::builder(OffsetSize::DATA_TYPE) .len(v.len()) - .add_buffer(Buffer::from(offsets.to_byte_slice())) - .add_buffer(Buffer::from(&values[..])) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_buffer(Buffer::from_slice_ref(&values)) .build(); GenericBinaryArray::::from(array_data) } @@ -245,8 +245,8 @@ where let array_data = ArrayData::builder(OffsetSize::DATA_TYPE) .len(data_len) - .add_buffer(Buffer::from(offsets.to_byte_slice())) - .add_buffer(Buffer::from(&values[..])) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_buffer(Buffer::from_slice_ref(&values)) .null_bit_buffer(null_buf.into()) .build(); Self::from(array_data) @@ -368,7 +368,7 @@ impl From>>> for FixedSizeBinaryArray { .all(|item| item.len() == size)); let num_bytes = bit_util::ceil(len, 8); - let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let mut null_buf = MutableBuffer::from_len_zeroed(num_bytes); let null_slice = null_buf.as_slice_mut(); 
data.iter().enumerate().for_each(|(i, entry)| { @@ -641,8 +641,8 @@ mod tests { // Array data: ["hello", "", "parquet"] let array_data = ArrayData::builder(DataType::Binary) .len(3) - .add_buffer(Buffer::from(offsets.to_byte_slice())) - .add_buffer(Buffer::from(&values[..])) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_buffer(Buffer::from_slice_ref(&values)) .build(); let binary_array = BinaryArray::from(array_data); assert_eq!(3, binary_array.len()); @@ -664,8 +664,8 @@ mod tests { let array_data = ArrayData::builder(DataType::Binary) .len(4) .offset(1) - .add_buffer(Buffer::from(offsets.to_byte_slice())) - .add_buffer(Buffer::from(&values[..])) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_buffer(Buffer::from_slice_ref(&values)) .build(); let binary_array = BinaryArray::from(array_data); assert_eq!( @@ -688,8 +688,8 @@ mod tests { // Array data: ["hello", "", "parquet"] let array_data = ArrayData::builder(DataType::LargeBinary) .len(3) - .add_buffer(Buffer::from(offsets.to_byte_slice())) - .add_buffer(Buffer::from(&values[..])) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_buffer(Buffer::from_slice_ref(&values)) .build(); let binary_array = LargeBinaryArray::from(array_data); assert_eq!(3, binary_array.len()); @@ -711,8 +711,8 @@ mod tests { let array_data = ArrayData::builder(DataType::LargeBinary) .len(4) .offset(1) - .add_buffer(Buffer::from(offsets.to_byte_slice())) - .add_buffer(Buffer::from(&values[..])) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_buffer(Buffer::from_slice_ref(&values)) .build(); let binary_array = LargeBinaryArray::from(array_data); assert_eq!( @@ -739,14 +739,14 @@ mod tests { // Array data: ["hello", "", "parquet"] let array_data1 = ArrayData::builder(DataType::Binary) .len(3) - .add_buffer(Buffer::from(offsets.to_byte_slice())) - .add_buffer(Buffer::from(&values[..])) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_buffer(Buffer::from_slice_ref(&values)) .build(); let binary_array1 = 
BinaryArray::from(array_data1); let array_data2 = ArrayData::builder(DataType::Binary) .len(3) - .add_buffer(Buffer::from(offsets.to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&offsets)) .add_child_data(values_data) .build(); let list_array = ListArray::from(array_data2); @@ -778,14 +778,14 @@ mod tests { // Array data: ["hello", "", "parquet"] let array_data1 = ArrayData::builder(DataType::LargeBinary) .len(3) - .add_buffer(Buffer::from(offsets.to_byte_slice())) - .add_buffer(Buffer::from(&values[..])) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_buffer(Buffer::from_slice_ref(&values)) .build(); let binary_array1 = LargeBinaryArray::from(array_data1); let array_data2 = ArrayData::builder(DataType::Binary) .len(3) - .add_buffer(Buffer::from(offsets.to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&offsets)) .add_child_data(values_data) .build(); let list_array = LargeListArray::from(array_data2); @@ -838,13 +838,13 @@ mod tests { let values: [u32; 12] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; let values_data = ArrayData::builder(DataType::UInt32) .len(12) - .add_buffer(Buffer::from(values[..].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&values)) .build(); let offsets: [i32; 4] = [0, 5, 5, 12]; let array_data = ArrayData::builder(DataType::Utf8) .len(3) - .add_buffer(Buffer::from(offsets.to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&offsets)) .add_child_data(values_data) .build(); let list_array = ListArray::from(array_data); @@ -860,14 +860,14 @@ mod tests { let values: [u32; 12] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; let values_data = ArrayData::builder(DataType::UInt32) .len(12) - .add_buffer(Buffer::from(values[..].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&values)) .add_child_data(ArrayData::builder(DataType::Boolean).build()) .build(); let offsets: [i32; 4] = [0, 5, 5, 12]; let array_data = ArrayData::builder(DataType::Utf8) .len(3) - .add_buffer(Buffer::from(offsets.to_byte_slice())) + 
.add_buffer(Buffer::from_slice_ref(&offsets)) .add_child_data(values_data) .build(); let list_array = ListArray::from(array_data); @@ -934,7 +934,7 @@ mod tests { let values: [u32; 12] = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; let values_data = ArrayData::builder(DataType::UInt32) .len(12) - .add_buffer(Buffer::from(values[..].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&values)) .add_child_data(ArrayData::builder(DataType::Boolean).build()) .build(); @@ -957,8 +957,8 @@ mod tests { let offsets: [i32; 4] = [0, 5, 5, 12]; let array_data = ArrayData::builder(DataType::Binary) .len(3) - .add_buffer(Buffer::from(offsets.to_byte_slice())) - .add_buffer(Buffer::from(&values[..])) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_buffer(Buffer::from_slice_ref(&values)) .build(); let binary_array = BinaryArray::from(array_data); binary_array.value(4); diff --git a/rust/arrow/src/array/array_boolean.rs b/rust/arrow/src/array/array_boolean.rs index 2f304838115f8..e4b83e697949a 100644 --- a/rust/arrow/src/array/array_boolean.rs +++ b/rust/arrow/src/array/array_boolean.rs @@ -163,8 +163,8 @@ impl>> FromIterator for BooleanArray { let data_len = data_len.expect("Iterator must be sized"); // panic if no upper bound. 
let num_bytes = bit_util::ceil(data_len, 8); - let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); - let mut val_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let mut null_buf = MutableBuffer::from_len_zeroed(num_bytes); + let mut val_buf = MutableBuffer::from_len_zeroed(num_bytes); let data = val_buf.as_slice_mut(); diff --git a/rust/arrow/src/array/array_list.rs b/rust/arrow/src/array/array_list.rs index 75b8a4827e5dd..7a80794f150a4 100644 --- a/rust/arrow/src/array/array_list.rs +++ b/rust/arrow/src/array/array_list.rs @@ -298,11 +298,7 @@ impl fmt::Debug for FixedSizeListArray { #[cfg(test)] mod tests { use crate::{ - array::ArrayData, - array::Int32Array, - buffer::Buffer, - datatypes::{Field, ToByteSlice}, - memory, + array::ArrayData, array::Int32Array, buffer::Buffer, datatypes::Field, util::bit_util, }; @@ -313,12 +309,12 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(8) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) .build(); // Construct a buffer for value offsets, for the nested array: // [[0, 1, 2], [3, 4, 5], [6, 7]] - let value_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0, 3, 6, 8]); // Construct a list array from the above two let list_data_type = @@ -383,12 +379,12 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(8) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) .build(); // Construct a buffer for value offsets, for the nested array: // [[0, 1, 2], [3, 4, 5], [6, 7]] - let value_offsets = Buffer::from(&[0i64, 3, 6, 8].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0i64, 3, 6, 8]); // Construct a list array from the above two let list_data_type = 
@@ -453,7 +449,7 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(9) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7, 8].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7, 8])) .build(); // Construct a list array from the above two @@ -522,7 +518,7 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(8) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) .build(); // Construct a list array from the above two @@ -542,15 +538,12 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(10) - .add_buffer(Buffer::from( - &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9].to_byte_slice(), - )) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) .build(); // Construct a buffer for value offsets, for the nested array: // [[0, 1], null, null, [2, 3], [4, 5], null, [6, 7, 8], null, [9]] - let value_offsets = - Buffer::from(&[0, 2, 2, 2, 4, 6, 6, 9, 9, 10].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0, 2, 2, 2, 4, 6, 6, 9, 9, 10]); // 01011001 00000001 let mut null_bits: [u8; 2] = [0; 2]; bit_util::set_bit(&mut null_bits, 0); @@ -607,15 +600,12 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(10) - .add_buffer(Buffer::from( - &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9].to_byte_slice(), - )) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) .build(); // Construct a buffer for value offsets, for the nested array: // [[0, 1], null, null, [2, 3], [4, 5], null, [6, 7, 8], null, [9]] - let value_offsets = - Buffer::from(&[0i64, 2, 2, 2, 4, 6, 6, 9, 9, 10].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0i64, 2, 2, 2, 4, 6, 6, 9, 9, 10]); // 01011001 00000001 let mut null_bits: [u8; 2] = [0; 2]; bit_util::set_bit(&mut null_bits, 0); 
@@ -674,9 +664,7 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(10) - .add_buffer(Buffer::from( - &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9].to_byte_slice(), - )) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) .build(); // Set null buts for the nested array: @@ -737,7 +725,7 @@ mod tests { fn test_list_array_invalid_buffer_len() { let value_data = ArrayData::builder(DataType::Int32) .len(8) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) .build(); let list_data_type = DataType::List(Box::new(Field::new("item", DataType::Int32, false))); @@ -753,7 +741,7 @@ mod tests { expected = "ListArray should contain a single child array (values array)" )] fn test_list_array_invalid_child_array_len() { - let value_offsets = Buffer::from(&[0, 2, 5, 7].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0, 2, 5, 7]); let list_data_type = DataType::List(Box::new(Field::new("item", DataType::Int32, false))); let list_data = ArrayData::builder(list_data_type) @@ -768,10 +756,10 @@ mod tests { fn test_list_array_invalid_value_offset_start() { let value_data = ArrayData::builder(DataType::Int32) .len(8) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) .build(); - let value_offsets = Buffer::from(&[2, 2, 5, 7].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[2, 2, 5, 7]); let list_data_type = DataType::List(Box::new(Field::new("item", DataType::Int32, false))); @@ -782,35 +770,4 @@ mod tests { .build(); ListArray::from(list_data); } - - #[test] - #[should_panic(expected = "memory is not aligned")] - fn test_primitive_array_alignment() { - let ptr = memory::allocate_aligned(8); - let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) }; - let buf2 = buf.slice(1); - let array_data = 
ArrayData::builder(DataType::Int32).add_buffer(buf2).build(); - Int32Array::from(array_data); - } - - #[test] - #[should_panic(expected = "memory is not aligned")] - fn test_list_array_alignment() { - let ptr = memory::allocate_aligned(8); - let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) }; - let buf2 = buf.slice(1); - - let values: [i32; 8] = [0; 8]; - let value_data = ArrayData::builder(DataType::Int32) - .add_buffer(Buffer::from(values.to_byte_slice())) - .build(); - - let list_data_type = - DataType::List(Box::new(Field::new("item", DataType::Int32, false))); - let list_data = ArrayData::builder(list_data_type) - .add_buffer(buf2) - .add_child_data(value_data) - .build(); - ListArray::from(list_data); - } } diff --git a/rust/arrow/src/array/array_primitive.rs b/rust/arrow/src/array/array_primitive.rs index febb1656350f3..3dcb187495f9a 100644 --- a/rust/arrow/src/array/array_primitive.rs +++ b/rust/arrow/src/array/array_primitive.rs @@ -295,20 +295,21 @@ impl::Native let data_len = data_len.expect("Iterator must be sized"); // panic if no upper bound. let num_bytes = bit_util::ceil(data_len, 8); - let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let mut null_buf = MutableBuffer::from_len_zeroed(num_bytes); let mut val_buf = MutableBuffer::new( data_len * mem::size_of::<::Native>(), ); - let null = vec![0; mem::size_of::<::Native>()]; - let null_slice = null_buf.as_slice_mut(); iter.enumerate().for_each(|(i, item)| { if let Some(a) = item.borrow() { bit_util::set_bit(null_slice, i); - val_buf.extend_from_slice(a.to_byte_slice()); + val_buf.push(*a); } else { - val_buf.extend_from_slice(&null); + // this ensures that null items on the buffer are not arbitrary. + // This is important because falible operations can use null values (e.g. a vectorized "add") + // which may panic (e.g. overflow if the number on the slots happen to be very large). + val_buf.push(T::Native::default()); } }); @@ -334,7 +335,7 @@ macro_rules! 
def_numeric_from_vec { fn from(data: Vec<<$ty as ArrowPrimitiveType>::Native>) -> Self { let array_data = ArrayData::builder($ty::DATA_TYPE) .len(data.len()) - .add_buffer(Buffer::from(data.to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&data)) .build(); PrimitiveArray::from(array_data) } @@ -383,7 +384,7 @@ impl PrimitiveArray { let array_data = ArrayData::builder(DataType::Timestamp(T::get_time_unit(), timezone)) .len(data.len()) - .add_buffer(Buffer::from(data.to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&data)) .build(); PrimitiveArray::from(array_data) } @@ -398,14 +399,13 @@ impl PrimitiveArray { let mut val_buf = MutableBuffer::new(data_len * mem::size_of::()); { - let null = vec![0; mem::size_of::()]; let null_slice = null_buf.as_slice_mut(); for (i, v) in data.iter().enumerate() { if let Some(n) = v { bit_util::set_bit(null_slice, i); - val_buf.extend_from_slice(&n.to_byte_slice()); + val_buf.push(*n); } else { - val_buf.extend_from_slice(&null); + val_buf.push(0i64); } } } @@ -448,7 +448,7 @@ mod tests { #[test] fn test_primitive_array_from_vec() { - let buf = Buffer::from(&[0, 1, 2, 3, 4].to_byte_slice()); + let buf = Buffer::from_slice_ref(&[0, 1, 2, 3, 4]); let arr = Int32Array::from(vec![0, 1, 2, 3, 4]); let slice = arr.values(); assert_eq!(buf, arr.data.buffers()[0]); @@ -804,7 +804,7 @@ mod tests { #[test] fn test_primitive_array_builder() { // Test building a primitive array with ArrayData builder and offset - let buf = Buffer::from(&[0, 1, 2, 3, 4].to_byte_slice()); + let buf = Buffer::from_slice_ref(&[0, 1, 2, 3, 4]); let buf2 = buf.clone(); let data = ArrayData::builder(DataType::Int32) .len(5) diff --git a/rust/arrow/src/array/array_string.rs b/rust/arrow/src/array/array_string.rs index 28a587d7374e5..f1523e35c3bb6 100644 --- a/rust/arrow/src/array/array_string.rs +++ b/rust/arrow/src/array/array_string.rs @@ -24,8 +24,8 @@ use super::{ array::print_long_array, raw_pointer::RawPtrBox, Array, ArrayData, ArrayDataRef, 
GenericListArray, GenericStringIter, OffsetSizeTrait, }; +use crate::buffer::Buffer; use crate::util::bit_util; -use crate::{buffer::Buffer, datatypes::ToByteSlice}; use crate::{buffer::MutableBuffer, datatypes::DataType}; /// Like OffsetSizeTrait, but specialized for Strings @@ -128,11 +128,11 @@ impl GenericStringArray { let mut values = MutableBuffer::new(0); let mut length_so_far = OffsetSize::zero(); - offsets.extend_from_slice(length_so_far.to_byte_slice()); + offsets.push(length_so_far); for s in &v { length_so_far += OffsetSize::from_usize(s.len()).unwrap(); - offsets.extend_from_slice(length_so_far.to_byte_slice()); + offsets.push(length_so_far); values.extend_from_slice(s.as_bytes()); } let array_data = ArrayData::builder(OffsetSize::DATA_TYPE) @@ -164,7 +164,7 @@ where let mut null_buf = MutableBuffer::new_null(data_len); let null_slice = null_buf.as_slice_mut(); let mut length_so_far = OffsetSize::zero(); - offsets.extend_from_slice(length_so_far.to_byte_slice()); + offsets.push(length_so_far); for (i, s) in iter.enumerate() { if let Some(s) = s { @@ -177,7 +177,7 @@ where } else { values.extend_from_slice(b""); } - offsets.extend_from_slice(length_so_far.to_byte_slice()); + offsets.push(length_so_far); } let array_data = ArrayData::builder(OffsetSize::DATA_TYPE) @@ -386,8 +386,8 @@ mod tests { let offsets: [i32; 4] = [0, 5, 5, 12]; let array_data = ArrayData::builder(DataType::Utf8) .len(3) - .add_buffer(Buffer::from(offsets.to_byte_slice())) - .add_buffer(Buffer::from(&values[..])) + .add_buffer(Buffer::from_slice_ref(&offsets)) + .add_buffer(Buffer::from_slice_ref(&values)) .build(); let string_array = StringArray::from(array_data); string_array.value(4); diff --git a/rust/arrow/src/array/array_union.rs b/rust/arrow/src/array/array_union.rs index 284508a27d56b..cc5598f8da298 100644 --- a/rust/arrow/src/array/array_union.rs +++ b/rust/arrow/src/array/array_union.rs @@ -344,7 +344,7 @@ mod tests { use crate::array::*; use crate::buffer::Buffer; - use 
crate::datatypes::{DataType, Field, ToByteSlice}; + use crate::datatypes::{DataType, Field}; #[test] fn test_dense_i32() { @@ -365,7 +365,7 @@ mod tests { // Check type ids assert_eq!( union.data().buffers()[0], - Buffer::from(&expected_type_ids.to_byte_slice()) + Buffer::from_slice_ref(&expected_type_ids) ); for (i, id) in expected_type_ids.iter().enumerate() { assert_eq!(id, &union.type_id(i)); @@ -374,7 +374,7 @@ mod tests { // Check offsets assert_eq!( union.data().buffers()[1], - Buffer::from(expected_value_offsets.to_byte_slice()) + Buffer::from_slice_ref(&expected_value_offsets) ); for (i, id) in expected_value_offsets.iter().enumerate() { assert_eq!(&union.value_offset(i), id); @@ -383,15 +383,15 @@ mod tests { // Check data assert_eq!( union.data().child_data()[0].buffers()[0], - Buffer::from([1_i32, 4, 6].to_byte_slice()) + Buffer::from_slice_ref(&[1_i32, 4, 6]) ); assert_eq!( union.data().child_data()[1].buffers()[0], - Buffer::from([2_i32, 7].to_byte_slice()) + Buffer::from_slice_ref(&[2_i32, 7]) ); assert_eq!( union.data().child_data()[2].buffers()[0], - Buffer::from([3_i32, 5].to_byte_slice()), + Buffer::from_slice_ref(&[3_i32, 5]), ); assert_eq!(expected_array_values.len(), union.len()); @@ -559,8 +559,8 @@ mod tests { let type_ids = [1_i8, 0, 0, 2, 0, 1]; let value_offsets = [0_i32, 0, 1, 0, 2, 1]; - let type_id_buffer = Buffer::from(&type_ids.to_byte_slice()); - let value_offsets_buffer = Buffer::from(value_offsets.to_byte_slice()); + let type_id_buffer = Buffer::from_slice_ref(&type_ids); + let value_offsets_buffer = Buffer::from_slice_ref(&value_offsets); let mut children: Vec<(Field, Arc)> = Vec::new(); children.push(( @@ -581,17 +581,14 @@ mod tests { .unwrap(); // Check type ids - assert_eq!( - Buffer::from(&type_ids.to_byte_slice()), - array.data().buffers()[0] - ); + assert_eq!(Buffer::from_slice_ref(&type_ids), array.data().buffers()[0]); for (i, id) in type_ids.iter().enumerate() { assert_eq!(id, &array.type_id(i)); } // Check offsets 
assert_eq!( - Buffer::from(value_offsets.to_byte_slice()), + Buffer::from_slice_ref(&value_offsets), array.data().buffers()[1] ); for (i, id) in value_offsets.iter().enumerate() { @@ -659,7 +656,7 @@ mod tests { // Check type ids assert_eq!( - Buffer::from(&expected_type_ids.to_byte_slice()), + Buffer::from_slice_ref(&expected_type_ids), union.data().buffers()[0] ); for (i, id) in expected_type_ids.iter().enumerate() { @@ -672,14 +669,14 @@ mod tests { // Check data assert_eq!( union.data().child_data()[0].buffers()[0], - Buffer::from([1_i32, 0, 0, 4, 0, 6, 0].to_byte_slice()), + Buffer::from_slice_ref(&[1_i32, 0, 0, 4, 0, 6, 0]), ); assert_eq!( - Buffer::from([0_i32, 2_i32, 0, 0, 0, 0, 7].to_byte_slice()), + Buffer::from_slice_ref(&[0_i32, 2_i32, 0, 0, 0, 0, 7]), union.data().child_data()[1].buffers()[0] ); assert_eq!( - Buffer::from([0_i32, 0, 3_i32, 0, 5, 0, 0].to_byte_slice()), + Buffer::from_slice_ref(&[0_i32, 0, 3_i32, 0, 5, 0, 0]), union.data().child_data()[2].buffers()[0] ); @@ -708,7 +705,7 @@ mod tests { // Check type ids assert_eq!( - Buffer::from(&expected_type_ids.to_byte_slice()), + Buffer::from_slice_ref(&expected_type_ids), union.data().buffers()[0] ); for (i, id) in expected_type_ids.iter().enumerate() { @@ -770,7 +767,7 @@ mod tests { // Check type ids assert_eq!( - Buffer::from(&expected_type_ids.to_byte_slice()), + Buffer::from_slice_ref(&expected_type_ids), union.data().buffers()[0] ); for (i, id) in expected_type_ids.iter().enumerate() { diff --git a/rust/arrow/src/array/builder.rs b/rust/arrow/src/array/builder.rs index 30cce75d00c87..d568617c2034d 100644 --- a/rust/arrow/src/array/builder.rs +++ b/rust/arrow/src/array/builder.rs @@ -180,7 +180,7 @@ impl BufferBuilder { #[inline] pub fn advance(&mut self, i: usize) { let new_buffer_len = (self.len + i) * mem::size_of::(); - self.buffer.resize(new_buffer_len); + self.buffer.resize(new_buffer_len, 0); self.len += i; } @@ -198,9 +198,7 @@ impl BufferBuilder { /// ``` #[inline] pub fn 
reserve(&mut self, n: usize) { - let new_capacity = self.len + n; - let byte_capacity = mem::size_of::() * new_capacity; - self.buffer.reserve(byte_capacity); + self.buffer.reserve(n * mem::size_of::()); } /// Appends a value of type `T` into the builder, @@ -219,12 +217,8 @@ impl BufferBuilder { #[inline] pub fn append(&mut self, v: T) { self.reserve(1); - self.write_bytes(v.to_byte_slice(), 1); - } - - fn write_bytes(&mut self, bytes: &[u8], len_added: usize) { - self.buffer.extend_from_slice(bytes); - self.len += len_added; + self.buffer.push(v); + self.len += 1; } /// Appends a value of type `T` into the builder N times, @@ -244,8 +238,9 @@ impl BufferBuilder { pub fn append_n(&mut self, n: usize, v: T) { self.reserve(n); for _ in 0..n { - self.write_bytes(v.to_byte_slice(), 1); + self.buffer.push(v); } + self.len += n; } /// Appends a slice of type `T`, growing the internal buffer as needed. @@ -262,10 +257,8 @@ impl BufferBuilder { /// ``` #[inline] pub fn append_slice(&mut self, slice: &[T]) { - let array_slots = slice.len(); - self.reserve(array_slots); - - self.write_bytes(slice.to_byte_slice(), array_slots); + self.buffer.extend_from_slice(slice); + self.len += slice.len(); } /// Resets this builder and returns an immutable [`Buffer`](crate::buffer::Buffer). 
@@ -300,10 +293,7 @@ impl BooleanBufferBuilder { #[inline] pub fn new(capacity: usize) -> Self { let byte_capacity = bit_util::ceil(capacity, 8); - let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); - let mut buffer = MutableBuffer::new(actual_capacity); - buffer.set_null_bits(0, actual_capacity); - + let buffer = MutableBuffer::from_len_zeroed(byte_capacity); Self { buffer, len: 0 } } @@ -320,80 +310,64 @@ impl BooleanBufferBuilder { } #[inline] - pub fn advance(&mut self, i: usize) { - let new_buffer_len = bit_util::ceil(self.len + i, 8); - self.buffer.resize(new_buffer_len); - self.len += i; + pub fn advance(&mut self, additional: usize) { + let new_len = self.len + additional; + let new_len_bytes = bit_util::ceil(new_len, 8); + if new_len_bytes > self.buffer.len() { + self.buffer.resize(new_len_bytes, 0); + } + self.len = new_len; } + /// Reserve space to at least `additional` new bits. + /// Capacity will be `>= self.len() + additional`. + /// New bytes are uninitialized and reading them is undefined behavior. 
#[inline] - pub fn reserve(&mut self, n: usize) { - let new_capacity = self.len + n; - if new_capacity > self.capacity() { - let new_byte_capacity = bit_util::ceil(new_capacity, 8); - let existing_capacity = self.buffer.capacity(); - let new_capacity = self.buffer.reserve(new_byte_capacity); - self.buffer - .set_null_bits(existing_capacity, new_capacity - existing_capacity); + pub fn reserve(&mut self, additional: usize) { + let capacity = self.len + additional; + if capacity > self.capacity() { + // convert differential to bytes + let additional = bit_util::ceil(capacity, 8) - self.buffer.len(); + self.buffer.reserve(additional); } } #[inline] pub fn append(&mut self, v: bool) { - self.reserve(1); + self.advance(1); if v { - let data = unsafe { - std::slice::from_raw_parts_mut( - self.buffer.as_mut_ptr(), - self.buffer.capacity(), - ) - }; - bit_util::set_bit(data, self.len); + unsafe { bit_util::set_bit_raw(self.buffer.as_mut_ptr(), self.len - 1) }; } - self.len += 1; } #[inline] - pub fn append_n(&mut self, n: usize, v: bool) { - self.reserve(n); - if n != 0 && v { - let data = unsafe { - std::slice::from_raw_parts_mut( - self.buffer.as_mut_ptr(), - self.buffer.capacity(), - ) - }; - (self.len..self.len + n).for_each(|i| bit_util::set_bit(data, i)) + pub fn append_n(&mut self, additional: usize, v: bool) { + self.advance(additional); + if additional > 0 && v { + let offset = self.len() - additional; + (0..additional).for_each(|i| unsafe { + bit_util::set_bit_raw(self.buffer.as_mut_ptr(), offset + i) + }) } - self.len += n; } #[inline] pub fn append_slice(&mut self, slice: &[bool]) { - let array_slots = slice.len(); - self.reserve(array_slots); + let additional = slice.len(); + self.advance(additional); - for v in slice { + let offset = self.len() - additional; + for (i, v) in slice.iter().enumerate() { if *v { - // For performance the `len` of the buffer is not - // updated on each append but is updated in the - // `into` method instead. 
- unsafe { - bit_util::set_bit_raw(self.buffer.as_mut_ptr(), self.len); - } + unsafe { bit_util::set_bit_raw(self.buffer.as_mut_ptr(), offset + i) } } - self.len += 1; } } #[inline] pub fn finish(&mut self) -> Buffer { - // `append` does not update the buffer's `len` so do it before `into` is called. - let new_buffer_len = bit_util::ceil(self.len, 8); - debug_assert!(new_buffer_len >= self.buffer.len()); - let mut buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0)); + let buf = std::mem::replace(&mut self.buffer, MutableBuffer::new(0)); self.len = 0; - buf.resize(new_buffer_len); buf.into() } } @@ -2174,17 +2148,6 @@ mod tests { assert_eq!(1, buffer.len()); } - #[test] - fn test_write_bytes_i32() { - let mut b = Int32BufferBuilder::new(4); - let bytes = [8, 16, 32, 64].to_byte_slice(); - b.write_bytes(bytes, 4); - assert_eq!(4, b.len()); - assert_eq!(16, b.capacity()); - let buffer = b.finish(); - assert_eq!(16, buffer.len()); - } - #[test] fn test_boolean_array_builder_append_slice() { let arr1 = @@ -2197,16 +2160,18 @@ mod tests { builder.append_value(false).unwrap(); let arr2 = builder.finish(); - assert_eq!(arr1.len(), arr2.len()); - assert_eq!(arr1.offset(), arr2.offset()); - assert_eq!(arr1.null_count(), arr2.null_count()); - for i in 0..5 { - assert_eq!(arr1.is_null(i), arr2.is_null(i)); - assert_eq!(arr1.is_valid(i), arr2.is_valid(i)); - if arr1.is_valid(i) { - assert_eq!(arr1.value(i), arr2.value(i)); - } - } + assert_eq!(arr1, arr2); + } + + #[test] + fn test_boolean_array_builder_append_slice_large() { + let arr1 = BooleanArray::from(vec![true; 513]); + + let mut builder = BooleanArray::builder(512); + builder.append_slice(&[true; 513]).unwrap(); + let arr2 = builder.finish(); + + assert_eq!(arr1, arr2); } #[test] @@ -2215,7 +2180,7 @@ mod tests { let buf = Buffer::from([72_u8, 2_u8]); let mut builder = BooleanBufferBuilder::new(8); - for i in 0..10 { + for i in 0..16 { if i == 3 || i == 6 || i == 9 { builder.append(true); } else { @@ 
-2409,12 +2374,9 @@ mod tests { let list_array = builder.finish(); let values = list_array.values().data().buffers()[0].clone(); + assert_eq!(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7]), values); assert_eq!( - Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()), - values - ); - assert_eq!( - Buffer::from(&[0, 3, 6, 8].to_byte_slice()), + Buffer::from_slice_ref(&[0, 3, 6, 8]), list_array.data().buffers()[0].clone() ); assert_eq!(DataType::Int32, list_array.value_type()); @@ -2448,12 +2410,9 @@ mod tests { let list_array = builder.finish(); let values = list_array.values().data().buffers()[0].clone(); + assert_eq!(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7]), values); assert_eq!( - Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()), - values - ); - assert_eq!( - Buffer::from(&[0i64, 3, 6, 8].to_byte_slice()), + Buffer::from_slice_ref(&[0i64, 3, 6, 8]), list_array.data().buffers()[0].clone() ); assert_eq!(DataType::Int32, list_array.value_type()); @@ -2640,21 +2599,21 @@ mod tests { assert_eq!(4, list_array.len()); assert_eq!(1, list_array.null_count()); assert_eq!( - Buffer::from(&[0, 2, 5, 5, 6].to_byte_slice()), + Buffer::from_slice_ref(&[0, 2, 5, 5, 6]), list_array.data().buffers()[0].clone() ); assert_eq!(6, list_array.values().data().len()); assert_eq!(1, list_array.values().data().null_count()); assert_eq!( - Buffer::from(&[0, 2, 4, 7, 7, 8, 10].to_byte_slice()), + Buffer::from_slice_ref(&[0, 2, 4, 7, 7, 8, 10]), list_array.values().data().buffers()[0].clone() ); assert_eq!(10, list_array.values().data().child_data()[0].len()); assert_eq!(0, list_array.values().data().child_data()[0].null_count()); assert_eq!( - Buffer::from(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10].to_byte_slice()), + Buffer::from_slice_ref(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), list_array.values().data().child_data()[0].buffers()[0].clone() ); } @@ -2858,14 +2817,14 @@ mod tests { let expected_string_data = ArrayData::builder(DataType::Utf8) .len(4) 
.null_bit_buffer(Buffer::from(&[9_u8])) - .add_buffer(Buffer::from(&[0, 3, 3, 3, 7].to_byte_slice())) - .add_buffer(Buffer::from(b"joemark")) + .add_buffer(Buffer::from_slice_ref(&[0, 3, 3, 3, 7])) + .add_buffer(Buffer::from_slice_ref(b"joemark")) .build(); let expected_int_data = ArrayData::builder(DataType::Int32) .len(4) - .null_bit_buffer(Buffer::from(&[11_u8])) - .add_buffer(Buffer::from(&[1, 2, 0, 4].to_byte_slice())) + .null_bit_buffer(Buffer::from_slice_ref(&[11_u8])) + .add_buffer(Buffer::from_slice_ref(&[1, 2, 0, 4])) .build(); assert_eq!(expected_string_data, arr.column(0).data()); diff --git a/rust/arrow/src/array/data.rs b/rust/arrow/src/array/data.rs index ecfda3d5d5b34..6541b8ff950ac 100644 --- a/rust/arrow/src/array/data.rs +++ b/rust/arrow/src/array/data.rs @@ -337,7 +337,7 @@ mod tests { use std::sync::Arc; - use crate::datatypes::ToByteSlice; + use crate::buffer::Buffer; use crate::util::bit_util; #[test] @@ -359,7 +359,7 @@ mod tests { Some(0), None, 0, - vec![Buffer::from([1i32, 2, 3, 4, 5].to_byte_slice())], + vec![Buffer::from_slice_ref(&[1i32, 2, 3, 4, 5])], vec![], )); let v = vec![0, 1, 2, 3]; diff --git a/rust/arrow/src/array/equal/utils.rs b/rust/arrow/src/array/equal/utils.rs index a880527578f99..d0108d236491e 100644 --- a/rust/arrow/src/array/equal/utils.rs +++ b/rust/arrow/src/array/equal/utils.rs @@ -120,8 +120,7 @@ pub(super) fn child_logical_null_buffer( let len = *len as usize; let array_offset = parent_data.offset(); let bitmap_len = bit_util::ceil(parent_len * len, 8); - let mut buffer = - MutableBuffer::new(bitmap_len).with_bitset(bitmap_len, false); + let mut buffer = MutableBuffer::from_len_zeroed(bitmap_len); let mut null_slice = buffer.as_slice_mut(); (array_offset..parent_len + array_offset).for_each(|index| { let start = index * len; @@ -168,9 +167,7 @@ pub(super) fn child_logical_null_buffer( DataType::Dictionary(_, _) => { unimplemented!("Logical equality not yet implemented for nested dictionaries") } - data_type => { 
- panic!("Data type {:?} is not a supported nested type", data_type) - } + data_type => panic!("Data type {:?} is not a supported nested type", data_type), } } diff --git a/rust/arrow/src/array/raw_pointer.rs b/rust/arrow/src/array/raw_pointer.rs index d18ba4b29a391..897dc5b591c38 100644 --- a/rust/arrow/src/array/raw_pointer.rs +++ b/rust/arrow/src/array/raw_pointer.rs @@ -36,10 +36,7 @@ impl RawPtrBox { /// * `ptr` is not aligned to a slice of type `T`. This is guaranteed if it was built from a slice of type `T`. pub(super) unsafe fn new(ptr: *const u8) -> Self { let ptr = NonNull::new(ptr as *mut u8).expect("Pointer cannot be null"); - assert!( - memory::is_aligned(ptr, std::mem::align_of::()), - "memory is not aligned" - ); + assert!(memory::is_ptr_aligned::(ptr), "memory is not aligned"); Self { ptr: ptr.cast() } } @@ -50,3 +47,15 @@ impl RawPtrBox { unsafe impl Send for RawPtrBox {} unsafe impl Sync for RawPtrBox {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + #[should_panic(expected = "memory is not aligned")] + fn test_primitive_array_alignment() { + let bytes = vec![0u8, 1u8]; + unsafe { RawPtrBox::::new(bytes.as_ptr().offset(1)) }; + } +} diff --git a/rust/arrow/src/array/transform/boolean.rs b/rust/arrow/src/array/transform/boolean.rs index 23be955b975cf..182914971732e 100644 --- a/rust/arrow/src/array/transform/boolean.rs +++ b/rust/arrow/src/array/transform/boolean.rs @@ -19,7 +19,7 @@ use crate::array::ArrayData; use super::{ Extend, _MutableArrayData, - utils::{reserve_for_bits, set_bits}, + utils::{resize_for_bits, set_bits}, }; pub(super) fn build_extend(array: &ArrayData) -> Extend { @@ -27,7 +27,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { Box::new( move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| { let buffer = &mut mutable.buffer1; - reserve_for_bits(buffer, mutable.len + len); + resize_for_bits(buffer, mutable.len + len); set_bits( &mut buffer.as_slice_mut(), values, @@ -41,5 +41,5 @@ 
pub(super) fn build_extend(array: &ArrayData) -> Extend { pub(super) fn extend_nulls(mutable: &mut _MutableArrayData, len: usize) { let buffer = &mut mutable.buffer1; - reserve_for_bits(buffer, mutable.len + len); + resize_for_bits(buffer, mutable.len + len); } diff --git a/rust/arrow/src/array/transform/list.rs b/rust/arrow/src/array/transform/list.rs index 8053513178ea0..8eb2bd1778d39 100644 --- a/rust/arrow/src/array/transform/list.rs +++ b/rust/arrow/src/array/transform/list.rs @@ -15,10 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::{ - array::{ArrayData, OffsetSizeTrait}, - datatypes::ToByteSlice, -}; +use crate::array::{ArrayData, OffsetSizeTrait}; use super::{ Extend, _MutableArrayData, @@ -66,8 +63,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { let mut last_offset: T = unsafe { get_last_offset(offset_buffer) }; let delta_len = array.len() - array.null_count(); - offset_buffer - .reserve(offset_buffer.len() + delta_len * std::mem::size_of::()); + offset_buffer.reserve(delta_len * std::mem::size_of::()); let child = &mut mutable.child_data[0]; (start..start + len).for_each(|i| { @@ -83,7 +79,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { ); } // append offset - offset_buffer.extend_from_slice(last_offset.to_byte_slice()); + offset_buffer.push(last_offset); }) }, ) @@ -99,6 +95,5 @@ pub(super) fn extend_nulls( // this is safe due to how offset is built. 
See details on `get_last_offset` let last_offset: T = unsafe { get_last_offset(offset_buffer) }; - let offsets = vec![last_offset; len]; - offset_buffer.extend_from_slice(offsets.to_byte_slice()); + (0..len).for_each(|_| offset_buffer.push(last_offset)) } diff --git a/rust/arrow/src/array/transform/mod.rs b/rust/arrow/src/array/transform/mod.rs index 4e428f26697a5..4a5e829d6e7c5 100644 --- a/rust/arrow/src/array/transform/mod.rs +++ b/rust/arrow/src/array/transform/mod.rs @@ -95,7 +95,7 @@ fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits if let Some(bitmap) = array.null_bitmap() { let bytes = bitmap.bits.as_slice(); Box::new(move |mutable, start, len| { - utils::reserve_for_bits(&mut mutable.null_buffer, mutable.len + len); + utils::resize_for_bits(&mut mutable.null_buffer, mutable.len + len); mutable.null_count += utils::set_bits( mutable.null_buffer.as_slice_mut(), bytes, @@ -106,7 +106,7 @@ fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits }) } else if use_nulls { Box::new(|mutable, _, len| { - utils::reserve_for_bits(&mut mutable.null_buffer, mutable.len + len); + utils::resize_for_bits(&mut mutable.null_buffer, mutable.len + len); let write_data = mutable.null_buffer.as_slice_mut(); let offset = mutable.len; (0..len).for_each(|i| { @@ -298,7 +298,7 @@ impl<'a> MutableArrayData<'a> { DataType::Null => [empty_buffer, MutableBuffer::new(0)], DataType::Boolean => { let bytes = bit_util::ceil(capacity, 8); - let buffer = MutableBuffer::new(bytes).with_bitset(bytes, false); + let buffer = MutableBuffer::from_len_zeroed(bytes); [buffer, empty_buffer] } DataType::UInt8 => { @@ -361,25 +361,25 @@ impl<'a> MutableArrayData<'a> { DataType::Utf8 | DataType::Binary => { let mut buffer = MutableBuffer::new((1 + capacity) * size_of::()); // safety: `unsafe` code assumes that this buffer is initialized with one element - buffer.extend_from_slice(&[0i32].to_byte_slice()); + buffer.push(0i32); [buffer, 
MutableBuffer::new(capacity * size_of::())] } DataType::LargeUtf8 | DataType::LargeBinary => { let mut buffer = MutableBuffer::new((1 + capacity) * size_of::()); // safety: `unsafe` code assumes that this buffer is initialized with one element - buffer.extend_from_slice(&[0i64].to_byte_slice()); + buffer.push(0i64); [buffer, MutableBuffer::new(capacity * size_of::())] } DataType::List(_) => { // offset buffer always starts with a zero let mut buffer = MutableBuffer::new((1 + capacity) * size_of::()); - buffer.extend_from_slice(0i32.to_byte_slice()); + buffer.push(0i32); [buffer, empty_buffer] } DataType::LargeList(_) => { // offset buffer always starts with a zero let mut buffer = MutableBuffer::new((1 + capacity) * size_of::()); - buffer.extend_from_slice(&[0i64].to_byte_slice()); + buffer.push(0i64); [buffer, empty_buffer] } DataType::FixedSizeBinary(size) => { @@ -487,7 +487,7 @@ impl<'a> MutableArrayData<'a> { .collect(); let null_bytes = bit_util::ceil(capacity, 8); - let null_buffer = MutableBuffer::new(null_bytes).with_bitset(null_bytes, false); + let null_buffer = MutableBuffer::from_len_zeroed(null_bytes); let extend_values = arrays.iter().map(|array| build_extend(array)).collect(); @@ -552,7 +552,6 @@ mod tests { }; use crate::{ array::{ListArray, StringBuilder}, - datatypes::ToByteSlice, error::Result, }; @@ -1002,7 +1001,7 @@ mod tests { Some(15), ]); let list_value_offsets = - Buffer::from(&[0i32, 3, 5, 11, 13, 13, 15, 15, 17].to_byte_slice()); + Buffer::from_slice_ref(&[0i32, 3, 5, 11, 13, 13, 15, 15, 17]); let expected_list_data = ArrayData::new( DataType::List(Box::new(Field::new("item", DataType::Int64, true))), 8, @@ -1083,9 +1082,8 @@ mod tests { Some(14), Some(15), ]); - let list_value_offsets = Buffer::from( - &[0, 3, 5, 5, 13, 15, 15, 15, 19, 19, 19, 19, 23].to_byte_slice(), - ); + let list_value_offsets = + Buffer::from_slice_ref(&[0, 3, 5, 5, 13, 15, 15, 15, 19, 19, 19, 19, 23]); let expected_list_data = ArrayData::new( 
DataType::List(Box::new(Field::new("item", DataType::Int64, true))), 12, @@ -1156,7 +1154,7 @@ mod tests { None, // extend b[0..0] ]); - let list_value_offsets = Buffer::from(&[0, 3, 5, 6, 9, 10, 13].to_byte_slice()); + let list_value_offsets = Buffer::from_slice_ref(&[0, 3, 5, 6, 9, 10, 13]); let expected_list_data = ArrayData::new( DataType::List(Box::new(Field::new("item", DataType::Utf8, true))), 6, diff --git a/rust/arrow/src/array/transform/primitive.rs b/rust/arrow/src/array/transform/primitive.rs index 86c76941af332..77038432459c1 100644 --- a/rust/arrow/src/array/transform/primitive.rs +++ b/rust/arrow/src/array/transform/primitive.rs @@ -22,13 +22,12 @@ use crate::{array::ArrayData, datatypes::ArrowNativeType}; use super::{Extend, _MutableArrayData}; pub(super) fn build_extend(array: &ArrayData) -> Extend { - let values = &array.buffers()[0].as_slice()[array.offset() * size_of::()..]; + let values = array.buffer::(0); Box::new( move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| { - let start = start * size_of::(); - let len = len * size_of::(); - let bytes = &values[start..start + len]; - mutable.buffer1.extend_from_slice(bytes); + mutable + .buffer1 + .extend_from_slice(&values[start..start + len]); }, ) } diff --git a/rust/arrow/src/array/transform/utils.rs b/rust/arrow/src/array/transform/utils.rs index 617db275193d7..8c718c70c1767 100644 --- a/rust/arrow/src/array/transform/utils.rs +++ b/rust/arrow/src/array/transform/utils.rs @@ -15,16 +15,14 @@ // specific language governing permissions and limitations // under the License. -use crate::{ - array::OffsetSizeTrait, buffer::MutableBuffer, datatypes::ToByteSlice, util::bit_util, -}; +use crate::{array::OffsetSizeTrait, buffer::MutableBuffer, util::bit_util}; /// extends the `buffer` to be able to hold `len` bits, setting all bits of the new size to zero. 
#[inline] -pub(super) fn reserve_for_bits(buffer: &mut MutableBuffer, len: usize) { +pub(super) fn resize_for_bits(buffer: &mut MutableBuffer, len: usize) { let needed_bytes = bit_util::ceil(len, 8); if buffer.len() < needed_bytes { - buffer.extend(needed_bytes - buffer.len()); + buffer.resize(needed_bytes, 0); } } @@ -53,12 +51,12 @@ pub(super) fn extend_offsets( mut last_offset: T, offsets: &[T], ) { - buffer.reserve(buffer.len() + offsets.len() * std::mem::size_of::()); + buffer.reserve(offsets.len() * std::mem::size_of::()); offsets.windows(2).for_each(|offsets| { // compute the new offset let length = offsets[1] - offsets[0]; last_offset += length; - buffer.extend_from_slice(last_offset.to_byte_slice()); + buffer.push(last_offset); }); } diff --git a/rust/arrow/src/array/transform/variable_size.rs b/rust/arrow/src/array/transform/variable_size.rs index e9143117e35a1..3ede26d710706 100644 --- a/rust/arrow/src/array/transform/variable_size.rs +++ b/rust/arrow/src/array/transform/variable_size.rs @@ -18,7 +18,6 @@ use crate::{ array::{ArrayData, OffsetSizeTrait}, buffer::MutableBuffer, - datatypes::ToByteSlice, }; use super::{ @@ -72,8 +71,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { let mut last_offset: T = unsafe { get_last_offset(offset_buffer) }; // nulls present: append item by item, ignoring null entries - offset_buffer - .reserve(offset_buffer.len() + len * std::mem::size_of::()); + offset_buffer.reserve(len * std::mem::size_of::()); (start..start + len).for_each(|i| { if array.is_valid(i) { @@ -89,7 +87,7 @@ pub(super) fn build_extend(array: &ArrayData) -> Extend { values_buffer.extend_from_slice(bytes); } // offsets are always present - offset_buffer.extend_from_slice(last_offset.to_byte_slice()); + offset_buffer.push(last_offset); }) }, ) @@ -105,6 +103,5 @@ pub(super) fn extend_nulls( // this is safe due to how offset is built. 
See details on `get_last_offset` let last_offset: T = unsafe { get_last_offset(offset_buffer) }; - let offsets = vec![last_offset; len]; - offset_buffer.extend_from_slice(offsets.to_byte_slice()); + (0..len).for_each(|_| offset_buffer.push(last_offset)) } diff --git a/rust/arrow/src/buffer.rs b/rust/arrow/src/buffer.rs index 0613433ee395b..cf300fe3fd3d3 100644 --- a/rust/arrow/src/buffer.rs +++ b/rust/arrow/src/buffer.rs @@ -15,23 +15,23 @@ // specific language governing permissions and limitations // under the License. -//! The main type in the module is `Buffer`, a contiguous immutable memory region of -//! fixed size aligned at a 64-byte boundary. `MutableBuffer` is like `Buffer`, but it can -//! be mutated and grown. +//! This module contains two main structs: [Buffer] and [MutableBuffer]. A buffer represents +//! a contiguous memory region that can be shared via `offsets`. + #[cfg(feature = "simd")] use packed_simd::u8x64; use crate::{ bytes::{Bytes, Deallocation}, + datatypes::ToByteSlice, ffi, }; use std::convert::AsRef; use std::fmt::Debug; -use std::mem; use std::ops::{BitAnd, BitOr, Not}; +use std::ptr::NonNull; use std::sync::Arc; -use std::{cmp, ptr::NonNull}; #[cfg(feature = "avx512")] use crate::arch::avx512::*; @@ -44,11 +44,11 @@ use crate::util::bit_util::ceil; #[cfg(any(feature = "simd", feature = "avx512"))] use std::borrow::BorrowMut; -/// Buffer is a contiguous memory region of fixed size and is aligned at a 64-byte -/// boundary. Buffer is immutable. +/// Buffer represents a contiguous memory region that can be shared with other buffers and across +/// thread boundaries. #[derive(Clone, PartialEq, Debug)] pub struct Buffer { - /// Reference-counted pointer to the internal byte buffer. + /// the internal byte buffer. data: Arc, /// The offset into the buffer. @@ -56,6 +56,23 @@ pub struct Buffer { } impl Buffer { + /// Initializes a [Buffer] from a slice of items. 
+ pub fn from_slice_ref>(items: &T) -> Self { + // allocate aligned memory buffer + let slice = items.as_ref(); + let len = slice.len() * std::mem::size_of::(); + let capacity = bit_util::round_upto_multiple_of_64(len); + let buffer = memory::allocate_aligned(capacity); + unsafe { + memory::memcpy( + buffer, + NonNull::new_unchecked(slice.as_ptr() as *mut u8), + len, + ); + Buffer::build_with_arguments(buffer, len, Deallocation::Native(capacity)) + } + } + /// Creates a buffer from an existing memory region (must already be byte-aligned), this /// `Buffer` will free this piece of memory when dropped. /// @@ -162,19 +179,17 @@ impl Buffer { /// /// Also `typed_data::` is unsafe as `0x00` and `0x01` are the only valid values for /// `bool` in Rust. However, `bool` arrays in Arrow are bit-packed which breaks this condition. + /// View buffer as typed slice. pub unsafe fn typed_data(&self) -> &[T] { - assert_eq!(self.len() % mem::size_of::(), 0); - assert!(memory::is_ptr_aligned::(self.data.ptr().cast())); // JUSTIFICATION // Benefit // Many of the buffers represent specific types, and consumers of `Buffer` often need to re-interpret them. // Soundness // * The pointer is non-null by construction - // * alignment asserted above - std::slice::from_raw_parts( - self.as_ptr() as *const T, - self.len() / mem::size_of::(), - ) + // * alignment asserted below. + let (prefix, offsets, suffix) = self.as_slice().align_to::(); + assert!(prefix.is_empty() && suffix.is_empty()); + offsets } /// Returns a slice of this buffer starting at a certain bit offset. 
@@ -219,25 +234,10 @@ impl> From for Buffer { fn from(p: T) -> Self { // allocate aligned memory buffer let slice = p.as_ref(); - let len = slice.len() * mem::size_of::(); - let capacity = bit_util::round_upto_multiple_of_64(len); - let buffer = memory::allocate_aligned(capacity); - // JUSTIFICATION - // Benefit - // It is often useful to create a buffer from bytes, typically when they are allocated by external sources - // Soundness - // * The pointers are non-null by construction - // * alignment asserted above - // Unsoundness - // * There is no guarantee that the memory regions do are non-overalling, but `memcpy` requires this. - unsafe { - memory::memcpy( - buffer, - NonNull::new_unchecked(slice.as_ptr() as *mut u8), - len, - ); - Buffer::build_with_arguments(buffer, len, Deallocation::Native(capacity)) - } + let len = slice.len(); + let mut buffer = MutableBuffer::new(len); + buffer.extend_from_slice(slice); + buffer.into() } } @@ -702,18 +702,31 @@ impl From for Buffer { } } -/// Similar to `Buffer`, but is growable and can be mutated. A mutable buffer can be -/// converted into a immutable buffer via the `into` method. +/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items. +/// [`Buffer`]s created from [`MutableBuffer`] (via `into`) are guaranteed to have its pointer aligned +/// along cache lines and in multiple of 64 bytes. +/// Use [MutableBuffer::push] to insert an item, [MutableBuffer::extend_from_slice] +/// to insert many items, and `into` to convert it to [`Buffer`]. 
+/// # Example +/// ``` +/// # use arrow::buffer::{Buffer, MutableBuffer}; +/// let mut buffer = MutableBuffer::new(0); +/// buffer.push(256u32); +/// buffer.extend_from_slice(&[1u32]); +/// let buffer: Buffer = buffer.into(); +/// assert_eq!(buffer.as_slice(), &[0u8, 1, 0, 0, 1, 0, 0, 0]) +/// ``` #[derive(Debug)] pub struct MutableBuffer { // dangling iff capacity = 0 data: NonNull, + // invariant: len <= capacity len: usize, capacity: usize, } impl MutableBuffer { - /// Allocate a new mutable buffer with initial capacity to be `capacity`. + /// Allocate a new [MutableBuffer] with initial capacity to be at least `capacity`. pub fn new(capacity: usize) -> Self { let new_capacity = bit_util::round_upto_multiple_of_64(capacity); let ptr = memory::allocate_aligned(new_capacity); @@ -724,10 +737,32 @@ impl MutableBuffer { } } - /// creates a new [MutableBuffer] where every bit is initialized to `0` + /// Allocates a new [MutableBuffer] with `len` and capacity to be at least `len` where + /// all bytes are guaranteed to be `0u8`. + /// # Example + /// ``` + /// # use arrow::buffer::{Buffer, MutableBuffer}; + /// let mut buffer = MutableBuffer::from_len_zeroed(127); + /// assert_eq!(buffer.len(), 127); + /// assert!(buffer.capacity() >= 127); + /// let data = buffer.as_slice_mut(); + /// assert_eq!(data[126], 0u8); + /// ``` + pub fn from_len_zeroed(len: usize) -> Self { + let new_capacity = bit_util::round_upto_multiple_of_64(len); + let ptr = memory::allocate_aligned_zeroed(new_capacity); + Self { + data: ptr, + len, + capacity: new_capacity, + } + } + + /// creates a new [MutableBuffer] with capacity and length capable of holding `len` bits. + /// This is useful to create a buffer for packed bitmaps. 
pub fn new_null(len: usize) -> Self { let num_bytes = bit_util::ceil(len, 8); - MutableBuffer::new(num_bytes).with_bitset(num_bytes, false) + MutableBuffer::from_len_zeroed(num_bytes) } /// Set the bits in the range of `[0, end)` to 0 (if `val` is false), or 1 (if `val` @@ -758,39 +793,51 @@ impl MutableBuffer { } } - /// Ensures that this buffer has at least `capacity` slots in this buffer. This will - /// also ensure the new capacity will be a multiple of 64 bytes. - /// - /// Returns the new capacity for this buffer. - pub fn reserve(&mut self, capacity: usize) -> usize { - if capacity > self.capacity { - let new_capacity = bit_util::round_upto_multiple_of_64(capacity); - let new_capacity = cmp::max(new_capacity, self.capacity * 2); + /// Ensures that this buffer has at least `self.len + additional` bytes. This re-allocates iff + /// `self.len + additional > capacity`. + /// # Example + /// ``` + /// # use arrow::buffer::{Buffer, MutableBuffer}; + /// let mut buffer = MutableBuffer::new(0); + /// buffer.reserve(253); // allocates for the first time + /// (0..253u8).for_each(|i| buffer.push(i)); // no reallocation + /// let buffer: Buffer = buffer.into(); + /// assert_eq!(buffer.len(), 253); + /// ``` + // For performance reasons, this must be inlined so that the `if` is executed inside the caller, and not as an extra call that just + // exits. + #[inline(always)] + pub fn reserve(&mut self, additional: usize) { + let required_cap = self.len + additional; + if required_cap > self.capacity { + let new_capacity = bit_util::round_upto_multiple_of_64(required_cap); + let new_capacity = std::cmp::max(new_capacity, self.capacity * 2); self.data = unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; self.capacity = new_capacity; } - self.capacity } - /// Resizes the buffer so that the `len` will equal to the `new_len`. 
- /// - /// If `new_len` is greater than `len`, the buffer's length is simply adjusted to be - /// the former, optionally extending the capacity. The data between `len` and - /// `new_len` will be zeroed out. - /// - /// If `new_len` is less than `len`, the buffer will be truncated. - pub fn resize(&mut self, new_len: usize) { + /// Resizes the buffer, either truncating its contents (with no change in capacity), or + /// growing it (potentially reallocating it) and writing `value` in the newly available bytes. + /// # Example + /// ``` + /// # use arrow::buffer::{Buffer, MutableBuffer}; + /// let mut buffer = MutableBuffer::new(0); + /// buffer.resize(253, 2); // allocates for the first time + /// assert_eq!(buffer.as_slice()[252], 2u8); + /// ``` + // For performance reasons, this must be inlined so that the `if` is executed inside the caller, and not as an extra call that just + // exits. + #[inline(always)] + pub fn resize(&mut self, new_len: usize, value: u8) { if new_len > self.len { - self.reserve(new_len); - } else { - let new_capacity = bit_util::round_upto_multiple_of_64(new_len); - if new_capacity < self.capacity { - self.data = - unsafe { memory::reallocate(self.data, self.capacity, new_capacity) }; - self.capacity = new_capacity; - } + let diff = new_len - self.len; + self.reserve(diff); + // write the value + unsafe { self.data.as_ptr().add(self.len).write_bytes(value, diff) }; } + // this truncates the buffer when new_len < self.len self.len = new_len; } @@ -801,12 +848,14 @@ impl MutableBuffer { } /// Returns the length (the number of bytes written) in this buffer. + /// The invariant `buffer.len() <= buffer.capacity()` is always upheld. #[inline] pub const fn len(&self) -> usize { self.len } /// Returns the total capacity in this buffer. + /// The invariant `buffer.len() <= buffer.capacity()` is always upheld. 
#[inline] pub const fn capacity(&self) -> usize { self.capacity @@ -827,15 +876,15 @@ impl MutableBuffer { self } - /// Returns a raw pointer for this buffer. - /// - /// Note that this should be used cautiously, and the returned pointer should not be - /// stored anywhere, to avoid dangling pointers. + /// Returns a raw pointer to this buffer's internal memory + /// This pointer is guaranteed to be aligned along cache-lines. #[inline] pub const fn as_ptr(&self) -> *const u8 { self.data.as_ptr() } + /// Returns a mutable raw pointer to this buffer's internal memory + /// This pointer is guaranteed to be aligned along cache-lines. #[inline] pub fn as_mut_ptr(&mut self) -> *mut u8 { self.data.as_ptr() @@ -861,46 +910,61 @@ impl MutableBuffer { } } - /// View buffer as typed slice. + /// View this buffer asa slice of a specific type. + /// # Safety + /// This function must only be used when this buffer was extended with items of type `T`. + /// Failure to do so results in undefined behavior. pub fn typed_data_mut(&mut self) -> &mut [T] { - assert_eq!(self.len() % mem::size_of::(), 0); - assert!(memory::is_ptr_aligned::(self.data.cast())); - // JUSTIFICATION - // Benefit - // Many of the buffers represent specific types, and consumers of `Buffer` often need to re-interpret them. - // Soundness - // * The pointer is non-null by construction - // * alignment asserted above unsafe { - std::slice::from_raw_parts_mut( - self.as_ptr() as *mut T, - self.len() / mem::size_of::(), - ) + let (prefix, offsets, suffix) = self.as_slice_mut().align_to_mut::(); + assert!(prefix.is_empty() && suffix.is_empty()); + offsets } } - /// Extends the buffer from a byte slice, incrementing its capacity if needed. - #[inline] - pub fn extend_from_slice(&mut self, bytes: &[u8]) { - let new_len = self.len + bytes.len(); - if new_len > self.capacity { - self.reserve(new_len); - } + /// Extends this buffer from a slice of items that can be represented in bytes, increasing its capacity if needed. 
+ /// # Example + /// ``` + /// # use arrow::buffer::MutableBuffer; + /// let mut buffer = MutableBuffer::new(0); + /// buffer.extend_from_slice(&[2u32, 0]); + /// assert_eq!(buffer.len(), 8) // u32 has 4 bytes + /// ``` + pub fn extend_from_slice(&mut self, items: &[T]) { + let len = items.len(); + let additional = len * std::mem::size_of::(); + self.reserve(additional); unsafe { - let dst = NonNull::new_unchecked(self.data.as_ptr().add(self.len)); - let src = NonNull::new_unchecked(bytes.as_ptr() as *mut u8); - memory::memcpy(dst, src, bytes.len()); + let dst = self.data.as_ptr().add(self.len); + let src = items.as_ptr() as *const u8; + std::ptr::copy_nonoverlapping(src, dst, additional) } - self.len = new_len; + self.len += additional; } - /// Extends the buffer by `len` with all bytes equal to `0u8`, incrementing its capacity if needed. - pub fn extend(&mut self, len: usize) { - let remaining_capacity = self.capacity - self.len; - if len > remaining_capacity { - self.reserve(self.len + len); + /// Extends the buffer with a new item, increasing its capacity if needed. + /// # Example + /// ``` + /// # use arrow::buffer::MutableBuffer; + /// let mut buffer = MutableBuffer::new(0); + /// buffer.push(256u32); + /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes + /// ``` + #[inline] + pub fn push(&mut self, item: T) { + let additional = std::mem::size_of::(); + self.reserve(additional); + unsafe { + let dst = self.data.as_ptr().add(self.len) as *mut T; + std::ptr::write(dst, item); } - self.len += len; + self.len += additional; + } + + /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed. 
+ #[inline] + pub fn extend(&mut self, additional: usize) { + self.resize(self.len + additional, 0); } } @@ -944,7 +1008,6 @@ mod tests { use std::thread; use super::*; - use crate::datatypes::ToByteSlice; #[test] fn test_buffer_data_equality() { @@ -960,7 +1023,7 @@ mod tests { // Different capacities should still preserve equality let mut buf2 = MutableBuffer::new(65); - buf2.extend_from_slice(&[0, 1, 2, 3, 4]); + buf2.extend_from_slice(&[0u8, 1, 2, 3, 4]); let buf2 = buf2.into(); assert_eq!(buf1, buf2); @@ -1115,13 +1178,14 @@ mod tests { assert_eq!(64, buf.capacity()); // Reserving a smaller capacity should have no effect. - let mut new_cap = buf.reserve(10); - assert_eq!(64, new_cap); + buf.reserve(10); assert_eq!(64, buf.capacity()); - new_cap = buf.reserve(100); - assert_eq!(128, new_cap); + buf.reserve(80); assert_eq!(128, buf.capacity()); + + buf.reserve(129); + assert_eq!(256, buf.capacity()); } #[test] @@ -1130,24 +1194,24 @@ mod tests { assert_eq!(64, buf.capacity()); assert_eq!(0, buf.len()); - buf.resize(20); + buf.resize(20, 0); assert_eq!(64, buf.capacity()); assert_eq!(20, buf.len()); - buf.resize(10); + buf.resize(10, 0); assert_eq!(64, buf.capacity()); assert_eq!(10, buf.len()); - buf.resize(100); + buf.resize(100, 0); assert_eq!(128, buf.capacity()); assert_eq!(100, buf.len()); - buf.resize(30); - assert_eq!(64, buf.capacity()); + buf.resize(30, 0); + assert_eq!(128, buf.capacity()); assert_eq!(30, buf.len()); - buf.resize(0); - assert_eq!(0, buf.capacity()); + buf.resize(0, 0); + assert_eq!(128, buf.capacity()); assert_eq!(0, buf.len()); } @@ -1201,7 +1265,7 @@ mod tests { macro_rules! 
check_as_typed_data { ($input: expr, $native_t: ty) => {{ - let buffer = Buffer::from($input.to_byte_slice()); + let buffer = Buffer::from_slice_ref($input); let slice: &[$native_t] = unsafe { buffer.typed_data::<$native_t>() }; assert_eq!($input, slice); }}; diff --git a/rust/arrow/src/bytes.rs b/rust/arrow/src/bytes.rs index 331011687da01..61d52ef572ff8 100644 --- a/rust/arrow/src/bytes.rs +++ b/rust/arrow/src/bytes.rs @@ -156,16 +156,3 @@ impl Debug for Bytes { write!(f, " }}") } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_dealloc_native() { - let capacity = 5; - let a = memory::allocate_aligned(capacity); - // create Bytes and release it. This will make `a` be an invalid pointer, but it is defined behavior - unsafe { Bytes::new(a, 3, Deallocation::Native(capacity)) }; - } -} diff --git a/rust/arrow/src/compute/kernels/aggregate.rs b/rust/arrow/src/compute/kernels/aggregate.rs index 416753a3cf583..d0e3f22f54176 100644 --- a/rust/arrow/src/compute/kernels/aggregate.rs +++ b/rust/arrow/src/compute/kernels/aggregate.rs @@ -810,7 +810,6 @@ mod tests { fn test_primitive_min_max_float_all_nans_non_null() { let a: Float64Array = (0..100).map(|_| Some(f64::NAN)).collect(); assert!(max(&a).unwrap().is_nan()); - dbg!(min(&a)); assert!(min(&a).unwrap().is_nan()); } diff --git a/rust/arrow/src/compute/kernels/arithmetic.rs b/rust/arrow/src/compute/kernels/arithmetic.rs index 6e8fea54bff59..98c0660ba2b7c 100644 --- a/rust/arrow/src/compute/kernels/arithmetic.rs +++ b/rust/arrow/src/compute/kernels/arithmetic.rs @@ -32,7 +32,7 @@ use crate::buffer::Buffer; use crate::buffer::MutableBuffer; use crate::compute::util::combine_option_bitmap; use crate::datatypes; -use crate::datatypes::{ArrowNumericType, ToByteSlice}; +use crate::datatypes::ArrowNumericType; use crate::error::{ArrowError, Result}; use crate::{array::*, util::bit_util}; #[cfg(simd)] @@ -63,7 +63,7 @@ where None, array.data_ref().null_buffer().cloned(), 0, - 
vec![Buffer::from(values.to_byte_slice())], + vec![Buffer::from_slice_ref(&values)], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) @@ -156,7 +156,7 @@ where None, null_bit_buffer, 0, - vec![Buffer::from(values.to_byte_slice())], + vec![Buffer::from_slice_ref(&values)], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) @@ -220,7 +220,7 @@ where None, null_bit_buffer, 0, - vec![Buffer::from(values.to_byte_slice())], + vec![Buffer::from_slice_ref(&values)], vec![], ); Ok(PrimitiveArray::::from(Arc::new(data))) diff --git a/rust/arrow/src/compute/kernels/boolean.rs b/rust/arrow/src/compute/kernels/boolean.rs index 33456773e68cf..b0f7f9f2fc28b 100644 --- a/rust/arrow/src/compute/kernels/boolean.rs +++ b/rust/arrow/src/compute/kernels/boolean.rs @@ -177,9 +177,7 @@ pub fn is_null(input: &Array) -> Result { let output = match input.data_ref().null_buffer() { None => { let len_bytes = ceil(len, 8); - MutableBuffer::new(len_bytes) - .with_bitset(len_bytes, false) - .into() + MutableBuffer::from_len_zeroed(len_bytes).into() } Some(buffer) => buffer_unary_not(buffer, input.offset(), len), }; diff --git a/rust/arrow/src/compute/kernels/cast.rs b/rust/arrow/src/compute/kernels/cast.rs index 40b33fc649de6..e2e29620cc5aa 100644 --- a/rust/arrow/src/compute/kernels/cast.rs +++ b/rust/arrow/src/compute/kernels/cast.rs @@ -273,7 +273,7 @@ pub fn cast(array: &ArrayRef, to_type: &DataType) -> Result { let cast_array = cast(array, to.data_type())?; // create offsets, where if array.len() = 2, we have [0,1,2] let offsets: Vec = (0..=array.len() as i32).collect(); - let value_offsets = Buffer::from(offsets[..].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&offsets); let list_data = ArrayData::new( to.data_type().clone(), array.len(), @@ -1391,7 +1391,7 @@ mod tests { // Construct a value array let value_data = Int32Array::from(vec![0, 0, 0, -1, -2, -1, 2, 100000000]).data(); - let value_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice()); + let 
value_offsets = Buffer::from_slice_ref(&[0, 3, 6, 8]); // Construct a list array from the above two let list_data_type = @@ -1450,7 +1450,7 @@ mod tests { let value_data = Int32Array::from(vec![0, 0, 0, -1, -2, -1, 2, 8, 100000000]).data(); - let value_offsets = Buffer::from(&[0, 3, 6, 9].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0, 3, 6, 9]); // Construct a list array from the above two let list_data_type = @@ -2922,12 +2922,12 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(8) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) .build(); // Construct a buffer for value offsets, for the nested array: // [[0, 1, 2], [3, 4, 5], [6, 7]] - let value_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0, 3, 6, 8]); // Construct a list array from the above two let list_data_type = @@ -2944,12 +2944,12 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(8) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) .build(); // Construct a buffer for value offsets, for the nested array: // [[0, 1, 2], [3, 4, 5], [6, 7]] - let value_offsets = Buffer::from(&[0i64, 3, 6, 8].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0i64, 3, 6, 8]); // Construct a list array from the above two let list_data_type = @@ -2966,9 +2966,7 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(10) - .add_buffer(Buffer::from( - &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9].to_byte_slice(), - )) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) .build(); // Construct a fixed size list array from the above two diff --git a/rust/arrow/src/compute/kernels/comparison.rs 
b/rust/arrow/src/compute/kernels/comparison.rs index f988973171890..7dab0ec902403 100644 --- a/rust/arrow/src/compute/kernels/comparison.rs +++ b/rust/arrow/src/compute/kernels/comparison.rs @@ -48,9 +48,7 @@ macro_rules! compare_op { combine_option_bitmap($left.data_ref(), $right.data_ref(), $left.len())?; let byte_capacity = bit_util::ceil($left.len(), 8); - let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); - let mut buffer = MutableBuffer::new(actual_capacity); - buffer.resize(byte_capacity); + let mut buffer = MutableBuffer::from_len_zeroed(byte_capacity); let data = buffer.as_mut_ptr(); for i in 0..$left.len() { @@ -81,9 +79,7 @@ macro_rules! compare_op_scalar { let null_bit_buffer = $left.data().null_buffer().cloned(); let byte_capacity = bit_util::ceil($left.len(), 8); - let actual_capacity = bit_util::round_upto_multiple_of_64(byte_capacity); - let mut buffer = MutableBuffer::new(actual_capacity); - buffer.resize(byte_capacity); + let mut buffer = MutableBuffer::from_len_zeroed(byte_capacity); let data = buffer.as_mut_ptr(); for i in 0..$left.len() { @@ -184,23 +180,33 @@ fn is_like_pattern(c: char) -> bool { pub fn like_utf8_scalar(left: &StringArray, right: &str) -> Result { let null_bit_buffer = left.data().null_buffer().cloned(); - let mut result = BooleanBufferBuilder::new(left.len()); + let bytes = bit_util::ceil(left.len(), 8); + let mut bool_buf = MutableBuffer::from_len_zeroed(bytes); + let bool_slice = bool_buf.as_slice_mut(); if !right.contains(is_like_pattern) { // fast path, can use equals for i in 0..left.len() { - result.append(left.value(i) == right); + if left.value(i) == right { + bit_util::set_bit(bool_slice, i); + } } } else if right.ends_with('%') && !right[..right.len() - 1].contains(is_like_pattern) { // fast path, can use starts_with + let starts_with = &right[..right.len() - 1]; for i in 0..left.len() { - result.append(left.value(i).starts_with(&right[..right.len() - 1])); + if 
left.value(i).starts_with(starts_with) { + bit_util::set_bit(bool_slice, i); + } } } else if right.starts_with('%') && !right[1..].contains(is_like_pattern) { // fast path, can use ends_with + let ends_with = &right[1..]; for i in 0..left.len() { - result.append(left.value(i).ends_with(&right[1..])); + if left.value(i).ends_with(ends_with) { + bit_util::set_bit(bool_slice, i); + } } } else { let re_pattern = right.replace("%", ".*").replace("_", "."); @@ -213,7 +219,9 @@ pub fn like_utf8_scalar(left: &StringArray, right: &str) -> Result for i in 0..left.len() { let haystack = left.value(i); - result.append(re.is_match(haystack)); + if re.is_match(haystack) { + bit_util::set_bit(bool_slice, i); + } } }; @@ -223,7 +231,7 @@ pub fn like_utf8_scalar(left: &StringArray, right: &str) -> Result None, null_bit_buffer, 0, - vec![result.finish()], + vec![bool_buf.into()], vec![], ); Ok(BooleanArray::from(Arc::new(data))) @@ -713,7 +721,7 @@ where }; let not_both_null_bitmap = not_both_null_bit_buffer.as_slice(); - let mut bool_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let mut bool_buf = MutableBuffer::from_len_zeroed(num_bytes); let bool_slice = bool_buf.as_slice_mut(); // if both array slots are valid, check if list contains primitive @@ -768,7 +776,7 @@ where }; let not_both_null_bitmap = not_both_null_bit_buffer.as_slice(); - let mut bool_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let mut bool_buf = MutableBuffer::from_len_zeroed(num_bytes); let bool_slice = &mut bool_buf; for i in 0..left_len { @@ -815,7 +823,7 @@ fn new_all_set_buffer(len: usize) -> Buffer { #[cfg(test)] mod tests { use super::*; - use crate::datatypes::{Int8Type, ToByteSlice}; + use crate::datatypes::Int8Type; use crate::{array::Int32Array, array::Int64Array, datatypes::Field}; /// Evaluate `KERNEL` with two vectors as inputs and assert against the expected output. 
@@ -1117,7 +1125,7 @@ mod tests { Some(7), ]) .data(); - let value_offsets = Buffer::from(&[0i64, 3, 6, 6, 9].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0i64, 3, 6, 6, 9]); let list_data_type = DataType::LargeList(Box::new(Field::new("item", DataType::Int32, true))); let list_data = ArrayData::builder(list_data_type) diff --git a/rust/arrow/src/compute/kernels/filter.rs b/rust/arrow/src/compute/kernels/filter.rs index c0f6299f0d111..8bd0d3e70889a 100644 --- a/rust/arrow/src/compute/kernels/filter.rs +++ b/rust/arrow/src/compute/kernels/filter.rs @@ -249,7 +249,6 @@ pub fn filter_record_batch( #[cfg(test)] mod tests { use super::*; - use crate::datatypes::ToByteSlice; use crate::{ buffer::Buffer, datatypes::{DataType, Field}, @@ -505,10 +504,10 @@ mod tests { fn test_filter_list_array() { let value_data = ArrayData::builder(DataType::Int32) .len(8) - .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7])) .build(); - let value_offsets = Buffer::from(&[0i64, 3, 6, 8, 8].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0i64, 3, 6, 8, 8]); let list_data_type = DataType::LargeList(Box::new(Field::new("item", DataType::Int32, false))); @@ -527,10 +526,10 @@ mod tests { // expected: [[3, 4, 5], null] let value_data = ArrayData::builder(DataType::Int32) .len(3) - .add_buffer(Buffer::from(&[3, 4, 5].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[3, 4, 5])) .build(); - let value_offsets = Buffer::from(&[0i64, 3, 3].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0i64, 3, 3]); let list_data_type = DataType::LargeList(Box::new(Field::new("item", DataType::Int32, false))); diff --git a/rust/arrow/src/compute/kernels/length.rs b/rust/arrow/src/compute/kernels/length.rs index f0f8bf160b79c..a0107c35f911b 100644 --- a/rust/arrow/src/compute/kernels/length.rs +++ b/rust/arrow/src/compute/kernels/length.rs @@ -17,7 +17,6 @@ //! 
Defines kernel for length of a string array -use crate::datatypes::ToByteSlice; use crate::{array::*, buffer::Buffer}; use crate::{ datatypes::DataType, @@ -53,7 +52,7 @@ where None, null_bit_buffer, 0, - vec![Buffer::from(lengths.to_byte_slice())], + vec![Buffer::from_slice_ref(&lengths)], vec![], ); Ok(make_array(Arc::new(data))) diff --git a/rust/arrow/src/compute/kernels/limit.rs b/rust/arrow/src/compute/kernels/limit.rs index 911dbf2889d69..18db31c4df5a0 100644 --- a/rust/arrow/src/compute/kernels/limit.rs +++ b/rust/arrow/src/compute/kernels/limit.rs @@ -35,7 +35,7 @@ mod tests { use super::*; use crate::array::*; use crate::buffer::Buffer; - use crate::datatypes::{DataType, Field, ToByteSlice}; + use crate::datatypes::{DataType, Field}; use crate::util::bit_util; use std::sync::Arc; @@ -91,15 +91,12 @@ mod tests { // Construct a value array let value_data = ArrayData::builder(DataType::Int32) .len(10) - .add_buffer(Buffer::from( - &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9].to_byte_slice(), - )) + .add_buffer(Buffer::from_slice_ref(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) .build(); // Construct a buffer for value offsets, for the nested array: // [[0, 1], null, [2, 3], null, [4, 5], null, [6, 7, 8], null, [9]] - let value_offsets = - Buffer::from(&[0, 2, 2, 4, 4, 6, 6, 9, 9, 10].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0, 2, 2, 4, 4, 6, 6, 9, 9, 10]); // 01010101 00000001 let mut null_bits: [u8; 2] = [0; 2]; bit_util::set_bit(&mut null_bits, 0); @@ -149,7 +146,7 @@ mod tests { .build(); let int_data = ArrayData::builder(DataType::Int32) .len(5) - .add_buffer(Buffer::from([0, 28, 42, 0, 0].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0, 28, 42, 0, 0])) .null_bit_buffer(Buffer::from([0b00000110])) .build(); diff --git a/rust/arrow/src/compute/kernels/sort.rs b/rust/arrow/src/compute/kernels/sort.rs index ace560e8bf7e7..56e7c79ca591d 100644 --- a/rust/arrow/src/compute/kernels/sort.rs +++ b/rust/arrow/src/compute/kernels/sort.rs @@ -292,7 
+292,7 @@ fn sort_boolean( // collect results directly into a buffer instead of a vec to avoid another aligned allocation let mut result = MutableBuffer::new(values.len() * std::mem::size_of::()); // sets len to capacity so we can access the whole buffer as a typed slice - result.resize(values.len() * std::mem::size_of::()); + result.resize(values.len() * std::mem::size_of::(), 0); let result_slice: &mut [u32] = result.typed_data_mut(); debug_assert_eq!(result_slice.len(), nulls_len + valids_len); @@ -357,7 +357,7 @@ where // collect results directly into a buffer instead of a vec to avoid another aligned allocation let mut result = MutableBuffer::new(values.len() * std::mem::size_of::()); // sets len to capacity so we can access the whole buffer as a typed slice - result.resize(values.len() * std::mem::size_of::()); + result.resize(values.len() * std::mem::size_of::(), 0); let result_slice: &mut [u32] = result.typed_data_mut(); debug_assert_eq!(result_slice.len(), nulls_len + valids_len); diff --git a/rust/arrow/src/compute/kernels/substring.rs b/rust/arrow/src/compute/kernels/substring.rs index 4c9d1995feb2e..2ad9bc3c26273 100644 --- a/rust/arrow/src/compute/kernels/substring.rs +++ b/rust/arrow/src/compute/kernels/substring.rs @@ -17,7 +17,7 @@ //! 
Defines kernel to extract a substring of a \[Large\]StringArray -use crate::{array::*, buffer::Buffer, datatypes::ToByteSlice}; +use crate::{array::*, buffer::Buffer}; use crate::{ datatypes::DataType, error::{ArrowError, Result}, @@ -81,8 +81,8 @@ fn generic_substring( null_bit_buffer, 0, vec![ - Buffer::from(new_offsets.to_byte_slice()), - Buffer::from(&new_values[..]), + Buffer::from_slice_ref(&new_offsets), + Buffer::from_slice_ref(&new_values), ], vec![], ); diff --git a/rust/arrow/src/compute/kernels/take.rs b/rust/arrow/src/compute/kernels/take.rs index 85567ad8ceef6..e60cad10bbc90 100644 --- a/rust/arrow/src/compute/kernels/take.rs +++ b/rust/arrow/src/compute/kernels/take.rs @@ -275,8 +275,8 @@ where { let data_len = indices.len(); - let mut buffer = MutableBuffer::new(data_len * std::mem::size_of::()); - buffer.resize(data_len * std::mem::size_of::()); + let mut buffer = + MutableBuffer::from_len_zeroed(data_len * std::mem::size_of::()); let data = buffer.typed_data_mut(); let nulls; @@ -344,7 +344,7 @@ where let data_len = indices.len(); let num_byte = bit_util::ceil(data_len, 8); - let mut val_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, false); + let mut val_buf = MutableBuffer::from_len_zeroed(num_byte); let val_slice = val_buf.as_slice_mut(); @@ -420,8 +420,7 @@ where let data_len = indices.len(); let bytes_offset = (data_len + 1) * std::mem::size_of::(); - let mut offsets_buffer = MutableBuffer::new(bytes_offset); - offsets_buffer.resize(bytes_offset); + let mut offsets_buffer = MutableBuffer::from_len_zeroed(bytes_offset); let offsets = offsets_buffer.typed_data_mut(); let mut values = Vec::with_capacity(bytes_offset); @@ -559,7 +558,7 @@ where }, ); } - let value_offsets = Buffer::from(offsets[..].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&offsets); // create a new list with taken data and computed null information let list_data = ArrayDataBuilder::new(values.data_type().clone()) .len(indices.len()) @@ -966,7 
+965,7 @@ mod tests { let value_data = Int32Array::from(vec![0, 0, 0, -1, -2, -1, 2, 3]).data(); // Construct offsets let value_offsets: [$offset_type; 4] = [0, 3, 6, 8]; - let value_offsets = Buffer::from(&value_offsets.to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&value_offsets); // Construct a list array from the above two let list_data_type = DataType::$list_data_type(Box::new(Field::new( "item", @@ -1004,7 +1003,7 @@ mod tests { .data(); // construct offsets let expected_offsets: [$offset_type; 6] = [0, 2, 2, 5, 7, 10]; - let expected_offsets = Buffer::from(&expected_offsets.to_byte_slice()); + let expected_offsets = Buffer::from_slice_ref(&expected_offsets); // construct list array from the two let expected_list_data = ArrayData::builder(list_data_type) .len(5) @@ -1038,7 +1037,7 @@ mod tests { .data(); // Construct offsets let value_offsets: [$offset_type; 5] = [0, 3, 6, 7, 9]; - let value_offsets = Buffer::from(&value_offsets.to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&value_offsets); // Construct a list array from the above two let list_data_type = DataType::$list_data_type(Box::new(Field::new( "item", @@ -1076,7 +1075,7 @@ mod tests { .data(); // construct offsets let expected_offsets: [$offset_type; 6] = [0, 1, 1, 4, 6, 9]; - let expected_offsets = Buffer::from(&expected_offsets.to_byte_slice()); + let expected_offsets = Buffer::from_slice_ref(&expected_offsets); // construct list array from the two let expected_list_data = ArrayData::builder(list_data_type) .len(5) @@ -1109,7 +1108,7 @@ mod tests { .data(); // Construct offsets let value_offsets: [$offset_type; 5] = [0, 3, 6, 6, 8]; - let value_offsets = Buffer::from(&value_offsets.to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&value_offsets); // Construct a list array from the above two let list_data_type = DataType::$list_data_type(Box::new(Field::new( "item", @@ -1146,7 +1145,7 @@ mod tests { .data(); // construct offsets let expected_offsets: 
[$offset_type; 6] = [0, 0, 0, 3, 5, 8]; - let expected_offsets = Buffer::from(&expected_offsets.to_byte_slice()); + let expected_offsets = Buffer::from_slice_ref(&expected_offsets); // construct list array from the two let mut null_bits: [u8; 1] = [0; 1]; bit_util::set_bit(&mut null_bits, 2); @@ -1277,7 +1276,7 @@ mod tests { // Construct a value array, [[0,0,0], [-1,-2,-1], [2,3]] let value_data = Int32Array::from(vec![0, 0, 0, -1, -2, -1, 2, 3]).data(); // Construct offsets - let value_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice()); + let value_offsets = Buffer::from_slice_ref(&[0, 3, 6, 8]); // Construct a list array from the above two let list_data_type = DataType::List(Box::new(Field::new("item", DataType::Int32, false))); diff --git a/rust/arrow/src/compute/util.rs b/rust/arrow/src/compute/util.rs index 1b20e18649e33..e613146f719e2 100644 --- a/rust/arrow/src/compute/util.rs +++ b/rust/arrow/src/compute/util.rs @@ -175,7 +175,7 @@ pub(super) mod tests { use std::sync::Arc; - use crate::datatypes::{DataType, ToByteSlice}; + use crate::datatypes::DataType; use crate::util::bit_util; use crate::{array::ArrayData, buffer::MutableBuffer}; @@ -311,13 +311,8 @@ pub(super) mod tests { T::DATA_TYPE, list_null_count == 0, ))), - Buffer::from( - offset - .into_iter() - .map(|x| x as i32) - .collect::>() - .as_slice() - .to_byte_slice(), + Buffer::from_slice_ref( + &offset.into_iter().map(|x| x as i32).collect::>(), ), ) } else if TypeId::of::() == TypeId::of::() { @@ -327,7 +322,7 @@ pub(super) mod tests { T::DATA_TYPE, list_null_count == 0, ))), - Buffer::from(offset.as_slice().to_byte_slice()), + Buffer::from_slice_ref(&offset), ) } else { unreachable!() diff --git a/rust/arrow/src/datatypes.rs b/rust/arrow/src/datatypes.rs index 95ee4d286b49d..a49e8cb2baf36 100644 --- a/rust/arrow/src/datatypes.rs +++ b/rust/arrow/src/datatypes.rs @@ -943,6 +943,7 @@ pub trait ToByteSlice { } impl ToByteSlice for [T] { + #[inline] fn to_byte_slice(&self) -> &[u8] { let raw_ptr 
= self.as_ptr() as *const T as *const u8; unsafe { from_raw_parts(raw_ptr, self.len() * size_of::()) } @@ -950,6 +951,7 @@ impl ToByteSlice for [T] { } impl ToByteSlice for T { + #[inline] fn to_byte_slice(&self) -> &[u8] { let raw_ptr = self as *const T as *const u8; unsafe { from_raw_parts(raw_ptr, size_of::()) } diff --git a/rust/arrow/src/json/reader.rs b/rust/arrow/src/json/reader.rs index 30fbc59ed83a1..6f2a541353cd7 100644 --- a/rust/arrow/src/json/reader.rs +++ b/rust/arrow/src/json/reader.rs @@ -866,13 +866,13 @@ impl Decoder { let list_len = rows.len(); let num_list_bytes = bit_util::ceil(list_len, 8); let mut offsets = Vec::with_capacity(list_len + 1); - let mut list_nulls = - MutableBuffer::new(num_list_bytes).with_bitset(num_list_bytes, false); + let mut list_nulls = MutableBuffer::from_len_zeroed(num_list_bytes); + let list_nulls = list_nulls.as_slice_mut(); offsets.push(cur_offset); rows.iter().enumerate().for_each(|(i, v)| { if let Value::Array(a) = v { cur_offset += OffsetSize::from_usize(a.len()).unwrap(); - bit_util::set_bit(list_nulls.as_slice_mut(), i); + bit_util::set_bit(list_nulls, i); } else if let Value::Null = v { // value is null, not incremented } else { @@ -885,8 +885,7 @@ impl Decoder { DataType::Null => NullArray::new(valid_len).data(), DataType::Boolean => { let num_bytes = bit_util::ceil(valid_len, 8); - let mut bool_values = - MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes); let mut bool_nulls = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true); let mut curr_index = 0; @@ -962,8 +961,7 @@ impl Decoder { // extract list values, with non-lists converted to Value::Null let len = rows.len(); let num_bytes = bit_util::ceil(len, 8); - let mut null_buffer = - MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes); let mut struct_index = 0; let rows: Vec = rows .iter() @@ -1003,7 
+1001,7 @@ impl Decoder { // build list let list_data = ArrayData::builder(DataType::List(Box::new(list_field.clone()))) .len(list_len) - .add_buffer(Buffer::from(offsets.to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&offsets)) .add_child_data(array_data) .null_bit_buffer(list_nulls.into()) .build(); @@ -1164,8 +1162,7 @@ impl Decoder { DataType::Struct(fields) => { let len = rows.len(); let num_bytes = bit_util::ceil(len, 8); - let mut null_buffer = - MutableBuffer::new(num_bytes).with_bitset(num_bytes, false); + let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes); let struct_rows = rows .iter() .enumerate() @@ -1889,7 +1886,7 @@ mod tests { // test that the list offsets are correct assert_eq!( cc.data().buffers()[0], - Buffer::from(vec![0i32, 2, 2, 4, 5].to_byte_slice()) + Buffer::from_slice_ref(&[0i32, 2, 2, 4, 5]) ); let cc = cc.values(); let cc = cc.as_any().downcast_ref::().unwrap(); @@ -1910,7 +1907,7 @@ mod tests { // test that the list offsets are correct assert_eq!( dd.data().buffers()[0], - Buffer::from(vec![0i32, 1, 1, 2, 6].to_byte_slice()) + Buffer::from_slice_ref(&[0i32, 1, 1, 2, 6]) ); let dd = dd.values(); let dd = dd.as_any().downcast_ref::().unwrap(); @@ -2031,7 +2028,7 @@ mod tests { .build(); let a_list = ArrayDataBuilder::new(a_field.data_type().clone()) .len(5) - .add_buffer(Buffer::from(vec![0i32, 2, 3, 6, 6, 6].to_byte_slice())) + .add_buffer(Buffer::from_slice_ref(&[0i32, 2, 3, 6, 6, 6])) .add_child_data(a) .null_bit_buffer(Buffer::from(vec![0b00010111])) .build(); @@ -2046,7 +2043,7 @@ mod tests { let expected = expected.as_any().downcast_ref::().unwrap(); assert_eq!( read.data().buffers()[0], - Buffer::from(vec![0i32, 2, 3, 6, 6, 6].to_byte_slice()) + Buffer::from_slice_ref(&[0i32, 2, 3, 6, 6, 6]) ); // compare list null buffers assert_eq!(read.data().null_buffer(), expected.data().null_buffer()); diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs index 1fa3cddec2a57..5d14ab59611a3 100644 --- 
a/rust/arrow/src/lib.rs +++ b/rust/arrow/src/lib.rs @@ -139,7 +139,7 @@ mod arch; pub mod array; pub mod bitmap; pub mod buffer; -pub mod bytes; +mod bytes; pub mod compute; pub mod csv; pub mod datatypes; @@ -147,7 +147,7 @@ pub mod error; pub mod ffi; pub mod ipc; pub mod json; -pub mod memory; +mod memory; pub mod record_batch; pub mod tensor; pub mod util; diff --git a/rust/arrow/src/memory.rs b/rust/arrow/src/memory.rs index ad103b06280b0..0ea8845decce8 100644 --- a/rust/arrow/src/memory.rs +++ b/rust/arrow/src/memory.rs @@ -141,7 +141,30 @@ const BYPASS_PTR: NonNull = unsafe { NonNull::new_unchecked(ALIGNMENT as *mu // If this number is not zero after all objects have been `drop`, there is a memory leak pub static mut ALLOCATIONS: AtomicIsize = AtomicIsize::new(0); +/// Allocates a cache-aligned memory region of `size` bytes with uninitialized values. +/// This is more performant than using [allocate_aligned_zeroed] when all bytes will have +/// an unknown or non-zero value and is semantically similar to `malloc`. pub fn allocate_aligned(size: usize) -> NonNull { + unsafe { + if size == 0 { + // In a perfect world, there is no need to request zero size allocation. + // Currently, passing zero sized layout to alloc is UB. + // This will dodge allocator api for any type. + BYPASS_PTR + } else { + ALLOCATIONS.fetch_add(size as isize, std::sync::atomic::Ordering::SeqCst); + + let layout = Layout::from_size_align_unchecked(size, ALIGNMENT); + let raw_ptr = std::alloc::alloc(layout); + NonNull::new(raw_ptr).unwrap_or_else(|| handle_alloc_error(layout)) + } + } +} + +/// Allocates a cache-aligned memory region of `size` bytes with `0u8` on all of them. +/// This is more performant than using [allocate_aligned] and setting all bytes to zero +/// and is semantically similar to `calloc`. +pub fn allocate_aligned_zeroed(size: usize) -> NonNull { unsafe { if size == 0 { // In a perfect world, there is no need to request zero size allocation. 
@@ -210,16 +233,9 @@ pub unsafe fn reallocate( Layout::from_size_align_unchecked(old_size, ALIGNMENT), new_size, ); - let ptr = NonNull::new(raw_ptr).unwrap_or_else(|| { + NonNull::new(raw_ptr).unwrap_or_else(|| { handle_alloc_error(Layout::from_size_align_unchecked(new_size, ALIGNMENT)) - }); - - if new_size > old_size { - ptr.as_ptr() - .add(old_size) - .write_bytes(0, new_size - old_size); - } - ptr + }) } /// # Safety @@ -241,14 +257,7 @@ pub unsafe fn memcpy(dst: NonNull, src: NonNull, count: usize) { } } -/// Check if the pointer `p` is aligned to offset `a`. -pub fn is_aligned(p: NonNull, a: usize) -> bool { - let a_minus_one = a.wrapping_sub(1); - let pmoda = p.as_ptr() as usize & a_minus_one; - pmoda == 0 -} - -pub fn is_ptr_aligned(p: NonNull) -> bool { +pub fn is_ptr_aligned(p: NonNull) -> bool { p.as_ptr().align_offset(align_of::()) == 0 } @@ -265,20 +274,4 @@ mod tests { unsafe { free_aligned(p, 1024) }; } } - - #[test] - fn test_is_aligned() { - // allocate memory aligned to 64-byte - let ptr = allocate_aligned(10); - assert_eq!(true, is_aligned::(ptr, 1)); - assert_eq!(true, is_aligned::(ptr, 2)); - assert_eq!(true, is_aligned::(ptr, 4)); - - // now make the memory aligned to 63-byte - let ptr = unsafe { NonNull::new_unchecked(ptr.as_ptr().offset(1)) }; - assert_eq!(true, is_aligned::(ptr, 1)); - assert_eq!(false, is_aligned::(ptr, 2)); - assert_eq!(false, is_aligned::(ptr, 4)); - unsafe { free_aligned(NonNull::new_unchecked(ptr.as_ptr().offset(-1)), 10) }; - } } diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs index 4a4b865e8f443..1cc4ad7882c7f 100644 --- a/rust/arrow/src/util/integration_util.rs +++ b/rust/arrow/src/util/integration_util.rs @@ -885,7 +885,7 @@ mod tests { let utf8s = StringArray::from(vec![Some("aa"), None, Some("bbb")]); let value_data = Int32Array::from(vec![None, Some(2), None, None]); - let value_offsets = Buffer::from(&[0, 3, 4, 4].to_byte_slice()); + let value_offsets = 
Buffer::from_slice_ref(&[0, 3, 4, 4]); let list_data_type = DataType::List(Box::new(Field::new("item", DataType::Int32, true))); let list_data = ArrayData::builder(list_data_type) diff --git a/rust/parquet/src/arrow/array_reader.rs b/rust/parquet/src/arrow/array_reader.rs index 71b84cd981b09..dba2569c6e016 100644 --- a/rust/parquet/src/arrow/array_reader.rs +++ b/rust/parquet/src/arrow/array_reader.rs @@ -1074,7 +1074,7 @@ impl ArrayReader for StructArrayReader { // calculate struct def level data let buffer_size = children_array_len * size_of::(); let mut def_level_data_buffer = MutableBuffer::new(buffer_size); - def_level_data_buffer.resize(buffer_size); + def_level_data_buffer.resize(buffer_size, 0); let def_level_data = def_level_data_buffer.typed_data_mut(); diff --git a/rust/parquet/src/arrow/record_reader.rs b/rust/parquet/src/arrow/record_reader.rs index 6ab88599a6b1b..8591911922199 100644 --- a/rust/parquet/src/arrow/record_reader.rs +++ b/rust/parquet/src/arrow/record_reader.rs @@ -54,7 +54,7 @@ impl RecordReader { let (def_levels, null_map) = if column_schema.max_def_level() > 0 { ( Some(MutableBuffer::new(MIN_BATCH_SIZE)), - Some(BooleanBufferBuilder::new(MIN_BATCH_SIZE)), + Some(BooleanBufferBuilder::new(0)), ) } else { (None, None) @@ -161,14 +161,14 @@ impl RecordReader { let num_bytes = num_left_values * size_of::(); let new_len = self.num_values * size_of::(); - new_buffer.resize(num_bytes); + new_buffer.resize(num_bytes, 0); let new_def_levels = new_buffer.as_slice_mut(); let left_def_levels = &def_levels_buf.as_slice_mut()[new_len..]; new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]); - def_levels_buf.resize(new_len); + def_levels_buf.resize(new_len, 0); Some(new_buffer) } else { None @@ -188,14 +188,14 @@ impl RecordReader { let num_bytes = num_left_values * size_of::(); let new_len = self.num_values * size_of::(); - new_buffer.resize(num_bytes); + new_buffer.resize(num_bytes, 0); let new_rep_levels = 
new_buffer.as_slice_mut(); let left_rep_levels = &rep_levels_buf.as_slice_mut()[new_len..]; new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]); - rep_levels_buf.resize(new_len); + rep_levels_buf.resize(new_len, 0); Some(new_buffer) } else { @@ -215,14 +215,14 @@ impl RecordReader { let num_bytes = num_left_values * T::get_type_size(); let new_len = self.num_values * T::get_type_size(); - new_buffer.resize(num_bytes); + new_buffer.resize(num_bytes, 0); let new_records = new_buffer.as_slice_mut(); let left_records = &mut self.records.as_slice_mut()[new_len..]; new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]); - self.records.resize(new_len); + self.records.resize(new_len, 0); Ok(replace(&mut self.records, new_buffer).into()) } @@ -279,12 +279,12 @@ impl RecordReader { fn read_one_batch(&mut self, batch_size: usize) -> Result { // Reserve spaces self.records - .resize(self.records.len() + batch_size * T::get_type_size()); + .resize(self.records.len() + batch_size * T::get_type_size(), 0); if let Some(ref mut buf) = self.rep_levels { - buf.resize(buf.len() + batch_size * size_of::()); + buf.resize(buf.len() + batch_size * size_of::(), 0); } if let Some(ref mut buf) = self.def_levels { - buf.resize(buf.len() + batch_size * size_of::()); + buf.resize(buf.len() + batch_size * size_of::(), 0); } let values_written = self.values_written; @@ -413,16 +413,16 @@ impl RecordReader { fn set_values_written(&mut self, new_values_written: usize) -> Result<()> { self.values_written = new_values_written; self.records - .resize(self.values_written * T::get_type_size()); + .resize(self.values_written * T::get_type_size(), 0); let new_levels_len = self.values_written * size_of::(); if let Some(ref mut buf) = self.rep_levels { - buf.resize(new_levels_len) + buf.resize(new_levels_len, 0) }; if let Some(ref mut buf) = self.def_levels { - buf.resize(new_levels_len) + buf.resize(new_levels_len, 0) }; Ok(())