Skip to content
This repository was archived by the owner on Feb 18, 2024. It is now read-only.

Commit bb8c815

Browse files
Added support to read Avro.
1 parent 06892e9 commit bb8c815

File tree

10 files changed

+715
-0
lines changed

10 files changed

+715
-0
lines changed

Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ ahash = { version = "0.7", optional = true }
6060

6161
parquet2 = { version = "0.4", optional = true, default_features = false, features = ["stream"] }
6262

63+
avro-rs = { version = "0.13", optional = true, default_features = false }
64+
6365
# for division/remainder optimization at runtime
6466
strength_reduce = { version = "0.2", optional = true }
6567
multiversion = { version = "0.6.1", optional = true }
@@ -86,6 +88,7 @@ full = [
8688
"io_print",
8789
"io_parquet",
8890
"io_parquet_compression",
91+
"io_avro",
8992
"regex",
9093
"merge_sort",
9194
"ahash",
@@ -105,6 +108,7 @@ io_parquet_compression = [
105108
"parquet2/lz4",
106109
"parquet2/brotli",
107110
]
111+
io_avro = ["avro-rs", "streaming-iterator", "serde_json"]
108112
# io_json: its dependencies + error handling
109113
# serde_derive: there is some derive around
110114
io_json_integration = ["io_json", "serde_derive", "hex"]

src/io/avro/mod.rs

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
//! Read and write from and to Apache Avro
2+
3+
pub mod read;
4+
5+
use crate::error::ArrowError;
6+
7+
impl From<avro_rs::SerError> for ArrowError {
8+
fn from(error: avro_rs::SerError) -> Self {
9+
ArrowError::External("".to_string(), Box::new(error))
10+
}
11+
}

src/io/avro/read/deserialize.rs

+130
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
use std::convert::TryInto;
2+
use std::sync::Arc;
3+
4+
use crate::array::*;
5+
use crate::datatypes::*;
6+
use crate::error::ArrowError;
7+
use crate::error::Result;
8+
use crate::record_batch::RecordBatch;
9+
10+
use super::util;
11+
12+
pub fn deserialize(mut block: &[u8], rows: usize, schema: Arc<Schema>) -> Result<RecordBatch> {
13+
// create mutables, one per field
14+
let mut arrays: Vec<Box<dyn MutableArray>> = schema
15+
.fields()
16+
.iter()
17+
.map(|field| match field.data_type().to_physical_type() {
18+
PhysicalType::Boolean => {
19+
Ok(Box::new(MutableBooleanArray::with_capacity(rows)) as Box<dyn MutableArray>)
20+
}
21+
PhysicalType::Primitive(primitive) => with_match_primitive_type!(primitive, |$T| {
22+
Ok(Box::new(MutablePrimitiveArray::<$T>::with_capacity(rows)) as Box<dyn MutableArray>)
23+
}),
24+
PhysicalType::Utf8 => {
25+
Ok(Box::new(MutableUtf8Array::<i32>::with_capacity(rows)) as Box<dyn MutableArray>)
26+
}
27+
PhysicalType::Binary => {
28+
Ok(Box::new(MutableBinaryArray::<i32>::with_capacity(rows))
29+
as Box<dyn MutableArray>)
30+
}
31+
other => {
32+
return Err(ArrowError::NotYetImplemented(format!(
33+
"Deserializing type {:?} is still not implemented",
34+
other
35+
)))
36+
}
37+
})
38+
.collect::<Result<_>>()?;
39+
40+
// this is _the_ expensive transpose (rows -> columns)
41+
for row in (0..rows) {
42+
for array in &mut arrays {
43+
match array.data_type().to_physical_type() {
44+
PhysicalType::Boolean => {
45+
let is_valid = block[0] == 1;
46+
block = &block[1..];
47+
let array = array
48+
.as_mut_any()
49+
.downcast_mut::<MutableBooleanArray>()
50+
.unwrap();
51+
array.push(Some(is_valid))
52+
}
53+
PhysicalType::Primitive(primitive) => {
54+
use crate::datatypes::PrimitiveType::*;
55+
match primitive {
56+
Int32 => {
57+
let value = util::zigzag_i64(&mut block)? as i32;
58+
let array = array
59+
.as_mut_any()
60+
.downcast_mut::<MutablePrimitiveArray<i32>>()
61+
.unwrap();
62+
array.push(Some(value))
63+
}
64+
Int64 => {
65+
let value = util::zigzag_i64(&mut block)? as i64;
66+
let array = array
67+
.as_mut_any()
68+
.downcast_mut::<MutablePrimitiveArray<i64>>()
69+
.unwrap();
70+
array.push(Some(value))
71+
}
72+
Float32 => {
73+
let value = f32::from_le_bytes(block[..4].try_into().unwrap());
74+
block = &block[4..];
75+
let array = array
76+
.as_mut_any()
77+
.downcast_mut::<MutablePrimitiveArray<f32>>()
78+
.unwrap();
79+
array.push(Some(value))
80+
}
81+
Float64 => {
82+
let value = f64::from_le_bytes(block[..8].try_into().unwrap());
83+
block = &block[8..];
84+
let array = array
85+
.as_mut_any()
86+
.downcast_mut::<MutablePrimitiveArray<f64>>()
87+
.unwrap();
88+
array.push(Some(value))
89+
}
90+
_ => unreachable!(),
91+
}
92+
}
93+
PhysicalType::Utf8 => {
94+
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|e| {
95+
ArrowError::ExternalFormat(
96+
"Avro format contains a non-usize number of bytes".to_string(),
97+
)
98+
})?;
99+
let data = std::str::from_utf8(&block[..len])?;
100+
block = &block[len..];
101+
102+
let array = array
103+
.as_mut_any()
104+
.downcast_mut::<MutableUtf8Array<i32>>()
105+
.unwrap();
106+
array.push(Some(data))
107+
}
108+
PhysicalType::Binary => {
109+
let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|e| {
110+
ArrowError::ExternalFormat(
111+
"Avro format contains a non-usize number of bytes".to_string(),
112+
)
113+
})?;
114+
let data = &block[..len];
115+
block = &block[len..];
116+
117+
let array = array
118+
.as_mut_any()
119+
.downcast_mut::<MutableBinaryArray<i32>>()
120+
.unwrap();
121+
array.push(Some(data))
122+
}
123+
_ => todo!(),
124+
};
125+
}
126+
}
127+
let columns = arrays.iter_mut().map(|array| array.as_arc()).collect();
128+
129+
RecordBatch::try_new(schema, columns)
130+
}

