Support Rust structures --> RecordBatch by adding Serde support to `RawDecoder` (#3949) (#3979)

* Add serde support to RawDecoder (#3949)

* Clippy

* More examples

* Use BTreeMap for deterministic test output

* Use new Field constructors

* Review feedback
tustvold authored Apr 5, 2023
1 parent 2b2ce2f commit 2b354a3
Showing 6 changed files with 675 additions and 0 deletions.
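The net effect for users: rows of any `serde::Serialize` type can be pushed into a `RawDecoder` built from an Arrow schema and then flushed into a `RecordBatch`. A minimal sketch of that workflow, condensed from the doc examples added in this diff (the `Row` struct and the single `Int64` column are illustrative, not part of the commit):

```rust
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_json::RawReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use serde::Serialize;

// Illustrative row type; any type implementing serde::Serialize works
#[derive(Serialize)]
struct Row {
    value: i64,
}

fn main() {
    // Arrow schema matching the serialized field names
    let schema = Schema::new(vec![Field::new("value", DataType::Int64, false)]);

    // Build a RawDecoder for that schema
    let mut decoder = RawReaderBuilder::new(Arc::new(schema))
        .build_decoder()
        .unwrap();

    // Serialize the rows into the decoder, then flush to a RecordBatch
    let rows = vec![Row { value: 1 }, Row { value: 2 }];
    decoder.serialize(&rows).unwrap();
    let batch = decoder.flush().unwrap().unwrap();

    assert_eq!(batch.num_rows(), 2);
    let values = batch.column(0).as_primitive::<Int64Type>().values();
    assert_eq!(values, &[1, 2]);
}
```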
2 changes: 2 additions & 0 deletions arrow-json/Cargo.toml
@@ -42,10 +42,12 @@ arrow-schema = { workspace = true }
half = { version = "2.1", default-features = false }
indexmap = { version = "1.9", default-features = false, features = ["std"] }
num = { version = "0.4", default-features = false, features = ["std"] }
serde = { version = "1.0", default-features = false }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
chrono = { version = "0.4.23", default-features = false, features = ["clock"] }
lexical-core = { version = "0.8", default-features = false }

[dev-dependencies]
tempfile = "3.3"
flate2 = { version = "1", default-features = false, features = ["rust_backend"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
181 changes: 181 additions & 0 deletions arrow-json/src/raw/mod.rs
@@ -29,18 +29,21 @@ use crate::raw::struct_array::StructArrayDecoder;
use crate::raw::tape::{Tape, TapeDecoder, TapeElement};
use crate::raw::timestamp_array::TimestampArrayDecoder;
use arrow_array::timezone::Tz;
use arrow_array::types::Float32Type;
use arrow_array::types::*;
use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit};
use chrono::Utc;
use serde::Serialize;
use std::io::BufRead;

mod boolean_array;
mod decimal_array;
mod list_array;
mod map_array;
mod primitive_array;
mod serializer;
mod string_array;
mod struct_array;
mod tape;
@@ -233,6 +236,184 @@ impl RawDecoder {
self.tape_decoder.decode(buf)
}

/// Serialize `rows` to this [`RawDecoder`]
///
/// This provides a simple way to convert [serde]-compatible data structures into arrow
/// [`RecordBatch`].
///
/// Custom conversion logic as described in [arrow_array::builder] will likely outperform this,
/// especially where the schema is known at compile-time; however, this provides a mechanism
/// to get something up and running quickly.
///
/// It can be used with [`serde_json::Value`]
///
/// ```
/// # use std::sync::Arc;
/// # use serde_json::{Value, json};
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::types::Float32Type;
/// # use arrow_json::RawReaderBuilder;
/// # use arrow_schema::{DataType, Field, Schema};
/// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})];
///
/// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]);
/// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
///
/// decoder.serialize(&json).unwrap();
/// let batch = decoder.flush().unwrap().unwrap();
/// assert_eq!(batch.num_rows(), 2);
/// assert_eq!(batch.num_columns(), 1);
/// let values = batch.column(0).as_primitive::<Float32Type>().values();
/// assert_eq!(values, &[2.3, 5.7])
/// ```
///
/// Or with arbitrary [`Serialize`] types
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_json::RawReaderBuilder;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use serde::Serialize;
/// # use arrow_array::cast::AsArray;
/// # use arrow_array::types::{Float32Type, Int32Type};
/// #
/// #[derive(Serialize)]
/// struct MyStruct {
///     int32: i32,
///     float: f32,
/// }
///
/// let schema = Schema::new(vec![
/// Field::new("int32", DataType::Int32, false),
/// Field::new("float", DataType::Float32, false),
/// ]);
///
/// let rows = vec![
///     MyStruct { int32: 0, float: 3. },
///     MyStruct { int32: 4, float: 67.53 },
/// ];
///
/// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
/// decoder.serialize(&rows).unwrap();
///
/// let batch = decoder.flush().unwrap().unwrap();
///
/// // Expect batch containing two columns
/// let int32 = batch.column(0).as_primitive::<Int32Type>();
/// assert_eq!(int32.values(), &[0, 4]);
///
/// let float = batch.column(1).as_primitive::<Float32Type>();
/// assert_eq!(float.values(), &[3., 67.53]);
/// ```
///
/// Or even complex nested types
///
/// ```
/// # use std::collections::BTreeMap;
/// # use std::sync::Arc;
/// # use arrow_array::StructArray;
/// # use arrow_cast::display::{ArrayFormatter, FormatOptions};
/// # use arrow_json::RawReaderBuilder;
/// # use arrow_schema::{DataType, Field, Fields, Schema};
/// # use serde::Serialize;
/// #
/// #[derive(Serialize)]
/// struct MyStruct {
///     int32: i32,
///     list: Vec<f64>,
///     nested: Vec<Option<Nested>>,
/// }
///
/// impl MyStruct {
///     /// Returns the [`Fields`] for [`MyStruct`]
///     fn fields() -> Fields {
///         let nested = DataType::Struct(Nested::fields());
///         Fields::from([
///             Arc::new(Field::new("int32", DataType::Int32, false)),
///             Arc::new(Field::new_list(
///                 "list",
///                 Field::new("element", DataType::Float64, false),
///                 false,
///             )),
///             Arc::new(Field::new_list(
///                 "nested",
///                 Field::new("element", nested, true),
///                 true,
///             )),
///         ])
///     }
/// }
///
/// #[derive(Serialize)]
/// struct Nested {
///     map: BTreeMap<String, Vec<String>>
/// }
///
/// impl Nested {
///     /// Returns the [`Fields`] for [`Nested`]
///     fn fields() -> Fields {
///         let element = Field::new("element", DataType::Utf8, false);
///         Fields::from([
///             Arc::new(Field::new_map(
///                 "map",
///                 "entries",
///                 Field::new("key", DataType::Utf8, false),
///                 Field::new_list("value", element, false),
///                 false, // sorted
///                 false, // nullable
///             ))
///         ])
///     }
/// }
///
/// let data = vec![
///     MyStruct {
///         int32: 34,
///         list: vec![1., 2., 34.],
///         nested: vec![
///             None,
///             Some(Nested {
///                 map: vec![
///                     ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]),
///                     ("key2".to_string(), vec!["baz".to_string()])
///                 ].into_iter().collect()
///             })
///         ]
///     },
///     MyStruct {
///         int32: 56,
///         list: vec![],
///         nested: vec![]
///     },
///     MyStruct {
///         int32: 24,
///         list: vec![-1., 245.],
///         nested: vec![None]
///     }
/// ];
///
/// let schema = Schema::new(MyStruct::fields());
/// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap();
/// decoder.serialize(&data).unwrap();
/// let batch = decoder.flush().unwrap().unwrap();
/// assert_eq!(batch.num_rows(), 3);
/// assert_eq!(batch.num_columns(), 3);
///
/// // Convert to StructArray to format
/// let s = StructArray::from(batch);
/// let options = FormatOptions::default().with_null("null");
/// let formatter = ArrayFormatter::try_new(&s, &options).unwrap();
///
/// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}");
/// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}");
/// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}");
/// ```
///
/// Note: this ignores any batch size setting, and always decodes all rows
pub fn serialize<S: Serialize>(&mut self, rows: &[S]) -> Result<(), ArrowError> {
self.tape_decoder.serialize(rows)
}

/// Flushes the currently buffered data to a [`RecordBatch`]
///
/// Returns `Ok(None)` if no buffered data