This repository has been archived by the owner on Aug 13, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathto_arrow.rs
170 lines (152 loc) · 5.73 KB
/
to_arrow.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
//! The `to_arrow` module provides conversion functions from the iceberg
//! in-memory schema to the Arrow schema.
use std::convert::TryFrom;
use std::sync::Arc;
use arrow_schema::ArrowError;
use arrow_schema::DataType as ArrowDataType;
use arrow_schema::Field as ArrowField;
use arrow_schema::Schema as ArrowSchema;
use arrow_schema::TimeUnit;
use super::in_memory as types;
impl TryFrom<types::Schema> for ArrowSchema {
    type Error = ArrowError;

    /// Converts an iceberg in-memory schema into an Arrow schema.
    ///
    /// Top-level fields are converted one by one, in order, via
    /// `ArrowField::try_from`. The first field that fails to convert aborts
    /// the whole conversion and its error is returned unchanged.
    fn try_from(value: types::Schema) -> Result<Self, Self::Error> {
        let mut arrow_fields: Vec<ArrowField> = Vec::new();
        for iceberg_field in value.fields {
            let converted = ArrowField::try_from(iceberg_field)?;
            arrow_fields.push(converted);
        }
        Ok(ArrowSchema::new(arrow_fields))
    }
}
impl TryFrom<types::Field> for ArrowField {
type Error = ArrowError;
fn try_from(value: types::Field) -> Result<Self, Self::Error> {
Ok(ArrowField::new_dict(
value.name,
value.field_type.try_into()?,
!value.required,
value.id as i64,
false,
))
}
}
impl TryFrom<types::Any> for ArrowDataType {
type Error = ArrowError;
fn try_from(value: types::Any) -> Result<Self, Self::Error> {
match value {
super::Any::Primitive(v) => v.try_into(),
super::Any::Struct(v) => {
let mut fields = vec![];
for f in v.fields {
fields.push(ArrowField::try_from(f)?);
}
Ok(ArrowDataType::Struct(fields.into()))
}
super::Any::List(v) => {
let field = ArrowField::new_dict(
"item",
(*v.element_type).try_into()?,
!v.element_required,
v.element_id as i64,
false,
);
Ok(ArrowDataType::List(Arc::new(field)))
}
super::Any::Map(v) => {
let field = ArrowField::new(
"entries",
ArrowDataType::Struct(
vec![
ArrowField::new_dict(
"key",
(*v.key_type).try_into()?,
false,
v.key_id as i64,
false,
),
ArrowField::new_dict(
"value",
(*v.value_type).try_into()?,
!v.value_required,
v.value_id as i64,
false,
),
]
.into(),
),
v.value_required,
);
Ok(ArrowDataType::Map(Arc::new(field), false))
}
}
}
}
impl TryFrom<types::Primitive> for ArrowDataType {
type Error = ArrowError;
fn try_from(value: types::Primitive) -> Result<Self, Self::Error> {
match value {
types::Primitive::Boolean => Ok(ArrowDataType::Boolean),
types::Primitive::Int => Ok(ArrowDataType::Int32),
types::Primitive::Long => Ok(ArrowDataType::Int64),
types::Primitive::Float => Ok(ArrowDataType::Float32),
types::Primitive::Double => Ok(ArrowDataType::Float64),
types::Primitive::Decimal { precision, scale } => {
Ok(ArrowDataType::Decimal128(precision, scale as i8))
}
types::Primitive::Date => Ok(ArrowDataType::Date32),
types::Primitive::Time => Ok(ArrowDataType::Time32(TimeUnit::Microsecond)),
types::Primitive::Timestamp => {
Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None))
}
types::Primitive::Timestampz => {
// Timestampz always stored as UTC
Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None))
}
types::Primitive::String => Ok(ArrowDataType::Utf8),
types::Primitive::Uuid => Ok(ArrowDataType::FixedSizeBinary(16)),
types::Primitive::Fixed(i) => {
if i <= i32::MAX as u64 {
// FixedSizeBinary only supports up to i32::MAX bytes
Ok(ArrowDataType::FixedSizeBinary(i as i32))
} else {
Ok(ArrowDataType::LargeBinary)
}
}
types::Primitive::Binary => Ok(ArrowDataType::LargeBinary),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Converts a two-column iceberg schema and checks that the field names, the
    /// data types and the required -> nullable inversion all carry over to Arrow.
    #[test]
    fn test_try_into_arrow_schema() {
        let schema = types::Schema {
            fields: vec![
                types::Field {
                    name: "id".to_string(),
                    field_type: types::Any::Primitive(types::Primitive::Long),
                    id: 0,
                    required: true,
                    comment: None,
                },
                types::Field {
                    name: "data".to_string(),
                    field_type: types::Any::Primitive(types::Primitive::String),
                    id: 1,
                    required: false,
                    comment: None,
                },
            ],
            schema_id: 0,
            identifier_field_ids: None,
        };
        let arrow_schema = ArrowSchema::try_from(schema).unwrap();
        let fields = arrow_schema.fields();
        assert_eq!(fields.len(), 2);

        assert_eq!(fields[0].name(), "id");
        assert_eq!(fields[0].data_type(), &ArrowDataType::Int64);
        // `required: true` must become a non-nullable Arrow field.
        assert!(!fields[0].is_nullable());

        assert_eq!(fields[1].name(), "data");
        assert_eq!(fields[1].data_type(), &ArrowDataType::Utf8);
        // `required: false` must become a nullable Arrow field.
        assert!(fields[1].is_nullable());
    }
}