Skip to content

Commit

Permalink
Update to DataFusion 32 (#393)
Browse files Browse the repository at this point in the history
* update to DataFusion 32.0.0 and arrow 47
  • Loading branch information
jonmmease authored Oct 13, 2023
1 parent d94fee4 commit c319a3f
Show file tree
Hide file tree
Showing 42 changed files with 374 additions and 272 deletions.
383 changes: 210 additions & 173 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,23 @@ members = [
"vegafusion-jni",
]

[workspace.dependencies]
arrow = { version = "47.0.0", default_features = false }
sqlparser = { version = "0.37.0" }
chrono = { version = "0.4.31", default_features = false }
reqwest = { version = "=0.11.13", default-features = false }
tokio = { version = "1.32.0" }
pyo3 = { version = "0.19" }
prost = { version = "0.12.1" }
prost-types = { version = "0.12.1" }

datafusion = { version = "32.0.0" }
datafusion-common = { version = "32.0.0", default_features = false}
datafusion-expr = { version = "32.0.0" }
datafusion-proto = { version = "32.0.0" }
datafusion-physical-expr = { version = "32.0.0" }
datafusion-optimizer = { version = "32.0.0" }

[profile.release]
## Tell `rustc` to use highest performance optimization and perform Link Time Optimization
opt-level = 3
Expand Down
16 changes: 8 additions & 8 deletions vegafusion-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ prettyprint = [ "arrow/prettyprint",]
thiserror = "^1.0.29"

[dependencies.chrono]
version = "0.4.23"
workspace = true
optional = true

[dependencies.sqlparser]
version = "0.35.0"
workspace = true
optional = true

[dependencies.serde_json]
Expand All @@ -27,22 +27,22 @@ default_features = false
optional = true

[dependencies.arrow]
version = "43.0.0"
default_features = false
workspace = true
features = [ "ipc",]

[dependencies.datafusion-common]
version = "28.0.0"
workspace = true
default_features = false

[dependencies.datafusion-expr]
version = "28.0.0"
workspace = true

[dependencies.datafusion-proto]
version = "28.0.0"
workspace = true
optional = true

[dependencies.pyo3]
version = "0.19"
workspace = true
optional = true

[dependencies.jni]
Expand Down
3 changes: 2 additions & 1 deletion vegafusion-common/src/data/json_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,8 @@ mod tests {
let ts_nanos = ts_string
.parse::<chrono::NaiveDateTime>()
.unwrap()
.timestamp_nanos();
.timestamp_nanos_opt()
.unwrap();
let ts_micros = ts_nanos / 1000;
let ts_millis = ts_micros / 1000;
let ts_secs = ts_millis / 1000;
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-common/src/data/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl ScalarValueHelpers for ScalarValue {
.iter()
.map(ScalarValue::from_json)
.collect::<Result<Vec<ScalarValue>>>()?;
let dtype = elements[0].get_datatype();
let dtype = elements[0].data_type();
(elements, dtype)
};

Expand Down
2 changes: 1 addition & 1 deletion vegafusion-common/src/data/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl VegaFusionTable {
}
}

let dtype = elements[0].get_datatype();
let dtype = elements[0].data_type();
Ok(ScalarValue::List(
Some(elements),
Arc::new(Field::new("item", dtype, true)),
Expand Down
17 changes: 9 additions & 8 deletions vegafusion-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ pyarrow = [ "pyo3", "datafusion-common/pyarrow", "vegafusion-common/pyarrow",]
[dependencies]
thiserror = "^1.0.29"
bytes = "1.1.0"
prost = "0.11.3"
prost-types = "0.11.2"
prost = { workspace = true }
prost-types = { workspace = true }
itertools = "0.10.3"
lazy_static = "^1.4.0"
regex = "^1.5.5"
Expand All @@ -26,7 +26,7 @@ rand = "0.8.5"
json-patch = "1.0.0"

[dependencies.sqlparser]
version = "0.35.0"
workspace = true
optional = true

[dependencies.serde_json]
Expand All @@ -39,25 +39,26 @@ features = [ "json", "sqlparser",]
version = "1.4.1"

[dependencies.datafusion-common]
version = "28.0.0"
workspace = true
default_features = false

[dependencies.pyo3]
version = "0.19"
workspace = true
optional = true

[dependencies.serde]
version = "1.0.137"
features = [ "derive",]

[dependencies.tonic]
version = "0.9.1"
version = "0.10.2"
optional = true

[build-dependencies.prost-build]
version = "0.11.4"
version = "0.12.1"

[build-dependencies.tonic-build]
version = "0.9.1"
version = "0.10.2"
optional = true

[build-dependencies.protobuf-src]
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-core/src/expression/ast/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl BinaryExpression {
}

pub fn to_operator(&self) -> BinaryOperator {
BinaryOperator::from_i32(self.operator).unwrap()
BinaryOperator::try_from(self.operator).unwrap()
}

pub fn infix_binding_power(&self) -> (f64, f64) {
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-core/src/expression/ast/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl LogicalExpression {
}

pub fn to_operator(&self) -> LogicalOperator {
LogicalOperator::from_i32(self.operator).unwrap()
LogicalOperator::try_from(self.operator).unwrap()
}

pub fn infix_binding_power(&self) -> (f64, f64) {
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-core/src/expression/ast/unary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl UnaryExpression {
}

pub fn to_operator(&self) -> UnaryOperator {
UnaryOperator::from_i32(self.operator).unwrap()
UnaryOperator::try_from(self.operator).unwrap()
}

pub fn new(op: &UnaryOperator, arg: Expression) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-core/src/proto/prost_gen/transforms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct Bin {
/// A minimum distance between adjacent bins
#[prost(double, tag = "12")]
pub minstep: f64,
/// Scale factors indicating the allowed subdivisions. The default value is vec![5.0, 2.0],
/// Scale factors indicating the allowed subdivisions. The default value is vec!\[5.0, 2.0\],
/// which indicates that for base 10 numbers, the method may consider dividing bin sizes by 5 and/or 2.
#[prost(double, repeated, tag = "13")]
pub divide: ::prost::alloc::vec::Vec<f64>,
Expand Down
21 changes: 17 additions & 4 deletions vegafusion-core/src/proto/tonic_gen/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@ pub mod vega_fusion_runtime_server {
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).task_graph_query(request).await
<T as VegaFusionRuntime>::task_graph_query(&inner, request)
.await
};
Box::pin(fut)
}
Expand Down Expand Up @@ -460,7 +461,11 @@ pub mod vega_fusion_runtime_server {
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).pre_transform_spec(request).await
<T as VegaFusionRuntime>::pre_transform_spec(
&inner,
request,
)
.await
};
Box::pin(fut)
}
Expand Down Expand Up @@ -509,7 +514,11 @@ pub mod vega_fusion_runtime_server {
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).pre_transform_values(request).await
<T as VegaFusionRuntime>::pre_transform_values(
&inner,
request,
)
.await
};
Box::pin(fut)
}
Expand Down Expand Up @@ -558,7 +567,11 @@ pub mod vega_fusion_runtime_server {
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).pre_transform_extract(request).await
<T as VegaFusionRuntime>::pre_transform_extract(
&inner,
request,
)
.await
};
Box::pin(fut)
}
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-core/src/proto/tonic_gen/transforms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub struct Bin {
/// A minimum distance between adjacent bins
#[prost(double, tag = "12")]
pub minstep: f64,
/// Scale factors indicating the allowed subdivisions. The default value is vec![5.0, 2.0],
/// Scale factors indicating the allowed subdivisions. The default value is vec!\[5.0, 2.0\],
/// which indicates that for base 10 numbers, the method may consider dividing bin sizes by 5 and/or 2.
#[prost(double, repeated, tag = "13")]
pub divide: ::prost::alloc::vec::Vec<f64>,
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-core/src/variable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl Variable {
}

pub fn ns(&self) -> VariableNamespace {
VariableNamespace::from_i32(self.namespace).unwrap()
VariableNamespace::try_from(self.namespace).unwrap()
}
}

