chore: update datafusion

MichaelScofield committed Mar 27, 2024
1 parent d832795 commit 3d0ad5d

Showing 104 changed files with 1,070 additions and 919 deletions.
33 changes: 17 additions & 16 deletions Cargo.toml
@@ -75,11 +75,11 @@ rust.unknown_lints = "deny"
 [workspace.dependencies]
 ahash = { version = "0.8", features = ["compile-time-rng"] }
 aquamarine = "0.3"
-arrow = { version = "47.0" }
-arrow-array = "47.0"
-arrow-flight = "47.0"
-arrow-ipc = { version = "47.0", features = ["lz4"] }
-arrow-schema = { version = "47.0", features = ["serde"] }
+arrow = { version = "50.0" }
+arrow-array = "50.0"
+arrow-flight = "50.0"
+arrow-ipc = { version = "50.0", features = ["lz4"] }
+arrow-schema = { version = "50.0", features = ["serde"] }
 async-stream = "0.3"
 async-trait = "0.1"
 axum = { version = "0.6", features = ["headers"] }
@@ -91,13 +91,14 @@ bytes = { version = "1.5", features = ["serde"] }
 chrono = { version = "0.4", features = ["serde"] }
 clap = { version = "4.4", features = ["derive"] }
 dashmap = "5.4"
-datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
-datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
-datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
-datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
-datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
-datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
-datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
+datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b0b329ba39403b9e87156d6f9b8c5464dc6d2480" }
+datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b0b329ba39403b9e87156d6f9b8c5464dc6d2480" }
+datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b0b329ba39403b9e87156d6f9b8c5464dc6d2480" }
+datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b0b329ba39403b9e87156d6f9b8c5464dc6d2480" }
+datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b0b329ba39403b9e87156d6f9b8c5464dc6d2480" }
+datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b0b329ba39403b9e87156d6f9b8c5464dc6d2480" }
+datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b0b329ba39403b9e87156d6f9b8c5464dc6d2480" }
+datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "b0b329ba39403b9e87156d6f9b8c5464dc6d2480" }
 derive_builder = "0.12"
 etcd-client = "0.12"
 fst = "0.4.7"
@@ -118,7 +119,7 @@ opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.gi
     "metrics",
     "trace",
 ] }
-parquet = "47.0"
+parquet = "50.0"
 paste = "1.0"
 pin-project = "1.0"
 prometheus = { version = "0.13.3", features = ["process"] }
@@ -140,13 +141,13 @@ serde_with = "3"
 smallvec = { version = "1", features = ["serde"] }
 snafu = "0.7"
 sysinfo = "0.30"
-# on branch v0.38.x
-sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6a93567ae38d42be5c8d08b13c8ff4dde26502ef", features = [
+# on branch v0.44.x
+sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "c919990bf62ad38d2b0c0a3bc90b26ad919d51b0", features = [
     "visitor",
 ] }
 strum = { version = "0.25", features = ["derive"] }
 tempfile = "3"
-tokio = { version = "1.28", features = ["full"] }
+tokio = { version = "1.36", features = ["full"] }
 tokio-stream = { version = "0.1" }
 tokio-util = { version = "0.7", features = ["io-util", "compat"] }
 toml = "0.8.8"
3 changes: 2 additions & 1 deletion src/common/datasource/Cargo.toml
@@ -30,7 +30,8 @@ derive_builder.workspace = true
 futures.workspace = true
 lazy_static.workspace = true
 object-store.workspace = true
-orc-rust = "0.2"
+# TODO(LFC): "orc-rs" has been archived, use "datafusion-orc" instead.
+orc-rust = { git = "https://github.com/MichaelScofield/orc-rs.git", rev = "b6520cd68931f77b2941a7fee4643ab0339eaccc" }
 parquet.workspace = true
 paste = "1.0"
 regex = "1.7"
2 changes: 1 addition & 1 deletion src/common/datasource/src/file_format/csv.rs
@@ -117,7 +117,7 @@ impl CsvConfig {
         let mut builder = csv::ReaderBuilder::new(self.file_schema.clone())
             .with_delimiter(self.delimiter)
             .with_batch_size(self.batch_size)
-            .has_header(self.has_header);
+            .with_header(self.has_header);
 
         if let Some(proj) = &self.file_projection {
             builder = builder.with_projection(proj.clone());
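For reference, arrow 50's CSV reader keeps only the `with_header` builder method; the old `has_header` setter was removed. A minimal sketch of the new builder against the standalone arrow-csv crate, with a hypothetical schema and file name:

use std::fs::File;
use std::sync::Arc;

use arrow_csv::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};

fn read_csv() -> Result<(), Box<dyn std::error::Error>> {
    // hypothetical two-column schema and input file, for illustration only
    let schema = Arc::new(Schema::new(vec![
        Field::new("host", DataType::Utf8, false),
        Field::new("cpu", DataType::Float64, true),
    ]));
    let file = File::open("metrics.csv")?;
    // arrow 50 spells the header toggle `with_header`; `has_header` no longer exists
    let mut reader = ReaderBuilder::new(schema)
        .with_header(true)
        .with_batch_size(1024)
        .build(file)?;
    while let Some(batch) = reader.next().transpose()? {
        println!("read {} rows", batch.num_rows());
    }
    Ok(())
}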
3 changes: 2 additions & 1 deletion src/common/datasource/src/file_format/tests.rs
@@ -19,6 +19,7 @@ use std::vec;
 
 use common_test_util::find_workspace_path;
 use datafusion::assert_batches_eq;
+use datafusion::config::TableParquetOptions;
 use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -166,7 +167,7 @@ async fn test_parquet_exec() {
         .to_string();
     let base_config = scan_config(schema.clone(), None, path);
 
-    let exec = ParquetExec::new(base_config, None, None)
+    let exec = ParquetExec::new(base_config, None, None, TableParquetOptions::default())
        .with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)));
 
     let ctx = SessionContext::new();
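`ParquetExec::new` at this DataFusion revision takes a fourth argument for Parquet reader options; `TableParquetOptions::default()` keeps the previous behavior. A minimal sketch, assuming the `FileScanConfig` is built elsewhere:

use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileScanConfig, ParquetExec};

// Build a ParquetExec under the new four-argument constructor. Predicate and
// metadata size hint stay None; default options match the prior defaults.
fn parquet_exec(base_config: FileScanConfig) -> ParquetExec {
    ParquetExec::new(base_config, None, None, TableParquetOptions::default())
}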
6 changes: 3 additions & 3 deletions src/common/datasource/src/test_util.rs
@@ -16,6 +16,7 @@ use std::sync::Arc;
 
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
+use datafusion::common::Statistics;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
@@ -72,17 +73,16 @@ pub fn test_basic_schema() -> SchemaRef {
 pub fn scan_config(file_schema: SchemaRef, limit: Option<usize>, filename: &str) -> FileScanConfig {
     // object_store only recognizes the Unix style path, so make it happy.
     let filename = &filename.replace('\\', "/");
-
+    let statistics = Statistics::new_unknown(file_schema.as_ref());
     FileScanConfig {
         object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
         file_schema,
         file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
-        statistics: Default::default(),
+        statistics,
         projection: None,
         limit,
         table_partition_cols: vec![],
         output_ordering: vec![],
-        infinite_source: false,
     }
 }
 
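Two related API changes show up here: `FileScanConfig` lost its `infinite_source` field, and `statistics` must now be constructed per schema instead of via `Default::default()`. A minimal sketch of the new constructor (the column is illustrative):

use arrow_schema::{DataType, Field, Schema};
use datafusion::common::Statistics;

// Statistics::new_unknown fills every column with unknown (absent) values and
// leaves row count and byte size unknown, replacing Default::default().
fn unknown_stats() -> Statistics {
    let schema = Schema::new(vec![Field::new("ts", DataType::Int64, false)]);
    Statistics::new_unknown(&schema)
}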
22 changes: 5 additions & 17 deletions src/common/function/src/scalars/aggregate/diff.rs
@@ -56,7 +56,7 @@ where
             .map(|&n| n.into())
             .collect::<Vec<Value>>();
         Ok(vec![Value::List(ListValue::new(
-            Some(Box::new(nums)),
+            nums,
             I::LogicalType::build_data_type(),
         ))])
     }
@@ -120,10 +120,7 @@ where
                 O::from_native(native).into()
             })
             .collect::<Vec<Value>>();
-        let diff = Value::List(ListValue::new(
-            Some(Box::new(diff)),
-            O::LogicalType::build_data_type(),
-        ));
+        let diff = Value::List(ListValue::new(diff, O::LogicalType::build_data_type()));
         Ok(diff)
     }
 }
@@ -218,10 +215,7 @@ mod test {
         let values = vec![Value::from(2_i64), Value::from(1_i64)];
         diff.update_batch(&v).unwrap();
         assert_eq!(
-            Value::List(ListValue::new(
-                Some(Box::new(values)),
-                ConcreteDataType::int64_datatype()
-            )),
+            Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
             diff.evaluate().unwrap()
         );
 
@@ -236,10 +230,7 @@ mod test {
         let values = vec![Value::from(5_i64), Value::from(1_i64)];
         diff.update_batch(&v).unwrap();
         assert_eq!(
-            Value::List(ListValue::new(
-                Some(Box::new(values)),
-                ConcreteDataType::int64_datatype()
-            )),
+            Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
             diff.evaluate().unwrap()
         );
 
@@ -252,10 +243,7 @@ mod test {
         let values = vec![Value::from(0_i64), Value::from(0_i64), Value::from(0_i64)];
         diff.update_batch(&v).unwrap();
         assert_eq!(
-            Value::List(ListValue::new(
-                Some(Box::new(values)),
-                ConcreteDataType::int64_datatype()
-            )),
+            Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
             diff.evaluate().unwrap()
         );
     }
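Behind these mechanical edits, GreptimeDB's `ListValue::new` now takes its items as a plain `Vec<Value>` rather than `Option<Box<Vec<Value>>>`. A minimal sketch, assuming the usual `datatypes` crate paths:

use datatypes::prelude::ConcreteDataType;
use datatypes::value::{ListValue, Value};

// Construct a list value the new way: items passed directly as Vec<Value>.
fn int_list() -> Value {
    let items = vec![Value::from(1_i64), Value::from(2_i64)];
    Value::List(ListValue::new(items, ConcreteDataType::int64_datatype()))
}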
5 changes: 1 addition & 4 deletions src/common/function/src/scalars/aggregate/percentile.rs
@@ -104,10 +104,7 @@ where
             .map(|&n| n.into())
             .collect::<Vec<Value>>();
         Ok(vec![
-            Value::List(ListValue::new(
-                Some(Box::new(nums)),
-                T::LogicalType::build_data_type(),
-            )),
+            Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
             self.p.into(),
         ])
     }
5 changes: 1 addition & 4 deletions src/common/function/src/scalars/aggregate/polyval.rs
@@ -72,10 +72,7 @@ where
             .map(|&n| n.into())
             .collect::<Vec<Value>>();
         Ok(vec![
-            Value::List(ListValue::new(
-                Some(Box::new(nums)),
-                T::LogicalType::build_data_type(),
-            )),
+            Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
             self.x.into(),
         ])
     }
(file name not shown)
@@ -56,10 +56,7 @@ where
             .map(|&x| x.into())
             .collect::<Vec<Value>>();
         Ok(vec![
-            Value::List(ListValue::new(
-                Some(Box::new(nums)),
-                T::LogicalType::build_data_type(),
-            )),
+            Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
             self.x.into(),
         ])
     }
(file name not shown)
@@ -56,10 +56,7 @@ where
             .map(|&x| x.into())
             .collect::<Vec<Value>>();
         Ok(vec![
-            Value::List(ListValue::new(
-                Some(Box::new(nums)),
-                T::LogicalType::build_data_type(),
-            )),
+            Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
             self.x.into(),
         ])
     }
2 changes: 1 addition & 1 deletion src/common/function/src/scalars/math.rs
@@ -77,7 +77,7 @@ impl Function for RangeFunction {
     /// `range_fn` will never be used. As long as a legal signature is returned, the specific content of the signature does not matter.
     /// In fact, the arguments loaded by `range_fn` are very complicated, and it is difficult to use `Signature` to describe them.
     fn signature(&self) -> Signature {
-        Signature::any(0, Volatility::Immutable)
+        Signature::variadic_any(Volatility::Immutable)
     }
 
     fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
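A plausible reading of this change: newer DataFusion validates call-site argument counts against the declared signature, so the old zero-argument `Signature::any(0, ...)` would reject `range_fn`'s actual arguments, while `variadic_any` accepts any number of arguments of any type. A minimal sketch:

use datafusion_expr::{Signature, Volatility};

// A signature that places no constraint on argument count or types,
// replacing the old zero-argument Signature::any(0, Volatility::Immutable).
fn permissive_signature() -> Signature {
    Signature::variadic_any(Volatility::Immutable)
}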
14 changes: 8 additions & 6 deletions src/common/macro/src/range_fn.rs
@@ -119,15 +119,17 @@ fn build_struct(
     }
 
     pub fn scalar_udf() -> ScalarUDF {
-        ScalarUDF {
-            name: Self::name().to_string(),
-            signature: Signature::new(
+        // TODO(LFC): Use the new Datafusion UDF impl.
+        #[allow(deprecated)]
+        ScalarUDF::new(
+            Self::name(),
+            &Signature::new(
                 TypeSignature::Exact(Self::input_type()),
                 Volatility::Immutable,
             ),
-            return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))),
-            fun: Arc::new(Self::calc),
-        }
+            &(Arc::new(|_: &_| Ok(Arc::new(Self::return_type()))) as _),
+            &(Arc::new(Self::calc) as _),
+        )
     }
 
     fn input_type() -> Vec<DataType> {
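The TODO points at DataFusion's non-deprecated route: implementing the `ScalarUDFImpl` trait rather than calling the deprecated `ScalarUDF::new`. A hedged sketch of that shape, with a made-up function name and a pass-through body; the exact trait surface at this revision may differ:

use std::any::Any;

use arrow_schema::DataType;
use datafusion_common::Result;
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature};

#[derive(Debug)]
struct MyRangeFn {
    signature: Signature,
}

impl ScalarUDFImpl for MyRangeFn {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn name(&self) -> &str {
        "my_range_fn" // hypothetical name
    }
    fn signature(&self) -> &Signature {
        &self.signature
    }
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
        Ok(DataType::Float64)
    }
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
        // pass-through for illustration; a real UDF would compute here
        Ok(args[0].clone())
    }
}

Wrapping it would then be `ScalarUDF::new_from_impl(MyRangeFn { signature: Signature::variadic_any(Volatility::Immutable) })`, again assuming this revision already ships `new_from_impl`.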
6 changes: 4 additions & 2 deletions src/common/query/src/columnar_value.rs
@@ -17,7 +17,7 @@ use datatypes::prelude::ConcreteDataType;
 use datatypes::vectors::{Helper, VectorRef};
 use snafu::ResultExt;
 
-use crate::error::{self, IntoVectorSnafu, Result};
+use crate::error::{self, GeneralDataFusionSnafu, IntoVectorSnafu, Result};
 use crate::prelude::ScalarValue;
 
 /// Represents the result from an expression
@@ -43,7 +43,9 @@ impl ColumnarValue {
         Ok(match self {
             ColumnarValue::Vector(v) => v,
             ColumnarValue::Scalar(s) => {
-                let v = s.to_array_of_size(num_rows);
+                let v = s
+                    .to_array_of_size(num_rows)
+                    .context(GeneralDataFusionSnafu)?;
                 let data_type = v.data_type().clone();
                 Helper::try_into_vector(v).context(IntoVectorSnafu { data_type })?
             }
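`ScalarValue::to_array_of_size` is fallible at this revision, returning a `Result` instead of panicking on conversion failure, hence the new `.context(GeneralDataFusionSnafu)?`. A minimal sketch of the call:

use arrow_array::Array;
use datafusion_common::{Result, ScalarValue};

// Broadcast a scalar to a three-row array; the conversion can now fail,
// so the result is propagated with `?` rather than assumed infallible.
fn broadcast() -> Result<()> {
    let s = ScalarValue::Int64(Some(7));
    let arr = s.to_array_of_size(3)?;
    assert_eq!(3, arr.len());
    Ok(())
}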
13 changes: 8 additions & 5 deletions src/common/query/src/logical_plan.rs
@@ -72,6 +72,7 @@ pub fn create_aggregate_function(
 mod tests {
     use std::sync::Arc;
 
+    use datafusion_common::DFSchema;
     use datafusion_expr::{
         ColumnarValue as DfColumnarValue, ScalarUDF as DfScalarUDF,
         TypeSignature as DfTypeSignature,
@@ -135,15 +136,17 @@ mod tests {
 
         // test into_df_udf
         let df_udf: DfScalarUDF = udf.into();
-        assert_eq!("and", df_udf.name);
+        assert_eq!("and", df_udf.name());
 
         let types = vec![DataType::Boolean, DataType::Boolean];
         assert!(
-            matches!(&df_udf.signature.type_signature, DfTypeSignature::Exact(ts) if ts.clone() == types)
+            matches!(&df_udf.signature().type_signature, DfTypeSignature::Exact(ts) if ts.clone() == types)
         );
         assert_eq!(
-            Arc::new(DataType::Boolean),
-            (df_udf.return_type)(&[]).unwrap()
+            DataType::Boolean,
+            df_udf
+                .return_type_from_exprs(&[], &DFSchema::empty(), &[])
+                .unwrap()
         );
 
         let args = vec![
@@ ... @@
         ];
 
         // call the function
-        let result = (df_udf.fun)(&args).unwrap();
+        let result = (df_udf.fun())(&args).unwrap();
 
         match result {
             DfColumnarValue::Array(arr) => {
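The pattern in this test generalizes: `ScalarUDF`'s fields are private now, so callers go through accessors (`name()`, `signature()`, `fun()`) and derive the return type via `return_type_from_exprs`. A minimal sketch using only the public surface shown above:

use datafusion_common::{DFSchema, Result};
use datafusion_expr::ScalarUDF;

// Describe a UDF using the accessor methods of the new API.
fn describe(udf: &ScalarUDF) -> Result<String> {
    let ret = udf.return_type_from_exprs(&[], &DFSchema::empty(), &[])?;
    Ok(format!("{} -> {ret}", udf.name()))
}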
4 changes: 2 additions & 2 deletions src/common/query/src/logical_plan/accumulator.rs
@@ -126,7 +126,7 @@ impl DfAccumulatorAdaptor {
 }
 
 impl DfAccumulator for DfAccumulatorAdaptor {
-    fn state(&self) -> DfResult<Vec<ScalarValue>> {
+    fn state(&mut self) -> DfResult<Vec<ScalarValue>> {
         let state_values = self.accumulator.state()?;
         let state_types = self.creator.state_types()?;
         if state_values.len() != state_types.len() {
@@ -161,7 +161,7 @@ impl DfAccumulator for DfAccumulatorAdaptor {
         Ok(())
     }
 
-    fn evaluate(&self) -> DfResult<ScalarValue> {
+    fn evaluate(&mut self) -> DfResult<ScalarValue> {
         let value = self.accumulator.evaluate()?;
         let output_type = self.creator.output_type()?;
         let scalar_value = value
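DataFusion's `Accumulator` trait now takes `&mut self` in both `state` and `evaluate`, which lets implementations drain internal buffers at finalization. A hedged sketch of a trivial counting accumulator against what the updated trait appears to require:

use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::{Array, ArrayRef};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;

#[derive(Debug, Default)]
struct CountAcc {
    n: u64,
}

impl Accumulator for CountAcc {
    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
        // count the rows of the first input column
        self.n += values.first().map_or(0, |a| a.len() as u64);
        Ok(())
    }
    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
        // sum up partial counts produced by `state`
        for v in states[0].as_primitive::<UInt64Type>().values().iter() {
            self.n += *v;
        }
        Ok(())
    }
    fn state(&mut self) -> Result<Vec<ScalarValue>> {
        Ok(vec![ScalarValue::UInt64(Some(self.n))])
    }
    fn evaluate(&mut self) -> Result<ScalarValue> {
        Ok(ScalarValue::UInt64(Some(self.n)))
    }
    fn size(&self) -> usize {
        std::mem::size_of_val(self)
    }
}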
4 changes: 2 additions & 2 deletions src/common/query/src/logical_plan/expr.rs
@@ -94,10 +94,10 @@ mod tests {
 
     #[test]
     fn test_from_df_expr() {
-        let df_expr = DfExpr::Wildcard;
+        let df_expr = DfExpr::Wildcard { qualifier: None };
 
         let expr: Expr = df_expr.into();
 
-        assert_eq!(DfExpr::Wildcard, *expr.df_expr());
+        assert_eq!(DfExpr::Wildcard { qualifier: None }, *expr.df_expr());
     }
 }
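`Expr::Wildcard` became a struct variant carrying an optional table qualifier (for `t.*`-style projections). A minimal sketch of matching on it, assuming the qualifier is a plain `Option<String>` at this revision:

use datafusion_expr::Expr;

// Distinguish a bare `*` from a qualified `table.*` under the new variant.
fn wildcard_table(expr: &Expr) -> Option<&str> {
    match expr {
        Expr::Wildcard { qualifier: Some(table) } => Some(table.as_str()),
        _ => None,
    }
}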
2 changes: 2 additions & 0 deletions src/common/query/src/logical_plan/udaf.rs
@@ -90,6 +90,8 @@ impl AggregateFunction {
 
 impl From<AggregateFunction> for DfAggregateUdf {
     fn from(udaf: AggregateFunction) -> Self {
+        // TODO(LFC): See how to fit the new DataFusion UDAF implementation.
+        #[allow(deprecated)]
         DfAggregateUdf::new(
             &udaf.name,
             &udaf.signature.into(),
2 changes: 2 additions & 0 deletions src/common/query/src/logical_plan/udf.rs
@@ -70,6 +70,8 @@ impl ScalarUdf {
 
 impl From<ScalarUdf> for DfScalarUDF {
     fn from(udf: ScalarUdf) -> Self {
+        // TODO(LFC): remove deprecated
+        #[allow(deprecated)]
        DfScalarUDF::new(
             &udf.name,
             &udf.signature.into(),