Skip to content

Commit

Permalink
chore: update datafusion
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelScofield committed Mar 29, 2024
1 parent 500f9f1 commit 069b3a9
Show file tree
Hide file tree
Showing 134 changed files with 1,949 additions and 1,610 deletions.
789 changes: 499 additions & 290 deletions Cargo.lock

Large diffs are not rendered by default.

49 changes: 28 additions & 21 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,18 @@ clippy.implicit_clone = "warn"
rust.unknown_lints = "deny"

[workspace.dependencies]
# We turn off default-features for some dependencies here so the workspaces which inherit them can
# selectively turn them on if needed, since we can override default-features = true (from false)
# for the inherited dependency but cannot do the reverse (override from true to false).
#
# For more details, see: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.3"
arrow = { version = "47.0" }
arrow-array = "47.0"
arrow-flight = "47.0"
arrow-ipc = { version = "47.0", features = ["lz4"] }
arrow-schema = { version = "47.0", features = ["serde"] }
arrow = { version = "51.0.0", features = ["prettyprint"] }
arrow-array = { version = "51.0.0", default-features = false, features = ["chrono-tz"] }
arrow-flight = "51.0"
arrow-ipc = { version = "51.0.0", default-features = false, features = ["lz4"] }
arrow-schema = { version = "51.0", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
axum = { version = "0.6", features = ["headers"] }
Expand All @@ -91,19 +96,21 @@ bytes = { version = "1.5", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["derive"] }
dashmap = "5.4"
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" }
datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "56c735c458c6d6dd7696941457dd4bbe95eaa2e0" }
datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "56c735c458c6d6dd7696941457dd4bbe95eaa2e0" }
datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "56c735c458c6d6dd7696941457dd4bbe95eaa2e0" }
datafusion-functions = { git = "https://github.com/apache/arrow-datafusion.git", rev = "56c735c458c6d6dd7696941457dd4bbe95eaa2e0" }
datafusion-optimizer = { git = "https://github.com/apache/arrow-datafusion.git", rev = "56c735c458c6d6dd7696941457dd4bbe95eaa2e0" }
datafusion-physical-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "56c735c458c6d6dd7696941457dd4bbe95eaa2e0" }
datafusion-sql = { git = "https://github.com/apache/arrow-datafusion.git", rev = "56c735c458c6d6dd7696941457dd4bbe95eaa2e0" }
datafusion-substrait = { git = "https://github.com/apache/arrow-datafusion.git", rev = "56c735c458c6d6dd7696941457dd4bbe95eaa2e0" }
derive_builder = "0.12"
etcd-client = "0.12"
# TODO(LFC): Wait for https://github.com/etcdv3/etcd-client/pull/76
etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev = "4c371e9b3ea8e0a8ee2f9cbd7ded26e54a45df3b" }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "06f6297ff3cab578a1589741b504342fbad70453" }
greptime-proto = { git = "https://github.com/MichaelScofield/greptime-proto.git", rev = "cd44c4b9ab39f8081c1dac809400c0141b48ed96" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
Expand All @@ -113,12 +120,12 @@ moka = "0.12"
notify = "6.1"
num_cpus = "1.16"
once_cell = "1.18"
opentelemetry-proto = { git = "https://github.com/waynexia/opentelemetry-rust.git", rev = "33841b38dda79b15f2024952be5f32533325ca02", features = [
opentelemetry-proto = { version = "0.5", features = [
"gen-tonic",
"metrics",
"trace",
] }
parquet = "47.0"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
prometheus = { version = "0.13.3", features = ["process"] }
Expand All @@ -141,18 +148,18 @@ serde_with = "3"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.7"
sysinfo = "0.30"
# on branch v0.38.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "6a93567ae38d42be5c8d08b13c8ff4dde26502ef", features = [
# on branch v0.44.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "c919990bf62ad38d2b0c0a3bc90b26ad919d51b0", features = [
"visitor",
] }
strum = { version = "0.25", features = ["derive"] }
tempfile = "3"
tokio = { version = "1.28", features = ["full"] }
tokio = { version = "1.36", features = ["full"] }
tokio-stream = { version = "0.1" }
tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.10", features = ["tls"] }
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
tonic = { version = "0.11", features = ["tls"] }
uuid = { version = "1.7", features = ["serde", "v4", "fast-rng"] }

## workspaces members
api = { path = "src/api" }
Expand Down
32 changes: 1 addition & 31 deletions benchmarks/src/bin/nyc-taxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,37 +215,7 @@ fn build_values(column: &ArrayRef) -> (Values, ColumnDataType) {
ColumnDataType::String,
)
}
DataType::Null
| DataType::Boolean
| DataType::Int8
| DataType::Int16
| DataType::Int32
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64
| DataType::Float16
| DataType::Float32
| DataType::Date32
| DataType::Date64
| DataType::Time32(_)
| DataType::Time64(_)
| DataType::Duration(_)
| DataType::Interval(_)
| DataType::Binary
| DataType::FixedSizeBinary(_)
| DataType::LargeBinary
| DataType::LargeUtf8
| DataType::List(_)
| DataType::FixedSizeList(_, _)
| DataType::LargeList(_)
| DataType::Struct(_)
| DataType::Union(_, _)
| DataType::Dictionary(_, _)
| DataType::Decimal128(_, _)
| DataType::Decimal256(_, _)
| DataType::RunEndEncoded(_, _)
| DataType::Map(_, _) => todo!(),
_ => todo!(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/datasource/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ derive_builder.workspace = true
futures.workspace = true
lazy_static.workspace = true
object-store.workspace = true
orc-rust = "0.2"
orc-rust = { git = "https://github.com/MichaelScofield/orc-rs.git", rev = "17347f5f084ac937863317df882218055c4ea8c1" }
parquet.workspace = true
paste = "1.0"
regex = "1.7"
Expand Down
2 changes: 1 addition & 1 deletion src/common/datasource/src/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl CsvConfig {
let mut builder = csv::ReaderBuilder::new(self.file_schema.clone())
.with_delimiter(self.delimiter)
.with_batch_size(self.batch_size)
.has_header(self.has_header);
.with_header(self.has_header);

if let Some(proj) = &self.file_projection {
builder = builder.with_projection(proj.clone());
Expand Down
3 changes: 2 additions & 1 deletion src/common/datasource/src/file_format/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::vec;

use common_test_util::find_workspace_path;
use datafusion::assert_batches_eq;
use datafusion::config::TableParquetOptions;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
Expand Down Expand Up @@ -166,7 +167,7 @@ async fn test_parquet_exec() {
.to_string();
let base_config = scan_config(schema.clone(), None, path);

let exec = ParquetExec::new(base_config, None, None)
let exec = ParquetExec::new(base_config, None, None, TableParquetOptions::default())
.with_parquet_file_reader_factory(Arc::new(DefaultParquetFileReaderFactory::new(store)));

let ctx = SessionContext::new();
Expand Down
6 changes: 3 additions & 3 deletions src/common/datasource/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, SchemaRef};
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datafusion::common::Statistics;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{FileScanConfig, FileStream};
Expand Down Expand Up @@ -72,17 +73,16 @@ pub fn test_basic_schema() -> SchemaRef {
pub fn scan_config(file_schema: SchemaRef, limit: Option<usize>, filename: &str) -> FileScanConfig {
// object_store only recognize the Unix style path, so make it happy.
let filename = &filename.replace('\\', "/");

let statistics = Statistics::new_unknown(file_schema.as_ref());
FileScanConfig {
object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
file_schema,
file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
statistics: Default::default(),
statistics,
projection: None,
limit,
table_partition_cols: vec![],
output_ordering: vec![],
infinite_source: false,
}
}

Expand Down
22 changes: 5 additions & 17 deletions src/common/function/src/scalars/aggregate/diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
.map(|&n| n.into())
.collect::<Vec<Value>>();
Ok(vec![Value::List(ListValue::new(
Some(Box::new(nums)),
nums,
I::LogicalType::build_data_type(),
))])
}
Expand Down Expand Up @@ -120,10 +120,7 @@ where
O::from_native(native).into()
})
.collect::<Vec<Value>>();
let diff = Value::List(ListValue::new(
Some(Box::new(diff)),
O::LogicalType::build_data_type(),
));
let diff = Value::List(ListValue::new(diff, O::LogicalType::build_data_type()));
Ok(diff)
}
}
Expand Down Expand Up @@ -218,10 +215,7 @@ mod test {
let values = vec![Value::from(2_i64), Value::from(1_i64)];
diff.update_batch(&v).unwrap();
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(values)),
ConcreteDataType::int64_datatype()
)),
Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
diff.evaluate().unwrap()
);

