
Commit 4068f8b

Rename MergeExec to CoalescePartitionsExec (#635)
1 parent 27dc5d6 commit 4068f8b
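
For downstream code the change is a pure rename: the operator still coalesces all of its input partitions into a single partition. A minimal before/after sketch (the `coalesce` helper and its `input` argument are illustrative, not part of the commit):

// Before this commit:
//   use datafusion::physical_plan::merge::MergeExec;
//   let single_partition = Arc::new(MergeExec::new(input));
//
// After this commit the module is `coalesce_partitions` and the type is
// `CoalescePartitionsExec`; behaviour is unchanged.
use std::sync::Arc;

use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;

fn coalesce(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    // Combines all partitions of `input` into one, with no ordering guarantee.
    Arc::new(CoalescePartitionsExec::new(input))
}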

File tree

16 files changed, +67 -53 lines changed


ballista/rust/core/proto/ballista.proto

+2 -2

@@ -414,7 +414,7 @@ message PhysicalPlanNode {
     SortExecNode sort = 11;
     CoalesceBatchesExecNode coalesce_batches = 12;
     FilterExecNode filter = 13;
-    MergeExecNode merge = 14;
+    CoalescePartitionsExecNode merge = 14;
     UnresolvedShuffleExecNode unresolved = 15;
     RepartitionExecNode repartition = 16;
     WindowAggExecNode window = 17;
@@ -648,7 +648,7 @@ message CoalesceBatchesExecNode {
   uint32 target_batch_size = 2;
 }

-message MergeExecNode {
+message CoalescePartitionsExecNode {
   PhysicalPlanNode input = 1;
 }

ballista/rust/core/src/serde/physical_plan/from_proto.rs

+2 -2

@@ -37,9 +37,9 @@ use datafusion::execution::context::{
 };
 use datafusion::logical_plan::{window_frames::WindowFrame, DFSchema, Expr};
 use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::PartitionMode;
-use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
 use datafusion::physical_plan::window_functions::{
     BuiltInWindowFunction, WindowFunction,
@@ -147,7 +147,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
             }
             PhysicalPlanType::Merge(merge) => {
                 let input: Arc<dyn ExecutionPlan> = convert_box_required!(merge.input)?;
-                Ok(Arc::new(MergeExec::new(input)))
+                Ok(Arc::new(CoalescePartitionsExec::new(input)))
             }
             PhysicalPlanType::Repartition(repart) => {
                 let input: Arc<dyn ExecutionPlan> = convert_box_required!(repart.input)?;

ballista/rust/core/src/serde/physical_plan/to_proto.rs

+3 -3

@@ -59,8 +59,8 @@ use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
 use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
 use crate::serde::scheduler::PartitionLocation;
 use crate::serde::{protobuf, BallistaError};
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
-use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::repartition::RepartitionExec;

 impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
@@ -292,11 +292,11 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
                     },
                 )),
             })
-        } else if let Some(exec) = plan.downcast_ref::<MergeExec>() {
+        } else if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
             let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?;
             Ok(protobuf::PhysicalPlanNode {
                 physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
-                    protobuf::MergeExecNode {
+                    protobuf::CoalescePartitionsExecNode {
                         input: Some(Box::new(input)),
                     },
                 ))),

ballista/rust/core/src/utils.rs

+9 -5

@@ -40,15 +40,15 @@ use datafusion::arrow::{
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use datafusion::logical_plan::Operator;
 use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::csv::CsvExec;
 use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
 use datafusion::physical_plan::hash_join::HashJoinExec;
-use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::parquet::ParquetExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::sort::SortExec;
@@ -177,8 +177,12 @@ fn build_exec_plan_diagram(
         .is_some()
     {
         "CoalesceBatchesExec"
-    } else if plan.as_any().downcast_ref::<MergeExec>().is_some() {
-        "MergeExec"
+    } else if plan
+        .as_any()
+        .downcast_ref::<CoalescePartitionsExec>()
+        .is_some()
+    {
+        "CoalescePartitionsExec"
     } else {
         println!("Unknown: {:?}", plan);
         "Unknown"
@@ -226,7 +230,7 @@ pub fn create_datafusion_context() -> ExecutionContext {
     // remove Repartition rule because that isn't supported yet
     let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
         Arc::new(CoalesceBatches::new()),
-        Arc::new(AddMergeExec::new()),
+        Arc::new(AddCoalescePartitionsExec::new()),
     ];
     let config = ExecutionConfig::new()
         .with_concurrency(1)

ballista/rust/scheduler/src/planner.rs

+12 -7

@@ -30,11 +30,11 @@ use ballista_core::{
 };
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
 use datafusion::physical_plan::hash_join::HashJoinExec;
-use datafusion::physical_plan::merge::MergeExec;
 use datafusion::physical_plan::windows::WindowAggExec;
 use datafusion::physical_plan::ExecutionPlan;
 use log::info;
@@ -101,12 +101,15 @@ impl DistributedPlanner {
             // remove Repartition rule because that isn't supported yet
             let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
                 Arc::new(CoalesceBatches::new()),
-                Arc::new(AddMergeExec::new()),
+                Arc::new(AddCoalescePartitionsExec::new()),
             ];
             let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
             let ctx = ExecutionContext::with_config(config);
             Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
-        } else if let Some(merge) = execution_plan.as_any().downcast_ref::<MergeExec>() {
+        } else if let Some(merge) = execution_plan
+            .as_any()
+            .downcast_ref::<CoalescePartitionsExec>()
+        {
             let query_stage = create_query_stage(
                 job_id,
                 self.next_stage_id(),
@@ -244,8 +247,10 @@ mod test {
     use ballista_core::serde::protobuf;
     use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
     use datafusion::physical_plan::sort::SortExec;
+    use datafusion::physical_plan::{
+        coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
+    };
     use datafusion::physical_plan::{displayable, ExecutionPlan};
-    use datafusion::physical_plan::{merge::MergeExec, projection::ProjectionExec};
     use std::convert::TryInto;
     use std::sync::Arc;
     use uuid::Uuid;
@@ -284,7 +289,7 @@ mod test {
 HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
 CsvExec: testdata/lineitem; partitions=2
 QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
-MergeExec
+CoalescePartitionsExec
 UnresolvedShuffleExec: stages=[1]
 QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
 SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
@@ -309,7 +314,7 @@ mod test {
         assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);

         let merge_exec = stages[1].children()[0].clone();
-        let merge_exec = downcast_exec!(merge_exec, MergeExec);
+        let merge_exec = downcast_exec!(merge_exec, CoalescePartitionsExec);

         let unresolved_shuffle = merge_exec.children()[0].clone();
         let unresolved_shuffle =

ballista/rust/scheduler/src/test_utils.rs

+2 -2

@@ -22,7 +22,7 @@ use ballista_core::error::Result;
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
 use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddMergeExec;
+use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
 use datafusion::physical_plan::csv::CsvReadOptions;

@@ -33,7 +33,7 @@ pub const TPCH_TABLES: &[&str] = &[
 pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
     // remove Repartition rule because that isn't supported yet
     let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
-        Arc::new(AddMergeExec::new()),
+        Arc::new(AddCoalescePartitionsExec::new()),
         Arc::new(CoalesceBatches::new()),
     ];
     let config = ExecutionConfig::new()

datafusion/src/execution/context.rs

+2 -2

@@ -61,7 +61,7 @@ use crate::optimizer::optimizer::OptimizerRule;
 use crate::optimizer::projection_push_down::ProjectionPushDown;
 use crate::optimizer::simplify_expressions::SimplifyExpressions;
 use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
-use crate::physical_optimizer::merge_exec::AddMergeExec;
+use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
 use crate::physical_optimizer::repartition::Repartition;

 use crate::physical_plan::csv::CsvReadOptions;
@@ -643,7 +643,7 @@ impl Default for ExecutionConfig {
             physical_optimizers: vec![
                 Arc::new(CoalesceBatches::new()),
                 Arc::new(Repartition::new()),
-                Arc::new(AddMergeExec::new()),
+                Arc::new(AddCoalescePartitionsExec::new()),
             ],
             query_planner: Arc::new(DefaultQueryPlanner {}),
             default_catalog: "datafusion".to_owned(),

datafusion/src/physical_optimizer/merge_exec.rs

+9 -9

@@ -15,27 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.

-//! AddMergeExec adds MergeExec to merge plans
-//! with more partitions into one partition when the node
-//! needs a single partition
+//! AddCoalescePartitionsExec adds CoalescePartitionsExec to plans
+//! with more than one partition, to coalesce them into one partition
+//! when the node needs a single partition
 use super::optimizer::PhysicalOptimizerRule;
 use crate::{
     error::Result,
-    physical_plan::{merge::MergeExec, Distribution},
+    physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution},
 };
 use std::sync::Arc;

-/// Introduces MergeExec
-pub struct AddMergeExec {}
+/// Introduces CoalescePartitionsExec
+pub struct AddCoalescePartitionsExec {}

-impl AddMergeExec {
+impl AddCoalescePartitionsExec {
     #[allow(missing_docs)]
     pub fn new() -> Self {
         Self {}
     }
 }

-impl PhysicalOptimizerRule for AddMergeExec {
+impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
     fn optimize(
         &self,
         plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
@@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
                     if child.output_partitioning().partition_count() == 1 {
                         child.clone()
                     } else {
-                        Arc::new(MergeExec::new(child.clone()))
+                        Arc::new(CoalescePartitionsExec::new(child.clone()))
                     }
                 })
                 .collect(),
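
The rule itself only changed names, so code that builds an explicit optimizer-rule list, as the Ballista utils.rs and planner.rs changes above do, simply swaps AddMergeExec for AddCoalescePartitionsExec. A minimal sketch of such a context using the datafusion API shown in this diff (the function name is illustrative):

use std::sync::Arc;

use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;

fn context_without_repartition() -> ExecutionContext {
    // Same rule list as the Ballista planner: Repartition is left out because
    // it isn't supported there yet, and the rule formerly named AddMergeExec
    // is registered under its new name.
    let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
        Arc::new(CoalesceBatches::new()),
        Arc::new(AddCoalescePartitionsExec::new()),
    ];
    let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
    ExecutionContext::with_config(config)
}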

datafusion/src/physical_plan/merge.rs → datafusion/src/physical_plan/coalesce_partitions.rs

+6 -6

@@ -40,15 +40,15 @@ use pin_project_lite::pin_project;
 /// Merge execution plan executes partitions in parallel and combines them into a single
 /// partition. No guarantees are made about the order of the resulting partition.
 #[derive(Debug)]
-pub struct MergeExec {
+pub struct CoalescePartitionsExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
 }

-impl MergeExec {
+impl CoalescePartitionsExec {
     /// Create a new MergeExec
     pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
-        MergeExec { input }
+        CoalescePartitionsExec { input }
     }

     /// Input execution plan
@@ -58,7 +58,7 @@ impl MergeExec {
 }

 #[async_trait]
-impl ExecutionPlan for MergeExec {
+impl ExecutionPlan for CoalescePartitionsExec {
     /// Return a reference to Any that can be used for downcasting
     fn as_any(&self) -> &dyn Any {
         self
@@ -82,7 +82,7 @@ impl ExecutionPlan for MergeExec {
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         match children.len() {
-            1 => Ok(Arc::new(MergeExec::new(children[0].clone()))),
+            1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))),
             _ => Err(DataFusionError::Internal(
                 "MergeExec wrong number of children".to_string(),
             )),
@@ -194,7 +194,7 @@ mod tests {
         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

-        let merge = MergeExec::new(Arc::new(csv));
+        let merge = CoalescePartitionsExec::new(Arc::new(csv));

         // output of MergeExec should have a single partition
         assert_eq!(merge.output_partitioning().partition_count(), 1);
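
The renamed operator behaves exactly as the test above shows: whatever the input's partition count, the output reports a single partition, which is then read with execute(0) as the CrossJoinExec and HashJoinExec call sites below do. A minimal sketch, assuming `input` is some multi-partition ExecutionPlan (a placeholder, not from the commit):

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::ExecutionPlan;

async fn coalesce_and_read(input: Arc<dyn ExecutionPlan>) -> Result<()> {
    let merge = CoalescePartitionsExec::new(input);

    // CoalescePartitionsExec always exposes exactly one output partition.
    assert_eq!(merge.output_partitioning().partition_count(), 1);

    // That single partition is consumed via execute(0).
    let _stream = merge.execute(0).await?;
    Ok(())
}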

datafusion/src/physical_plan/cross_join.rs

+4 -2

@@ -27,7 +27,9 @@ use arrow::record_batch::RecordBatch;

 use futures::{Stream, TryStreamExt};

-use super::{hash_utils::check_join_is_valid, merge::MergeExec};
+use super::{
+    coalesce_partitions::CoalescePartitionsExec, hash_utils::check_join_is_valid,
+};
 use crate::{
     error::{DataFusionError, Result},
     scalar::ScalarValue,
@@ -144,7 +146,7 @@ impl ExecutionPlan for CrossJoinExec {
         let start = Instant::now();

         // merge all left parts into a single stream
-        let merge = MergeExec::new(self.left.clone());
+        let merge = CoalescePartitionsExec::new(self.left.clone());
         let stream = merge.execute(0).await?;

         // Load all batches and count the rows

datafusion/src/physical_plan/hash_aggregate.rs

+2 -2

@@ -1230,7 +1230,7 @@ mod tests {
     use crate::physical_plan::expressions::{col, Avg};
     use crate::{assert_batches_sorted_eq, physical_plan::common};

-    use crate::physical_plan::merge::MergeExec;
+    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;

     /// some mock data to aggregates
     fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
@@ -1298,7 +1298,7 @@ mod tests {
         ];
         assert_batches_sorted_eq!(expected, &result);

-        let merge = Arc::new(MergeExec::new(partial_aggregate));
+        let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate));

         let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
             .map(|i| col(&groups[i].1, &input_schema))

datafusion/src/physical_plan/hash_join.rs

+2 -2

@@ -54,8 +54,8 @@ use arrow::array::{

 use super::expressions::Column;
 use super::{
+    coalesce_partitions::CoalescePartitionsExec,
     hash_utils::{build_join_schema, check_join_is_valid, JoinOn, JoinType},
-    merge::MergeExec,
 };
 use crate::error::{DataFusionError, Result};

@@ -260,7 +260,7 @@ impl ExecutionPlan for HashJoinExec {
         let start = Instant::now();

         // merge all left parts into a single stream
-        let merge = MergeExec::new(self.left.clone());
+        let merge = CoalescePartitionsExec::new(self.left.clone());
         let stream = merge.execute(0).await?;

         // This operation performs 2 steps at once:

datafusion/src/physical_plan/limit.rs

+3 -2

@@ -295,9 +295,9 @@ mod tests {
     use common::collect;

     use super::*;
+    use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
     use crate::physical_plan::common;
     use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
-    use crate::physical_plan::merge::MergeExec;
     use crate::test;

     #[tokio::test]
@@ -319,7 +319,8 @@ mod tests {
         // input should have 4 partitions
         assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

-        let limit = GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv))), 7);
+        let limit =
+            GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7);

         // the result should contain 4 batches (one per input partition)
         let iter = limit.execute(0).await?;
