Skip to content

Commit

Permalink
Binary / row helpers (#6096)
Browse files Browse the repository at this point in the history
* Allow access to the underlying bytes of the row

Currently these are accessible via `AsRef`, but that trait only gives
you the bytes with the lifetime of the `Row` struct and not the lifetime
of the backing data.

* Conversions to and from BinaryArray

* Clippy fixes

* Doc comments for binary conversions

* Converting to binary now returns a result

* Add some negative tests for invalid bytes

* Address clippy and review comments
  • Loading branch information
bkirwi authored Oct 8, 2024
1 parent 1be268d commit accf625
Showing 1 changed file with 178 additions and 2 deletions.
180 changes: 178 additions & 2 deletions arrow-row/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ use std::sync::Arc;
use arrow_array::cast::*;
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::*;
use arrow_buffer::ArrowNativeType;
use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};
Expand Down Expand Up @@ -739,6 +739,48 @@ impl RowConverter {
}
}

/// Create a new [Rows] instance from the given binary data.
///
/// ```
/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
/// # use arrow_schema::DataType;
/// #
/// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
/// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
///
/// // We can convert rows into binary format and back in batch.
/// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
/// let binary = rows.try_into_binary().expect("known-small array");
/// let converted = converter.from_binary(binary.clone());
/// assert!(converted.iter().eq(values.iter().map(|r| r.row())));
/// ```
///
/// # Panics
///
/// This function expects the passed [BinaryArray] to contain valid row data as produced by this
/// [RowConverter]. It will panic if any rows are null. Operations on the returned [Rows] may
/// panic if the data is malformed.
pub fn from_binary(&self, array: BinaryArray) -> Rows {
assert_eq!(
array.null_count(),
0,
"can't construct Rows instance from array with nulls"
);
Rows {
buffer: array.values().to_vec(),
offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
config: RowConfig {
fields: Arc::clone(&self.fields),
validate_utf8: true,
},
}
}

/// Convert raw bytes into [`ArrayRef`]
///
/// # Safety
Expand Down Expand Up @@ -879,6 +921,55 @@ impl Rows {
+ self.buffer.len()
+ self.offsets.len() * std::mem::size_of::<usize>()
}

/// Create a [BinaryArray] from the [Rows] data without reallocating the
/// underlying bytes.
///
///
/// ```
/// # use std::sync::Arc;
/// # use std::collections::HashSet;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::StringArray;
/// # use arrow_row::{OwnedRow, Row, RowConverter, RowParser, SortField};
/// # use arrow_schema::DataType;
/// #
/// let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
/// let array = StringArray::from(vec!["hello", "world", "a", "a", "hello"]);
/// let rows = converter.convert_columns(&[Arc::new(array)]).unwrap();
///
/// // We can convert rows into binary format and back.
/// let values: Vec<OwnedRow> = rows.iter().map(|r| r.owned()).collect();
/// let binary = rows.try_into_binary().expect("known-small array");
/// let parser = converter.parser();
/// let parsed: Vec<OwnedRow> =
/// binary.iter().flatten().map(|b| parser.parse(b).owned()).collect();
/// assert_eq!(values, parsed);
/// ```
///
/// # Errors
///
/// This function will return an error if there is more data than can be stored in
/// a [BinaryArray] -- i.e. if the total data size is more than 2GiB.
pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
if self.buffer.len() > i32::MAX as usize {
return Err(ArrowError::InvalidArgumentError(format!(
"{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
self.buffer.len()
)));
}
// We've checked that the buffer length fits in an i32; so all offsets into that buffer should fit as well.
let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
// SAFETY: offsets buffer is nonempty, monotonically increasing, and all represent valid indexes into buffer.
let array = unsafe {
BinaryArray::new_unchecked(
OffsetBuffer::new_unchecked(offsets_scalar),
Buffer::from_vec(self.buffer),
None,
)
};
Ok(array)
}
}

impl<'a> IntoIterator for &'a Rows {
Expand Down Expand Up @@ -962,6 +1053,11 @@ impl<'a> Row<'a> {
config: self.config.clone(),
}
}

/// The row's bytes, with the lifetime of the underlying data.
pub fn data(&self) -> &'a [u8] {
self.data
}
}

// Manually derive these as don't wish to include `fields`
Expand Down Expand Up @@ -1827,6 +1923,68 @@ mod tests {
converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "Encountered non UTF-8 data")]
fn test_invalid_utf8_array() {
let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
let rows = converter.convert_columns(&[array]).unwrap();
let binary_rows = rows.try_into_binary().expect("known-small rows");

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parsed = converter.from_binary(binary_rows);

converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_empty() {
let binary_row: &[u8] = &[];

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parser = converter.parser();
let utf8_row = parser.parse(binary_row.as_ref());

converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_empty_array() {
let row: &[u8] = &[];
let binary_rows = BinaryArray::from(vec![row]);

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parsed = converter.from_binary(binary_rows);

converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_truncated() {
let binary_row: &[u8] = &[0x02];

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parser = converter.parser();
let utf8_row = parser.parse(binary_row.as_ref());

converter.convert_rows(std::iter::once(utf8_row)).unwrap();
}

#[test]
#[should_panic(expected = "index out of bounds")]
fn test_invalid_truncated_array() {
let row: &[u8] = &[0x02];
let binary_rows = BinaryArray::from(vec![row]);

let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
let parsed = converter.from_binary(binary_rows);

converter.convert_rows(parsed.iter()).unwrap();
}

#[test]
#[should_panic(expected = "rows were not produced by this RowConverter")]
fn test_different_converter() {
Expand Down Expand Up @@ -2255,7 +2413,7 @@ mod tests {

let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

let columns = options
let columns: Vec<SortField> = options
.into_iter()
.zip(&arrays)
.map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
Expand Down Expand Up @@ -2288,6 +2446,24 @@ mod tests {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}

// Check that we can convert
let rows = rows.try_into_binary().expect("reasonable size");
let parser = converter.parser();
let back = converter
.convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
.unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}

let rows = converter.from_binary(rows);
let back = converter.convert_rows(&rows).unwrap();
for (actual, expected) in back.iter().zip(&arrays) {
actual.to_data().validate_full().unwrap();
dictionary_eq(actual, expected)
}
}
}

Expand Down

0 comments on commit accf625

Please sign in to comment.