Skip to content

Commit 169ffa5

Browse files
authored
Upgrade DataFusion to 27.0.0 (#834)
* Upgrade DataFusion to 27.0.0 * Fix optimizer rule 'push_down_projection' failed * Fix tests error of benchmarks * Fix physical plan encode/decode error
1 parent 5dfdfea commit 169ffa5

21 files changed

+306
-128
lines changed

Cargo.toml

+20-13
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,21 @@
1919
members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executor", "ballista/scheduler", "benchmarks", "examples"]
2020

2121
[workspace.dependencies]
22-
arrow = { version = "40.0.0" }
23-
arrow-flight = { version = "40.0.0", features = ["flight-sql-experimental"] }
24-
arrow-schema = { version = "40.0.0", default-features = false }
25-
configure_me = { version = "0.4.0" }
26-
configure_me_codegen = { version = "0.4.4" }
27-
datafusion = "26.0.0"
28-
datafusion-cli = "26.0.0"
29-
datafusion-proto = "26.0.0"
30-
object_store = "0.5.6"
31-
sqlparser = "0.34.0"
32-
tonic = { version = "0.9" }
33-
tonic-build = { version = "0.9", default-features = false, features = ["transport", "prost"] }
22+
arrow = {version = "43.0.0"}
23+
arrow-flight = {version = "43.0.0", features = ["flight-sql-experimental"]}
24+
arrow-schema = {version = "43.0.0", default-features = false}
25+
configure_me = {version = "0.4.0"}
26+
configure_me_codegen = {version = "0.4.4"}
27+
datafusion = "27.0.0"
28+
datafusion-cli = "27.0.0"
29+
datafusion-proto = "27.0.0"
30+
object_store = "0.6.1"
31+
sqlparser = "0.35.0"
32+
tonic = {version = "0.9"}
33+
tonic-build = {version = "0.9", default-features = false, features = ["transport", "prost"]}
3434
tracing = "0.1.36"
3535
tracing-appender = "0.2.2"
36-
tracing-subscriber = { version = "0.3.15", features = ["env-filter"] }
36+
tracing-subscriber = {version = "0.3.15", features = ["env-filter"]}
3737

3838
# cargo build --profile release-lto
3939
[profile.release-lto]
@@ -54,3 +54,10 @@ opt-level = 3
5454
overflow-checks = false
5555
panic = 'unwind'
5656
rpath = false
57+
58+
[patch.crates-io]
59+
# TODO remove on upgrade to DataFusion 28.0.0
60+
# fix for https://github.com/apache/arrow-datafusion/issues/6819 and https://github.com/apache/arrow-datafusion/issues/6898
61+
datafusion = {git = "https://github.com/apache/arrow-datafusion.git", rev = "4e2a72f6c7109d40a4986e3d05360524be078dd4"}
62+
datafusion-cli = {git = "https://github.com/apache/arrow-datafusion.git", rev = "4e2a72f6c7109d40a4986e3d05360524be078dd4"}
63+
datafusion-proto = {git = "https://github.com/apache/arrow-datafusion.git", rev = "4e2a72f6c7109d40a4986e3d05360524be078dd4"}

ballista/client/README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ To build a simple ballista example, add the following dependencies to your `Carg
8484

8585
```toml
8686
[dependencies]
87-
ballista = "0.10"
88-
datafusion = "21.0.0"
87+
ballista = "0.11"
88+
datafusion = "27.0.0"
8989
tokio = "1.0"
9090
```
9191

ballista/core/src/execution_plans/distributed_query.rs

+20-17
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ use datafusion::logical_expr::LogicalPlan;
3333
use datafusion::physical_plan::expressions::PhysicalSortExpr;
3434
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
3535
use datafusion::physical_plan::{
36-
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
36+
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
37+
Statistics,
3738
};
3839
use datafusion_proto::logical_plan::{
3940
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
@@ -119,6 +120,24 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
119120
}
120121
}
121122

123+
impl<T: 'static + AsLogicalPlan> DisplayAs for DistributedQueryExec<T> {
124+
fn fmt_as(
125+
&self,
126+
t: DisplayFormatType,
127+
f: &mut std::fmt::Formatter,
128+
) -> std::fmt::Result {
129+
match t {
130+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
131+
write!(
132+
f,
133+
"DistributedQueryExec: scheduler_url={}",
134+
self.scheduler_url
135+
)
136+
}
137+
}
138+
}
139+
}
140+
122141
impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
123142
fn as_any(&self) -> &dyn Any {
124143
self
@@ -191,22 +210,6 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
191210
Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
192211
}
193212

194-
fn fmt_as(
195-
&self,
196-
t: DisplayFormatType,
197-
f: &mut std::fmt::Formatter,
198-
) -> std::fmt::Result {
199-
match t {
200-
DisplayFormatType::Default => {
201-
write!(
202-
f,
203-
"DistributedQueryExec: scheduler_url={}",
204-
self.scheduler_url
205-
)
206-
}
207-
}
208-
}
209-
210213
fn statistics(&self) -> Statistics {
211214
// This execution plan sends the logical plan to the scheduler without
212215
// performing the node by node conversion to a full physical plan.

ballista/core/src/execution_plans/shuffle_reader.rs

+15-13
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use datafusion::error::Result;
3737
use datafusion::physical_plan::expressions::PhysicalSortExpr;
3838
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
3939
use datafusion::physical_plan::{
40-
DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
40+
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
4141
SendableRecordBatchStream, Statistics,
4242
};
4343
use futures::{Stream, StreamExt, TryStreamExt};
@@ -83,6 +83,20 @@ impl ShuffleReaderExec {
8383
}
8484
}
8585

86+
impl DisplayAs for ShuffleReaderExec {
87+
fn fmt_as(
88+
&self,
89+
t: DisplayFormatType,
90+
f: &mut std::fmt::Formatter,
91+
) -> std::fmt::Result {
92+
match t {
93+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
94+
write!(f, "ShuffleReaderExec: partitions={}", self.partition.len())
95+
}
96+
}
97+
}
98+
}
99+
86100
impl ExecutionPlan for ShuffleReaderExec {
87101
fn as_any(&self) -> &dyn Any {
88102
self
@@ -154,18 +168,6 @@ impl ExecutionPlan for ShuffleReaderExec {
154168
Ok(Box::pin(result))
155169
}
156170

157-
fn fmt_as(
158-
&self,
159-
t: DisplayFormatType,
160-
f: &mut std::fmt::Formatter,
161-
) -> std::fmt::Result {
162-
match t {
163-
DisplayFormatType::Default => {
164-
write!(f, "ShuffleReaderExec: partitions={}", self.partition.len())
165-
}
166-
}
167-
}
168-
169171
fn metrics(&self) -> Option<MetricsSet> {
170172
Some(self.metrics.clone_inner())
171173
}

ballista/core/src/execution_plans/shuffle_writer.rs

+20-17
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ use datafusion::physical_plan::metrics::{
4747
};
4848

4949
use datafusion::physical_plan::{
50-
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
50+
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
51+
Statistics,
5152
};
5253
use futures::{StreamExt, TryFutureExt, TryStreamExt};
5354

@@ -294,6 +295,24 @@ impl ShuffleWriterExec {
294295
}
295296
}
296297

298+
impl DisplayAs for ShuffleWriterExec {
299+
fn fmt_as(
300+
&self,
301+
t: DisplayFormatType,
302+
f: &mut std::fmt::Formatter,
303+
) -> std::fmt::Result {
304+
match t {
305+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
306+
write!(
307+
f,
308+
"ShuffleWriterExec: {:?}",
309+
self.shuffle_output_partitioning
310+
)
311+
}
312+
}
313+
}
314+
}
315+
297316
impl ExecutionPlan for ShuffleWriterExec {
298317
fn as_any(&self) -> &dyn Any {
299318
self
@@ -399,22 +418,6 @@ impl ExecutionPlan for ShuffleWriterExec {
399418
Some(self.metrics.clone_inner())
400419
}
401420

402-
fn fmt_as(
403-
&self,
404-
t: DisplayFormatType,
405-
f: &mut std::fmt::Formatter,
406-
) -> std::fmt::Result {
407-
match t {
408-
DisplayFormatType::Default => {
409-
write!(
410-
f,
411-
"ShuffleWriterExec: {:?}",
412-
self.shuffle_output_partitioning
413-
)
414-
}
415-
}
416-
}
417-
418421
fn statistics(&self) -> Statistics {
419422
self.plan.statistics()
420423
}

ballista/core/src/execution_plans/unresolved_shuffle.rs

+16-13
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ use datafusion::error::{DataFusionError, Result};
2323
use datafusion::execution::context::TaskContext;
2424
use datafusion::physical_plan::expressions::PhysicalSortExpr;
2525
use datafusion::physical_plan::{
26-
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
26+
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream,
27+
Statistics,
2728
};
2829

2930
/// UnresolvedShuffleExec represents a dependency on the results of a ShuffleWriterExec node which hasn't computed yet.
@@ -57,6 +58,20 @@ impl UnresolvedShuffleExec {
5758
}
5859
}
5960

61+
impl DisplayAs for UnresolvedShuffleExec {
62+
fn fmt_as(
63+
&self,
64+
t: DisplayFormatType,
65+
f: &mut std::fmt::Formatter,
66+
) -> std::fmt::Result {
67+
match t {
68+
DisplayFormatType::Default | DisplayFormatType::Verbose => {
69+
write!(f, "UnresolvedShuffleExec")
70+
}
71+
}
72+
}
73+
}
74+
6075
impl ExecutionPlan for UnresolvedShuffleExec {
6176
fn as_any(&self) -> &dyn Any {
6277
self
@@ -100,18 +115,6 @@ impl ExecutionPlan for UnresolvedShuffleExec {
100115
))
101116
}
102117

103-
fn fmt_as(
104-
&self,
105-
t: DisplayFormatType,
106-
f: &mut std::fmt::Formatter,
107-
) -> std::fmt::Result {
108-
match t {
109-
DisplayFormatType::Default => {
110-
write!(f, "UnresolvedShuffleExec")
111-
}
112-
}
113-
}
114-
115118
fn statistics(&self) -> Statistics {
116119
// The full statistics are computed in the `ShuffleReaderExec` node
117120
// that replaces this one once the previous stage is completed.

ballista/core/src/serde/scheduler/from_proto.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use chrono::{TimeZone, Utc};
1919
use datafusion::common::tree_node::{Transformed, TreeNode};
2020
use datafusion::execution::runtime_env::RuntimeEnv;
21-
use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
21+
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
2222
use datafusion::physical_plan::metrics::{
2323
Count, Gauge, MetricValue, MetricsSet, Time, Timestamp,
2424
};
@@ -279,6 +279,7 @@ pub fn get_task_definition<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
279279
runtime: Arc<RuntimeEnv>,
280280
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
281281
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
282+
window_functions: HashMap<String, Arc<WindowUDF>>,
282283
codec: BallistaCodec<T, U>,
283284
) -> Result<TaskDefinition, BallistaError> {
284285
let mut props = HashMap::new();
@@ -289,16 +290,21 @@ pub fn get_task_definition<T: 'static + AsLogicalPlan, U: 'static + AsExecutionP
289290

290291
let mut task_scalar_functions = HashMap::new();
291292
let mut task_aggregate_functions = HashMap::new();
293+
let mut task_window_functions = HashMap::new();
292294
// TODO combine the functions from Executor's functions and TaskDefinition's function resources
293295
for scalar_func in scalar_functions {
294296
task_scalar_functions.insert(scalar_func.0, scalar_func.1);
295297
}
296298
for agg_func in aggregate_functions {
297299
task_aggregate_functions.insert(agg_func.0, agg_func.1);
298300
}
301+
for agg_func in window_functions {
302+
task_window_functions.insert(agg_func.0, agg_func.1);
303+
}
299304
let function_registry = Arc::new(SimpleFunctionRegistry {
300305
scalar_functions: task_scalar_functions,
301306
aggregate_functions: task_aggregate_functions,
307+
window_functions: task_window_functions,
302308
});
303309

304310
let encoded_plan = task.plan.as_slice();
@@ -342,6 +348,7 @@ pub fn get_task_definition_vec<
342348
runtime: Arc<RuntimeEnv>,
343349
scalar_functions: HashMap<String, Arc<ScalarUDF>>,
344350
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
351+
window_functions: HashMap<String, Arc<WindowUDF>>,
345352
codec: BallistaCodec<T, U>,
346353
) -> Result<Vec<TaskDefinition>, BallistaError> {
347354
let mut props = HashMap::new();
@@ -352,16 +359,21 @@ pub fn get_task_definition_vec<
352359

353360
let mut task_scalar_functions = HashMap::new();
354361
let mut task_aggregate_functions = HashMap::new();
362+
let mut task_window_functions = HashMap::new();
355363
// TODO combine the functions from Executor's functions and TaskDefinition's function resources
356364
for scalar_func in scalar_functions {
357365
task_scalar_functions.insert(scalar_func.0, scalar_func.1);
358366
}
359367
for agg_func in aggregate_functions {
360368
task_aggregate_functions.insert(agg_func.0, agg_func.1);
361369
}
370+
for agg_func in window_functions {
371+
task_window_functions.insert(agg_func.0, agg_func.1);
372+
}
362373
let function_registry = Arc::new(SimpleFunctionRegistry {
363374
scalar_functions: task_scalar_functions,
364375
aggregate_functions: task_aggregate_functions,
376+
window_functions: task_window_functions,
365377
});
366378

367379
let encoded_plan = multi_task.plan.as_slice();

ballista/core/src/serde/scheduler/mod.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use datafusion::arrow::array::{
2525
use datafusion::arrow::datatypes::{DataType, Field};
2626
use datafusion::common::DataFusionError;
2727
use datafusion::execution::FunctionRegistry;
28-
use datafusion::logical_expr::{AggregateUDF, ScalarUDF};
28+
use datafusion::logical_expr::{AggregateUDF, ScalarUDF, WindowUDF};
2929
use datafusion::physical_plan::ExecutionPlan;
3030
use datafusion::physical_plan::Partitioning;
3131
use serde::Serialize;
@@ -295,6 +295,7 @@ pub struct TaskDefinition {
295295
pub struct SimpleFunctionRegistry {
296296
pub scalar_functions: HashMap<String, Arc<ScalarUDF>>,
297297
pub aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
298+
pub window_functions: HashMap<String, Arc<WindowUDF>>,
298299
}
299300

300301
impl FunctionRegistry for SimpleFunctionRegistry {
@@ -321,4 +322,14 @@ impl FunctionRegistry for SimpleFunctionRegistry {
321322
))
322323
})
323324
}
325+
326+
fn udwf(&self, name: &str) -> datafusion::common::Result<Arc<WindowUDF>> {
327+
let result = self.window_functions.get(name);
328+
329+
result.cloned().ok_or_else(|| {
330+
DataFusionError::Internal(format!(
331+
"There is no UDWF named \"{name}\" in the TaskContext"
332+
))
333+
})
334+
}
324335
}

ballista/core/src/utils.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use datafusion::arrow::{ipc::writer::FileWriter, record_batch::RecordBatch};
2727
use datafusion::datasource::object_store::{
2828
DefaultObjectStoreRegistry, ObjectStoreRegistry,
2929
};
30+
use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
3031
use datafusion::error::DataFusionError;
3132
use datafusion::execution::context::{
3233
QueryPlanner, SessionConfig, SessionContext, SessionState,
@@ -38,7 +39,6 @@ use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
3839
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
3940
use datafusion::physical_plan::common::batch_byte_size;
4041
use datafusion::physical_plan::empty::EmptyExec;
41-
use datafusion::physical_plan::file_format::{CsvExec, ParquetExec};
4242
use datafusion::physical_plan::filter::FilterExec;
4343
use datafusion::physical_plan::joins::HashJoinExec;
4444
use datafusion::physical_plan::metrics::MetricsSet;

0 commit comments

Comments
 (0)