src/io/avro/read/mod.rs

+173
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
use std::io::Read;
2+
use std::sync::Arc;
3+
4+
use avro_rs::Codec;
5+
use avro_rs::Reader as AvroReader;
6+
use streaming_iterator::StreamingIterator;
7+
8+
mod deserialize;
9+
mod schema;
10+
mod util;
11+
12+
use crate::datatypes::Schema;
13+
use crate::error::{ArrowError, Result};
14+
use crate::record_batch::RecordBatch;
15+
16+
pub fn read_metadata<R: std::io::Read>(reader: &mut R) -> Result<(Schema, Codec, [u8; 16])> {
17+
let (schema, codec, marker) = util::read_schema(reader)?;
18+
Ok((schema::convert_schema(&schema)?, codec, marker))
19+
}
20+
21+
fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
22+
let rows = match util::zigzag_i64(reader) {
23+
Ok(a) => a,
24+
Err(ArrowError::Io(io_err)) => {
25+
if let std::io::ErrorKind::UnexpectedEof = io_err.kind() {
26+
// end
27+
return Ok((0, 0));
28+
} else {
29+
return Err(ArrowError::Io(io_err));
30+
}
31+
}
32+
Err(other) => return Err(other),
33+
};
34+
let bytes = util::zigzag_i64(reader)?;
35+
Ok((rows as usize, bytes as usize))
36+
}
37+
38+
/// Reads a block from the file into `buf`.
39+
/// # Panic
40+
/// Panics iff the block marker does not equal to the file's marker
41+
fn read_block<R: Read>(reader: &mut R, buf: &mut Vec<u8>, file_marker: [u8; 16]) -> Result<usize> {
42+
let (rows, bytes) = read_size(reader)?;
43+
if rows == 0 {
44+
return Ok(0);
45+
};
46+
47+
buf.resize(bytes, 0);
48+
reader.read_exact(buf)?;
49+
50+
let mut marker = [0u8; 16];
51+
reader.read_exact(&mut marker)?;
52+
53+
if marker != file_marker {
54+
panic!();
55+
}
56+
Ok(rows)
57+
}
58+
59+
fn decompress_block(buf: &mut Vec<u8>, decompress: &mut Vec<u8>, codec: Codec) -> Result<bool> {
60+
match codec {
61+
Codec::Null => {
62+
std::mem::swap(buf, decompress);
63+
Ok(false)
64+
}
65+
Codec::Deflate => {
66+
todo!()
67+
}
68+
}
69+
}
70+
71+
/// [`StreamingIterator`] of blocks of avro data
pub struct BlockStreamIterator<'a, R: Read> {
    // (raw block bytes, number of rows in the block); rows == 0 marks exhaustion
    buf: (Vec<u8>, usize),
    reader: &'a mut R,
    // 16-byte sync marker from the file header; every block must end with it
    file_marker: [u8; 16],
}
77+
78+
impl<'a, R: Read> BlockStreamIterator<'a, R> {
79+
pub fn new(reader: &'a mut R, file_marker: [u8; 16]) -> Self {
80+
Self {
81+
reader,
82+
file_marker,
83+
buf: (vec![], 0),
84+
}
85+
}
86+
87+
pub fn buffer(&mut self) -> &mut Vec<u8> {
88+
&mut self.buf.0
89+
}
90+
}
91+
92+
impl<'a, R: Read> StreamingIterator for BlockStreamIterator<'a, R> {
93+
type Item = (Vec<u8>, usize);
94+
95+
fn advance(&mut self) {
96+
let (buf, rows) = &mut self.buf;
97+
// todo: surface this error
98+
*rows = read_block(self.reader, buf, self.file_marker).unwrap();
99+
}
100+
101+
fn get(&self) -> Option<&Self::Item> {
102+
if self.buf.1 > 0 {
103+
Some(&self.buf)
104+
} else {
105+
None
106+
}
107+
}
108+
}
109+
110+
/// [`StreamingIterator`] of blocks of decompressed avro data
pub struct Decompressor<'a, R: Read> {
    blocks: BlockStreamIterator<'a, R>,
    codec: Codec,
    // (decompressed block bytes, number of rows); rows == 0 marks exhaustion
    buf: (Vec<u8>, usize),
    // whether the last block was obtained by swapping buffers with `blocks`
    // (the Codec::Null fast path) rather than by decompressing into `buf`
    was_swapped: bool,
}
117+
118+
impl<'a, R: Read> Decompressor<'a, R> {
119+
pub fn new(blocks: BlockStreamIterator<'a, R>, codec: Codec) -> Self {
120+
Self {
121+
blocks,
122+
codec,
123+
buf: (vec![], 0),
124+
was_swapped: false,
125+
}
126+
}
127+
}
128+
129+
impl<'a, R: Read> StreamingIterator for Decompressor<'a, R> {
    type Item = (Vec<u8>, usize);