Expand All @@ -236,10 +230,7 @@ mod test {
let values = vec![Value::from(5_i64), Value::from(1_i64)];
diff.update_batch(&v).unwrap();
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(values)),
ConcreteDataType::int64_datatype()
)),
Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
diff.evaluate().unwrap()
);

Expand All @@ -252,10 +243,7 @@ mod test {
let values = vec![Value::from(0_i64), Value::from(0_i64), Value::from(0_i64)];
diff.update_batch(&v).unwrap();
assert_eq!(
Value::List(ListValue::new(
Some(Box::new(values)),
ConcreteDataType::int64_datatype()
)),
Value::List(ListValue::new(values, ConcreteDataType::int64_datatype())),
diff.evaluate().unwrap()
);
}
Expand Down
5 changes: 1 addition & 4 deletions src/common/function/src/scalars/aggregate/percentile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ where
.map(|&n| n.into())
.collect::<Vec<Value>>();
Ok(vec![
Value::List(ListValue::new(
Some(Box::new(nums)),
T::LogicalType::build_data_type(),
)),
Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
self.p.into(),
])
}
Expand Down
5 changes: 1 addition & 4 deletions src/common/function/src/scalars/aggregate/polyval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ where
.map(|&n| n.into())
.collect::<Vec<Value>>();
Ok(vec![
Value::List(ListValue::new(
Some(Box::new(nums)),
T::LogicalType::build_data_type(),
)),
Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
self.x.into(),
])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ where
.map(|&x| x.into())
.collect::<Vec<Value>>();
Ok(vec![
Value::List(ListValue::new(
Some(Box::new(nums)),
T::LogicalType::build_data_type(),
)),
Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
self.x.into(),
])
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,7 @@ where
.map(|&x| x.into())
.collect::<Vec<Value>>();
Ok(vec![
Value::List(ListValue::new(
Some(Box::new(nums)),
T::LogicalType::build_data_type(),
)),
Value::List(ListValue::new(nums, T::LogicalType::build_data_type())),
self.x.into(),
])
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/function/src/scalars/math.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl Function for RangeFunction {
/// `range_fn` will never be used. As long as a legal signature is returned, the specific content of the signature does not matter.
/// In fact, the arguments loaded by `range_fn` are very complicated, and it is difficult to use `Signature` to describe
fn signature(&self) -> Signature {
Signature::any(0, Volatility::Immutable)
Signature::variadic_any(Volatility::Immutable)
}

fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
Expand Down
14 changes: 8 additions & 6 deletions src/common/macro/src/range_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,17 @@ fn build_struct(
}

pub fn scalar_udf() -> ScalarUDF {
ScalarUDF {
name: Self::name().to_string(),
signature: Signature::new(
// TODO(LFC): Use the new Datafusion UDF impl.
#[allow(deprecated)]
ScalarUDF::new(
Self::name(),
&Signature::new(
TypeSignature::Exact(Self::input_type()),
Volatility::Immutable,
),
return_type: Arc::new(|_| Ok(Arc::new(Self::return_type()))),
fun: Arc::new(Self::calc),
}
&(Arc::new(|_: &_| Ok(Arc::new(Self::return_type()))) as _),
&(Arc::new(Self::calc) as _),
)
}

fn input_type() -> Vec<DataType> {
Expand Down
6 changes: 4 additions & 2 deletions src/common/query/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::vectors::{Helper, VectorRef};
use snafu::ResultExt;

use crate::error::{self, IntoVectorSnafu, Result};
use crate::error::{self, GeneralDataFusionSnafu, IntoVectorSnafu, Result};
use crate::prelude::ScalarValue;

/// Represents the result from an expression
Expand All @@ -43,7 +43,9 @@ impl ColumnarValue {
Ok(match self {
ColumnarValue::Vector(v) => v,
ColumnarValue::Scalar(s) => {
let v = s.to_array_of_size(num_rows);
let v = s
.to_array_of_size(num_rows)
.context(GeneralDataFusionSnafu)?;
let data_type = v.data_type().clone();
Helper::try_into_vector(v).context(IntoVectorSnafu { data_type })?
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ impl Debug for OutputData {
OutputData::RecordBatches(recordbatches) => {
write!(f, "OutputData::RecordBatches({recordbatches:?})")
}
OutputData::Stream(_) => {
write!(f, "OutputData::Stream(<stream>)")
OutputData::Stream(s) => {
write!(f, "OutputData::Stream(<{}>)", s.name())
}
}
}
Expand Down
Loading

0 comments on commit 069b3a9

Please sign in to comment.