From 2b354a3e8d7b57f2ad5eb12aeb283cc15bc9e170 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 5 Apr 2023 18:35:15 +0100 Subject: [PATCH] Support Rust structures --> `RecordBatch` by adding `Serde` support to `RawDecoder` (#3949) (#3979) * Add serde support to RawDecoder (#3949) * Clippy * More examples * Use BTreeMap for deterministic test output * Use new Field constructors * Review feedback --- arrow-json/Cargo.toml | 2 + arrow-json/src/raw/mod.rs | 181 +++++++++++++ arrow-json/src/raw/serializer.rs | 422 +++++++++++++++++++++++++++++++ arrow-json/src/raw/tape.rs | 23 ++ arrow/Cargo.toml | 1 + arrow/src/lib.rs | 46 ++++ 6 files changed, 675 insertions(+) create mode 100644 arrow-json/src/raw/serializer.rs diff --git a/arrow-json/Cargo.toml b/arrow-json/Cargo.toml index 34bd447da183..453e4aa35182 100644 --- a/arrow-json/Cargo.toml +++ b/arrow-json/Cargo.toml @@ -42,6 +42,7 @@ arrow-schema = { workspace = true } half = { version = "2.1", default-features = false } indexmap = { version = "1.9", default-features = false, features = ["std"] } num = { version = "0.4", default-features = false, features = ["std"] } +serde = { version = "1.0", default-features = false } serde_json = { version = "1.0", default-features = false, features = ["std"] } chrono = { version = "0.4.23", default-features = false, features = ["clock"] } lexical-core = { version = "0.8", default-features = false } @@ -49,3 +50,4 @@ lexical-core = { version = "0.8", default-features = false } [dev-dependencies] tempfile = "3.3" flate2 = { version = "1", default-features = false, features = ["rust_backend"] } +serde = { version = "1.0", default-features = false, features = ["derive"] } diff --git a/arrow-json/src/raw/mod.rs b/arrow-json/src/raw/mod.rs index 1bae8ac529e7..f1f1ffb779d0 100644 --- a/arrow-json/src/raw/mod.rs +++ b/arrow-json/src/raw/mod.rs @@ -29,11 +29,13 @@ use crate::raw::struct_array::StructArrayDecoder; use crate::raw::tape::{Tape, TapeDecoder, TapeElement}; use crate::raw::timestamp_array::TimestampArrayDecoder; use arrow_array::timezone::Tz; +use arrow_array::types::Float32Type; use arrow_array::types::*; use arrow_array::{downcast_integer, make_array, RecordBatch, RecordBatchReader}; use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType, SchemaRef, TimeUnit}; use chrono::Utc; +use serde::Serialize; use std::io::BufRead; mod boolean_array; @@ -41,6 +43,7 @@ mod decimal_array; mod list_array; mod map_array; mod primitive_array; +mod serializer; mod string_array; mod struct_array; mod tape; @@ -233,6 +236,184 @@ impl RawDecoder { self.tape_decoder.decode(buf) } + /// Serialize `rows` to this [`RawDecoder`] + /// + /// This provides a simple way to convert [serde]-compatible datastructures into arrow + /// [`RecordBatch`]. + /// + /// Custom conversion logic as described in [arrow_array::builder] will likely outperform this, + /// especially where the schema is known at compile-time, however, this provides a mechanism + /// to get something up and running quickly + /// + /// It can be used with [`serde_json::Value`] + /// + /// ``` + /// # use std::sync::Arc; + /// # use serde_json::{Value, json}; + /// # use arrow_array::cast::AsArray; + /// # use arrow_array::types::Float32Type; + /// # use arrow_json::RawReaderBuilder; + /// # use arrow_schema::{DataType, Field, Schema}; + /// let json = vec![json!({"float": 2.3}), json!({"float": 5.7})]; + /// + /// let schema = Schema::new(vec![Field::new("float", DataType::Float32, true)]); + /// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap(); + /// + /// decoder.serialize(&json).unwrap(); + /// let batch = decoder.flush().unwrap().unwrap(); + /// assert_eq!(batch.num_rows(), 2); + /// assert_eq!(batch.num_columns(), 1); + /// let values = batch.column(0).as_primitive::().values(); + /// assert_eq!(values, &[2.3, 5.7]) + /// ``` + /// + /// Or with arbitrary [`Serialize`] types + /// + /// ``` + /// # use std::sync::Arc; + /// # use arrow_json::RawReaderBuilder; + /// # use arrow_schema::{DataType, Field, Schema}; + /// # use serde::Serialize; + /// # use arrow_array::cast::AsArray; + /// # use arrow_array::types::{Float32Type, Int32Type}; + /// # + /// #[derive(Serialize)] + /// struct MyStruct { + /// int32: i32, + /// float: f32, + /// } + /// + /// let schema = Schema::new(vec![ + /// Field::new("int32", DataType::Int32, false), + /// Field::new("float", DataType::Float32, false), + /// ]); + /// + /// let rows = vec![ + /// MyStruct{ int32: 0, float: 3. }, + /// MyStruct{ int32: 4, float: 67.53 }, + /// ]; + /// + /// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap(); + /// decoder.serialize(&rows).unwrap(); + /// + /// let batch = decoder.flush().unwrap().unwrap(); + /// + /// // Expect batch containing two columns + /// let int32 = batch.column(0).as_primitive::(); + /// assert_eq!(int32.values(), &[0, 4]); + /// + /// let float = batch.column(1).as_primitive::(); + /// assert_eq!(float.values(), &[3., 67.53]); + /// ``` + /// + /// Or even complex nested types + /// + /// ``` + /// # use std::collections::BTreeMap; + /// # use std::sync::Arc; + /// # use arrow_array::StructArray; + /// # use arrow_cast::display::{ArrayFormatter, FormatOptions}; + /// # use arrow_json::RawReaderBuilder; + /// # use arrow_schema::{DataType, Field, Fields, Schema}; + /// # use serde::Serialize; + /// # + /// #[derive(Serialize)] + /// struct MyStruct { + /// int32: i32, + /// list: Vec, + /// nested: Vec>, + /// } + /// + /// impl MyStruct { + /// /// Returns the [`Fields`] for [`MyStruct`] + /// fn fields() -> Fields { + /// let nested = DataType::Struct(Nested::fields()); + /// Fields::from([ + /// Arc::new(Field::new("int32", DataType::Int32, false)), + /// Arc::new(Field::new_list( + /// "list", + /// Field::new("element", DataType::Float64, false), + /// false, + /// )), + /// Arc::new(Field::new_list( + /// "nested", + /// Field::new("element", nested, true), + /// true, + /// )), + /// ]) + /// } + /// } + /// + /// #[derive(Serialize)] + /// struct Nested { + /// map: BTreeMap> + /// } + /// + /// impl Nested { + /// /// Returns the [`Fields`] for [`Nested`] + /// fn fields() -> Fields { + /// let element = Field::new("element", DataType::Utf8, false); + /// Fields::from([ + /// Arc::new(Field::new_map( + /// "map", + /// "entries", + /// Field::new("key", DataType::Utf8, false), + /// Field::new_list("value", element, false), + /// false, // sorted + /// false, // nullable + /// )) + /// ]) + /// } + /// } + /// + /// let data = vec![ + /// MyStruct { + /// int32: 34, + /// list: vec![1., 2., 34.], + /// nested: vec![ + /// None, + /// Some(Nested { + /// map: vec![ + /// ("key1".to_string(), vec!["foo".to_string(), "bar".to_string()]), + /// ("key2".to_string(), vec!["baz".to_string()]) + /// ].into_iter().collect() + /// }) + /// ] + /// }, + /// MyStruct { + /// int32: 56, + /// list: vec![], + /// nested: vec![] + /// }, + /// MyStruct { + /// int32: 24, + /// list: vec![-1., 245.], + /// nested: vec![None] + /// } + /// ]; + /// + /// let schema = Schema::new(MyStruct::fields()); + /// let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap(); + /// decoder.serialize(&data).unwrap(); + /// let batch = decoder.flush().unwrap().unwrap(); + /// assert_eq!(batch.num_rows(), 3); + /// assert_eq!(batch.num_columns(), 3); + /// + /// // Convert to StructArray to format + /// let s = StructArray::from(batch); + /// let options = FormatOptions::default().with_null("null"); + /// let formatter = ArrayFormatter::try_new(&s, &options).unwrap(); + /// + /// assert_eq!(&formatter.value(0).to_string(), "{int32: 34, list: [1.0, 2.0, 34.0], nested: [null, {map: {key1: [foo, bar], key2: [baz]}}]}"); + /// assert_eq!(&formatter.value(1).to_string(), "{int32: 56, list: [], nested: []}"); + /// assert_eq!(&formatter.value(2).to_string(), "{int32: 24, list: [-1.0, 245.0], nested: [null]}"); + /// ``` + /// + /// Note: this ignores any batch size setting, and always decodes all rows + pub fn serialize(&mut self, rows: &[S]) -> Result<(), ArrowError> { + self.tape_decoder.serialize(rows) + } + /// Flushes the currently buffered data to a [`RecordBatch`] /// /// Returns `Ok(None)` if no buffered data diff --git a/arrow-json/src/raw/serializer.rs b/arrow-json/src/raw/serializer.rs new file mode 100644 index 000000000000..d743b6dba126 --- /dev/null +++ b/arrow-json/src/raw/serializer.rs @@ -0,0 +1,422 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::raw::tape::TapeElement; +use lexical_core::FormattedSize; +use serde::ser::{ + Impossible, SerializeMap, SerializeSeq, SerializeStruct, SerializeTuple, + SerializeTupleStruct, +}; +use serde::{Serialize, Serializer}; + +#[derive(Debug)] +pub struct SerializerError(String); + +impl std::error::Error for SerializerError {} + +impl std::fmt::Display for SerializerError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl serde::ser::Error for SerializerError { + fn custom(msg: T) -> Self + where + T: std::fmt::Display, + { + Self(msg.to_string()) + } +} + +/// [`Serializer`] for [`TapeElement`] +/// +/// Heavily based on +pub struct TapeSerializer<'a> { + elements: &'a mut Vec, + + /// A buffer of parsed string data + bytes: &'a mut Vec, + + /// Offsets into `data` + offsets: &'a mut Vec, +} + +impl<'a> TapeSerializer<'a> { + pub fn new( + elements: &'a mut Vec, + bytes: &'a mut Vec, + offsets: &'a mut Vec, + ) -> Self { + Self { + elements, + bytes, + offsets, + } + } +} + +/// The tape stores all values as strings, and so must serialize numeric types +/// +/// Formatting to a string only to parse it back again is rather wasteful, +/// it may be possible to tweak the tape representation to avoid this +/// +/// Need to use macro as const generic expressions are unstable +/// +macro_rules! serialize_numeric { + ($s:ident, $t:ty, $v:ident) => {{ + let mut buffer = [0_u8; <$t>::FORMATTED_SIZE]; + let s = lexical_core::write($v, &mut buffer); + $s.serialize_bytes(s) + }}; +} + +impl<'a, 'b> Serializer for &'a mut TapeSerializer<'b> { + type Ok = (); + + type Error = SerializerError; + + type SerializeSeq = ListSerializer<'a, 'b>; + type SerializeTuple = ListSerializer<'a, 'b>; + type SerializeTupleStruct = ListSerializer<'a, 'b>; + type SerializeTupleVariant = Impossible<(), SerializerError>; + type SerializeMap = ObjectSerializer<'a, 'b>; + type SerializeStruct = ObjectSerializer<'a, 'b>; + type SerializeStructVariant = Impossible<(), SerializerError>; + + fn serialize_bool(self, v: bool) -> Result<(), SerializerError> { + self.elements.push(match v { + true => TapeElement::True, + false => TapeElement::False, + }); + Ok(()) + } + + fn serialize_i8(self, v: i8) -> Result<(), SerializerError> { + serialize_numeric!(self, i8, v) + } + + fn serialize_i16(self, v: i16) -> Result<(), SerializerError> { + serialize_numeric!(self, i16, v) + } + + fn serialize_i32(self, v: i32) -> Result<(), SerializerError> { + serialize_numeric!(self, i32, v) + } + + fn serialize_i64(self, v: i64) -> Result<(), SerializerError> { + serialize_numeric!(self, i64, v) + } + + fn serialize_u8(self, v: u8) -> Result<(), SerializerError> { + serialize_numeric!(self, u8, v) + } + + fn serialize_u16(self, v: u16) -> Result<(), SerializerError> { + serialize_numeric!(self, u16, v) + } + + fn serialize_u32(self, v: u32) -> Result<(), SerializerError> { + serialize_numeric!(self, u32, v) + } + + fn serialize_u64(self, v: u64) -> Result<(), SerializerError> { + serialize_numeric!(self, u64, v) + } + + fn serialize_f32(self, v: f32) -> Result<(), SerializerError> { + serialize_numeric!(self, f32, v) + } + + fn serialize_f64(self, v: f64) -> Result<(), SerializerError> { + serialize_numeric!(self, f64, v) + } + + fn serialize_char(self, v: char) -> Result<(), SerializerError> { + self.serialize_str(&v.to_string()) + } + + fn serialize_str(self, v: &str) -> Result<(), SerializerError> { + self.serialize_bytes(v.as_bytes()) + } + + fn serialize_bytes(self, v: &[u8]) -> Result<(), SerializerError> { + self.bytes.extend_from_slice(v); + let idx = self.offsets.len() - 1; + self.elements.push(TapeElement::String(idx as _)); + self.offsets.push(self.bytes.len()); + Ok(()) + } + + fn serialize_none(self) -> Result<(), SerializerError> { + self.serialize_unit() + } + + fn serialize_some(self, value: &T) -> Result<(), SerializerError> + where + T: ?Sized + Serialize, + { + value.serialize(self) + } + + fn serialize_unit(self) -> Result<(), SerializerError> { + self.elements.push(TapeElement::Null); + Ok(()) + } + + fn serialize_unit_struct(self, _name: &'static str) -> Result<(), SerializerError> { + self.serialize_unit() + } + + fn serialize_unit_variant( + self, + _name: &'static str, + _variant_index: u32, + variant: &'static str, + ) -> Result<(), SerializerError> { + self.serialize_str(variant) + } + + fn serialize_newtype_struct( + self, + _name: &'static str, + value: &T, + ) -> Result<(), SerializerError> + where + T: ?Sized + Serialize, + { + value.serialize(self) + } + + fn serialize_newtype_variant( + self, + _name: &'static str, + _variant_index: u32, + variant: &'static str, + value: &T, + ) -> Result<(), SerializerError> + where + T: ?Sized + Serialize, + { + let mut serializer = self.serialize_map(Some(1))?; + serializer.serialize_key(variant)?; + serializer.serialize_value(value)?; + serializer.finish(); + Ok(()) + } + + fn serialize_seq( + self, + _len: Option, + ) -> Result { + Ok(ListSerializer::new(self)) + } + + fn serialize_tuple( + self, + len: usize, + ) -> Result { + self.serialize_seq(Some(len)) + } + + fn serialize_tuple_struct( + self, + _name: &'static str, + len: usize, + ) -> Result { + self.serialize_seq(Some(len)) + } + + fn serialize_tuple_variant( + self, + name: &'static str, + _variant_index: u32, + variant: &'static str, + _len: usize, + ) -> Result { + Err(SerializerError(format!( + "serializing tuple variants is not currently supported: {name}::{variant}" + ))) + } + + // Maps are represented in JSON as `{ K: V, K: V, ... }`. + fn serialize_map( + self, + _len: Option, + ) -> Result { + Ok(ObjectSerializer::new(self)) + } + + fn serialize_struct( + self, + _name: &'static str, + len: usize, + ) -> Result { + self.serialize_map(Some(len)) + } + + fn serialize_struct_variant( + self, + name: &'static str, + _variant_index: u32, + variant: &'static str, + _len: usize, + ) -> Result { + Err(SerializerError(format!( + "serializing struct variants is not currently supported: {name}::{variant}" + ))) + } +} + +pub struct ObjectSerializer<'a, 'b> { + serializer: &'a mut TapeSerializer<'b>, + start: usize, +} + +impl<'a, 'b> ObjectSerializer<'a, 'b> { + fn new(serializer: &'a mut TapeSerializer<'b>) -> Self { + let start = serializer.elements.len(); + serializer.elements.push(TapeElement::StartObject(0)); + Self { serializer, start } + } + + fn finish(self) { + let end = self.serializer.elements.len() as _; + self.serializer.elements[self.start] = TapeElement::StartObject(end); + + let end = TapeElement::EndObject(self.start as _); + self.serializer.elements.push(end); + } +} + +impl<'a, 'b> SerializeMap for ObjectSerializer<'a, 'b> { + type Ok = (); + type Error = SerializerError; + + fn serialize_key(&mut self, key: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + key.serialize(&mut *self.serializer) + } + + fn serialize_value(&mut self, value: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + value.serialize(&mut *self.serializer) + } + + fn end(self) -> Result<(), Self::Error> { + self.finish(); + Ok(()) + } +} + +impl<'a, 'b> SerializeStruct for ObjectSerializer<'a, 'b> { + type Ok = (); + type Error = SerializerError; + + fn serialize_field( + &mut self, + key: &'static str, + value: &T, + ) -> Result<(), Self::Error> + where + T: Serialize, + { + key.serialize(&mut *self.serializer)?; + value.serialize(&mut *self.serializer) + } + + fn end(self) -> Result<(), Self::Error> { + self.finish(); + Ok(()) + } +} + +pub struct ListSerializer<'a, 'b> { + serializer: &'a mut TapeSerializer<'b>, + start: usize, +} + +impl<'a, 'b> ListSerializer<'a, 'b> { + fn new(serializer: &'a mut TapeSerializer<'b>) -> Self { + let start = serializer.elements.len(); + serializer.elements.push(TapeElement::StartList(0)); + Self { serializer, start } + } + + fn finish(self) { + let end = self.serializer.elements.len() as _; + self.serializer.elements[self.start] = TapeElement::StartList(end); + + let end = TapeElement::EndList(self.start as _); + self.serializer.elements.push(end); + } +} + +impl<'a, 'b> SerializeSeq for ListSerializer<'a, 'b> { + type Ok = (); + type Error = SerializerError; + + fn serialize_element(&mut self, value: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + value.serialize(&mut *self.serializer) + } + + fn end(self) -> Result<(), Self::Error> { + self.finish(); + Ok(()) + } +} + +impl<'a, 'b> SerializeTuple for ListSerializer<'a, 'b> { + type Ok = (); + type Error = SerializerError; + + fn serialize_element(&mut self, value: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + value.serialize(&mut *self.serializer) + } + + fn end(self) -> Result<(), Self::Error> { + self.finish(); + Ok(()) + } +} + +impl<'a, 'b> SerializeTupleStruct for ListSerializer<'a, 'b> { + type Ok = (); + type Error = SerializerError; + + fn serialize_field(&mut self, value: &T) -> Result<(), Self::Error> + where + T: Serialize, + { + value.serialize(&mut *self.serializer) + } + + fn end(self) -> Result<(), Self::Error> { + self.finish(); + Ok(()) + } +} diff --git a/arrow-json/src/raw/tape.rs b/arrow-json/src/raw/tape.rs index 3f4a317c8700..2720c2502585 100644 --- a/arrow-json/src/raw/tape.rs +++ b/arrow-json/src/raw/tape.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. +use crate::raw::serializer::TapeSerializer; use arrow_schema::ArrowError; +use serde::Serialize; use std::fmt::{Display, Formatter}; /// We decode JSON to a flattened tape representation, @@ -452,6 +454,27 @@ impl TapeDecoder { Ok(buf.len() - iter.len()) } + /// Writes any type that implements [`Serialize`] into this [`TapeDecoder`] + pub fn serialize(&mut self, rows: &[S]) -> Result<(), ArrowError> { + if let Some(b) = self.stack.last() { + return Err(ArrowError::JsonError(format!( + "Cannot serialize to tape containing partial decode state {}", + b.as_str() + ))); + } + + let mut serializer = + TapeSerializer::new(&mut self.elements, &mut self.bytes, &mut self.offsets); + + rows.iter() + .try_for_each(|row| row.serialize(&mut serializer)) + .map_err(|e| ArrowError::JsonError(e.to_string()))?; + + self.num_rows += rows.len(); + + Ok(()) + } + /// Finishes the current [`Tape`] pub fn finish(&self) -> Result, ArrowError> { if let Some(b) = self.stack.last() { diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml index 2c9bf64eccf1..58fe54fd1f29 100644 --- a/arrow/Cargo.toml +++ b/arrow/Cargo.toml @@ -99,6 +99,7 @@ criterion = { version = "0.4", default-features = false } half = { version = "2.1", default-features = false } rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] } tempfile = { version = "3", default-features = false } +serde = { version = "1.0", default-features = false, features = ["derive"] } [build-dependencies] diff --git a/arrow/src/lib.rs b/arrow/src/lib.rs index 40b09a976178..41b846b0475e 100644 --- a/arrow/src/lib.rs +++ b/arrow/src/lib.rs @@ -271,6 +271,52 @@ //! //! Parquet is published as a [separate crate](https://crates.io/crates/parquet) //! +//! # Serde Compatibility +//! +//! [`arrow_json::RawDecoder`] provides a mechanism to convert arbitrary, serde-compatible +//! structures into [`RecordBatch`]. +//! +//! Whilst likely less performant than implementing a custom builder, as described in +//! [arrow_array::builder], this provides a simple mechanism to get up and running quickly +//! +//! ``` +//! # use std::sync::Arc; +//! # use arrow_json::RawReaderBuilder; +//! # use arrow_schema::{DataType, Field, Schema}; +//! # use serde::Serialize; +//! # use arrow_array::cast::AsArray; +//! # use arrow_array::types::{Float32Type, Int32Type}; +//! # +//! #[derive(Serialize)] +//! struct MyStruct { +//! int32: i32, +//! string: String, +//! } +//! +//! let schema = Schema::new(vec![ +//! Field::new("int32", DataType::Int32, false), +//! Field::new("string", DataType::Utf8, false), +//! ]); +//! +//! let rows = vec![ +//! MyStruct{ int32: 5, string: "bar".to_string() }, +//! MyStruct{ int32: 8, string: "foo".to_string() }, +//! ]; +//! +//! let mut decoder = RawReaderBuilder::new(Arc::new(schema)).build_decoder().unwrap(); +//! decoder.serialize(&rows).unwrap(); +//! +//! let batch = decoder.flush().unwrap().unwrap(); +//! +//! // Expect batch containing two columns +//! let int32 = batch.column(0).as_primitive::(); +//! assert_eq!(int32.values(), &[5, 8]); +//! +//! let string = batch.column(1).as_string::(); +//! assert_eq!(string.value(0), "bar"); +//! assert_eq!(string.value(1), "foo"); +//! ``` +//! //! # Memory and Buffers //! //! Advanced users may wish to interact with the underlying buffers of an [`Array`], for example,