Expand Down
6 changes: 3 additions & 3 deletions vegafusion-dataframe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ path = "../vegafusion-common"
version = "1.4.1"

[dependencies.datafusion-common]
version = "28.0.0"
workspace = true

[dependencies.datafusion-expr]
version = "28.0.0"
workspace = true

[dependencies.arrow]
version = "43.0.0"
workspace = true
default_features = false
4 changes: 2 additions & 2 deletions vegafusion-datafusion-udfs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ description = "Custom DataFusion UDFs used by VegaFusion"
[dependencies]
ordered-float = "3.6.0"
lazy_static = "^1.4.0"
chrono = "0.4.23"
chrono = { workspace = true }
chrono-tz = "0.8.1"
regex = "^1.5.5"

Expand All @@ -17,4 +17,4 @@ path = "../vegafusion-common"
version = "1.4.1"

[dependencies.datafusion-physical-expr]
version = "28.0.0"
workspace = true
4 changes: 2 additions & 2 deletions vegafusion-datafusion-udfs/src/udafs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ lazy_static! {
pub static ref Q1_UDF: AggregateUDF = create_udaf(
"q1",
// input type
DataType::Float64,
vec![DataType::Float64],
// the return type
Arc::new(DataType::Float64),
Volatility::Immutable,
Expand All @@ -161,7 +161,7 @@ lazy_static! {
pub static ref Q3_UDF: AggregateUDF = create_udaf(
"q3",
// input type
DataType::Float64,
vec![DataType::Float64],
// the return type
Arc::new(DataType::Float64),
Volatility::Immutable,
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-datafusion-udfs/src/udfs/array/indexof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn make_indexof_udf() -> ScalarUDF {
let arg = &args[1];
Ok(match arg {
ColumnarValue::Scalar(value) => {
let value_dtype = value.get_datatype();
let value_dtype = value.data_type();
if is_numeric_datatype(&value_dtype) && is_numeric_datatype(&array_dtype) {
let indices = build_notnan_index_map(array.as_slice());
if let Ok(value) = value.to_f64() {
Expand Down
5 changes: 2 additions & 3 deletions vegafusion-datafusion-udfs/src/udfs/math/isfinite.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use datafusion_physical_expr::functions::make_scalar_function;
use std::sync::Arc;
use vegafusion_common::arrow::array::{ArrayRef, BooleanArray, Float32Array, Float64Array};
use vegafusion_common::arrow::compute::no_simd_compare_op_scalar;
use vegafusion_common::arrow::datatypes::DataType;
use vegafusion_common::datafusion_expr::{ReturnTypeFunction, ScalarUDF, Signature, Volatility};

Expand All @@ -18,11 +17,11 @@ fn make_is_finite_udf() -> ScalarUDF {
let is_finite_array = match arg.data_type() {
DataType::Float32 => {
let array = arg.as_any().downcast_ref::<Float32Array>().unwrap();
no_simd_compare_op_scalar(array, f32::NAN, |a, _| a.is_finite()).unwrap()
BooleanArray::from_unary(array, |a| a.is_finite())
}
DataType::Float64 => {
let array = arg.as_any().downcast_ref::<Float64Array>().unwrap();
no_simd_compare_op_scalar(array, f64::NAN, |a, _| a.is_finite()).unwrap()
BooleanArray::from_unary(array, |a| a.is_finite())
}
_ => {
// No other type can be non-finite
Expand Down
5 changes: 2 additions & 3 deletions vegafusion-datafusion-udfs/src/udfs/math/isnan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::sync::Arc;
use vegafusion_common::{
arrow::{
array::{ArrayRef, BooleanArray, Float32Array, Float64Array},
compute::no_simd_compare_op_scalar,
datatypes::DataType,
},
datafusion_expr::{ReturnTypeFunction, ScalarUDF, Signature, Volatility},
Expand All @@ -22,11 +21,11 @@ fn make_is_nan_udf() -> ScalarUDF {
let is_nan_array = match arg.data_type() {
DataType::Float32 => {
let array = arg.as_any().downcast_ref::<Float32Array>().unwrap();
no_simd_compare_op_scalar(array, f32::NAN, |a, _| a.is_nan()).unwrap()
BooleanArray::from_unary(array, |a| a.is_nan())
}
DataType::Float64 => {
let array = arg.as_any().downcast_ref::<Float64Array>().unwrap();
no_simd_compare_op_scalar(array, f64::NAN, |a, _| a.is_nan()).unwrap()
BooleanArray::from_unary(array, |a| a.is_nan())
}
_ => {
// No other type can be NaN
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ version = "1.4.1"
features = [ "datafusion-conn",]

[dependencies.tokio]
version = "1.32.0"
workspace = true
features = [ "macros", "rt-multi-thread",]
10 changes: 5 additions & 5 deletions vegafusion-python-embed/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ env_logger = "0.10.0"
pythonize = "0.19.0"
async-trait = "0.1.73"
uuid = "1.3.0"
prost = "0.11.3"
prost = { workspace = true }

[dependencies.deterministic-hash]
version = "1.0.1"
Expand All @@ -31,7 +31,7 @@ features = [ "derive",]
version = "1.0.79"

[dependencies.arrow]
version = "43.0.0"
workspace = true
features = [ "pyarrow",]

[dependencies.vegafusion-common]
Expand Down Expand Up @@ -59,12 +59,12 @@ path = "../vegafusion-dataframe"
version = "1.4.1"

[dependencies.datafusion-proto]
version = "28.0.0"
workspace = true

[dependencies.tokio]
version = "1.32.0"
workspace = true
features = [ "macros", "rt-multi-thread",]

[dependencies.pyo3]
version = "0.19"
workspace = true
features = [ "extension-module", "abi3-py38",]
Loading

0 comments on commit c319a3f

Please sign in to comment.