    fn advance(&mut self) {
        // If the previous block's Codec::Null path swapped buffers instead of
        // copying, swap them back first so `blocks` regains its own buffer and
        // both allocations keep being reused.
        if self.was_swapped {
            std::mem::swap(self.blocks.buffer(), &mut self.buf.0);
        }
        self.blocks.advance();
        // todo: surface this error instead of unwrapping
        self.was_swapped =
            decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec).unwrap();
        // mirror the row count of the underlying block; 0 when exhausted
        self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default();
    }

    fn get(&self) -> Option<&Self::Item> {
        if self.buf.1 > 0 {
            Some(&self.buf)
        } else {
            None
        }
    }
}
150+
151+
/// Single threaded, blocking reader of Avro files; [`Iterator`] of [`RecordBatch`]es.
pub struct Reader<'a, R: Read> {
    iter: Decompressor<'a, R>,
    // arrow schema shared with every produced RecordBatch
    schema: Arc<Schema>,
}
156+
157+
impl<'a, R: Read> Reader<'a, R> {
158+
pub fn new(iter: Decompressor<'a, R>, schema: Arc<Schema>) -> Self {
159+
Self { iter, schema }
160+
}
161+
}
162+
163+
impl<'a, R: Read> Iterator for Reader<'a, R> {
164+
type Item = Result<RecordBatch>;
165+
166+
fn next(&mut self) -> Option<Self::Item> {
167+
if let Some((data, rows)) = self.iter.next() {
168+
Some(deserialize::deserialize(data, *rows, self.schema.clone()))
169+
} else {
170+
None
171+
}
172+
}
173+
}

0 commit comments

Comments
 (0)