Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename MergeExec to CoalescePartitionsExec #635

Merged
merged 2 commits into from
Jun 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ message PhysicalPlanNode {
SortExecNode sort = 11;
CoalesceBatchesExecNode coalesce_batches = 12;
FilterExecNode filter = 13;
MergeExecNode merge = 14;
CoalescePartitionsExecNode merge = 14;
UnresolvedShuffleExecNode unresolved = 15;
RepartitionExecNode repartition = 16;
WindowAggExecNode window = 17;
Expand Down Expand Up @@ -648,7 +648,7 @@ message CoalesceBatchesExecNode {
uint32 target_batch_size = 2;
}

message MergeExecNode {
message CoalescePartitionsExecNode {
PhysicalPlanNode input = 1;
}

Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ use datafusion::execution::context::{
};
use datafusion::logical_plan::{window_frames::WindowFrame, DFSchema, Expr};
use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::PartitionMode;
use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::planner::DefaultPhysicalPlanner;
use datafusion::physical_plan::window_functions::{
BuiltInWindowFunction, WindowFunction,
Expand Down Expand Up @@ -147,7 +147,7 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
}
PhysicalPlanType::Merge(merge) => {
let input: Arc<dyn ExecutionPlan> = convert_box_required!(merge.input)?;
Ok(Arc::new(MergeExec::new(input)))
Ok(Arc::new(CoalescePartitionsExec::new(input)))
}
PhysicalPlanType::Repartition(repart) => {
let input: Arc<dyn ExecutionPlan> = convert_box_required!(repart.input)?;
Expand Down
6 changes: 3 additions & 3 deletions ballista/rust/core/src/serde/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec};
use crate::serde::protobuf::repartition_exec_node::PartitionMethod;
use crate::serde::scheduler::PartitionLocation;
use crate::serde::{protobuf, BallistaError};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr};
use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::repartition::RepartitionExec;

impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
Expand Down Expand Up @@ -292,11 +292,11 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
},
)),
})
} else if let Some(exec) = plan.downcast_ref::<MergeExec>() {
} else if let Some(exec) = plan.downcast_ref::<CoalescePartitionsExec>() {
let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?;
Ok(protobuf::PhysicalPlanNode {
physical_plan_type: Some(PhysicalPlanType::Merge(Box::new(
protobuf::MergeExecNode {
protobuf::CoalescePartitionsExecNode {
input: Some(Box::new(input)),
},
))),
Expand Down
14 changes: 9 additions & 5 deletions ballista/rust/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ use datafusion::arrow::{
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::logical_plan::Operator;
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::merge_exec::AddMergeExec;
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::csv::CsvExec;
use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::parquet::ParquetExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sort::SortExec;
Expand Down Expand Up @@ -177,8 +177,12 @@ fn build_exec_plan_diagram(
.is_some()
{
"CoalesceBatchesExec"
} else if plan.as_any().downcast_ref::<MergeExec>().is_some() {
"MergeExec"
} else if plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
.is_some()
{
"CoalescePartitionsExec"
} else {
println!("Unknown: {:?}", plan);
"Unknown"
Expand Down Expand Up @@ -226,7 +230,7 @@ pub fn create_datafusion_context() -> ExecutionContext {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
Arc::new(AddMergeExec::new()),
Arc::new(AddCoalescePartitionsExec::new()),
];
let config = ExecutionConfig::new()
.with_concurrency(1)
Expand Down
19 changes: 12 additions & 7 deletions ballista/rust/scheduler/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ use ballista_core::{
};
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::merge_exec::AddMergeExec;
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
use datafusion::physical_plan::hash_join::HashJoinExec;
use datafusion::physical_plan::merge::MergeExec;
use datafusion::physical_plan::windows::WindowAggExec;
use datafusion::physical_plan::ExecutionPlan;
use log::info;
Expand Down Expand Up @@ -101,12 +101,15 @@ impl DistributedPlanner {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(CoalesceBatches::new()),
Arc::new(AddMergeExec::new()),
Arc::new(AddCoalescePartitionsExec::new()),
];
let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
let ctx = ExecutionContext::with_config(config);
Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
} else if let Some(merge) = execution_plan.as_any().downcast_ref::<MergeExec>() {
} else if let Some(merge) = execution_plan
.as_any()
.downcast_ref::<CoalescePartitionsExec>()
{
let query_stage = create_query_stage(
job_id,
self.next_stage_id(),
Expand Down Expand Up @@ -244,8 +247,10 @@ mod test {
use ballista_core::serde::protobuf;
use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
use datafusion::physical_plan::sort::SortExec;
use datafusion::physical_plan::{
coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
};
use datafusion::physical_plan::{displayable, ExecutionPlan};
use datafusion::physical_plan::{merge::MergeExec, projection::ProjectionExec};
use std::convert::TryInto;
use std::sync::Arc;
use uuid::Uuid;
Expand Down Expand Up @@ -284,7 +289,7 @@ mod test {
HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
CsvExec: testdata/lineitem; partitions=2
QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
MergeExec
CoalescePartitionsExec
UnresolvedShuffleExec: stages=[1]
QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
Expand All @@ -309,7 +314,7 @@ mod test {
assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);

let merge_exec = stages[1].children()[0].clone();
let merge_exec = downcast_exec!(merge_exec, MergeExec);
let merge_exec = downcast_exec!(merge_exec, CoalescePartitionsExec);

let unresolved_shuffle = merge_exec.children()[0].clone();
let unresolved_shuffle =
Expand Down
4 changes: 2 additions & 2 deletions ballista/rust/scheduler/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use ballista_core::error::Result;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
use datafusion::physical_optimizer::merge_exec::AddMergeExec;
use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::csv::CsvReadOptions;

Expand All @@ -33,7 +33,7 @@ pub const TPCH_TABLES: &[&str] = &[
pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
// remove Repartition rule because that isn't supported yet
let rules: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Arc::new(AddMergeExec::new()),
Arc::new(AddCoalescePartitionsExec::new()),
Arc::new(CoalesceBatches::new()),
];
let config = ExecutionConfig::new()
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::optimizer::optimizer::OptimizerRule;
use crate::optimizer::projection_push_down::ProjectionPushDown;
use crate::optimizer::simplify_expressions::SimplifyExpressions;
use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddMergeExec;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::physical_plan::csv::CsvReadOptions;
Expand Down Expand Up @@ -643,7 +643,7 @@ impl Default for ExecutionConfig {
physical_optimizers: vec![
Arc::new(CoalesceBatches::new()),
Arc::new(Repartition::new()),
Arc::new(AddMergeExec::new()),
Arc::new(AddCoalescePartitionsExec::new()),
],
query_planner: Arc::new(DefaultQueryPlanner {}),
default_catalog: "datafusion".to_owned(),
Expand Down
18 changes: 9 additions & 9 deletions datafusion/src/physical_optimizer/merge_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,27 @@
// specific language governing permissions and limitations
// under the License.

//! AddMergeExec adds MergeExec to merge plans
//! with more partitions into one partition when the node
//! needs a single partition
//! AddCoalescePartitionsExec adds CoalescePartitionsExec to plans
//! with more than one partition, to coalesce them into one partition
//! when the node needs a single partition
use super::optimizer::PhysicalOptimizerRule;
use crate::{
error::Result,
physical_plan::{merge::MergeExec, Distribution},
physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution},
};
use std::sync::Arc;

/// Introduces MergeExec
pub struct AddMergeExec {}
/// Introduces CoalescePartitionsExec
pub struct AddCoalescePartitionsExec {}

impl AddMergeExec {
impl AddCoalescePartitionsExec {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {}
}
}

impl PhysicalOptimizerRule for AddMergeExec {
impl PhysicalOptimizerRule for AddCoalescePartitionsExec {
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
Expand All @@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for AddMergeExec {
if child.output_partitioning().partition_count() == 1 {
child.clone()
} else {
Arc::new(MergeExec::new(child.clone()))
Arc::new(CoalescePartitionsExec::new(child.clone()))
}
})
.collect(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ use pin_project_lite::pin_project;
/// Merge execution plan executes partitions in parallel and combines them into a single
/// partition. No guarantees are made about the order of the resulting partition.
#[derive(Debug)]
pub struct MergeExec {
pub struct CoalescePartitionsExec {
/// Input execution plan
input: Arc<dyn ExecutionPlan>,
}

impl MergeExec {
impl CoalescePartitionsExec {
/// Create a new MergeExec
pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
MergeExec { input }
CoalescePartitionsExec { input }
}

/// Input execution plan
Expand All @@ -58,7 +58,7 @@ impl MergeExec {
}

#[async_trait]
impl ExecutionPlan for MergeExec {
impl ExecutionPlan for CoalescePartitionsExec {
/// Return a reference to Any that can be used for downcasting
fn as_any(&self) -> &dyn Any {
self
Expand All @@ -82,7 +82,7 @@ impl ExecutionPlan for MergeExec {
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
match children.len() {
1 => Ok(Arc::new(MergeExec::new(children[0].clone()))),
1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))),
_ => Err(DataFusionError::Internal(
"MergeExec wrong number of children".to_string(),
)),
Expand Down Expand Up @@ -194,7 +194,7 @@ mod tests {
// input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

let merge = MergeExec::new(Arc::new(csv));
let merge = CoalescePartitionsExec::new(Arc::new(csv));

// output of MergeExec should have a single partition
assert_eq!(merge.output_partitioning().partition_count(), 1);
Expand Down
6 changes: 4 additions & 2 deletions datafusion/src/physical_plan/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use arrow::record_batch::RecordBatch;

use futures::{Stream, TryStreamExt};

use super::{hash_utils::check_join_is_valid, merge::MergeExec};
use super::{
coalesce_partitions::CoalescePartitionsExec, hash_utils::check_join_is_valid,
};
use crate::{
error::{DataFusionError, Result},
scalar::ScalarValue,
Expand Down Expand Up @@ -144,7 +146,7 @@ impl ExecutionPlan for CrossJoinExec {
let start = Instant::now();

// merge all left parts into a single stream
let merge = MergeExec::new(self.left.clone());
let merge = CoalescePartitionsExec::new(self.left.clone());
let stream = merge.execute(0).await?;

// Load all batches and count the rows
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/hash_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,7 @@ mod tests {
use crate::physical_plan::expressions::{col, Avg};
use crate::{assert_batches_sorted_eq, physical_plan::common};

use crate::physical_plan::merge::MergeExec;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;

/// some mock data to aggregates
fn some_data() -> (Arc<Schema>, Vec<RecordBatch>) {
Expand Down Expand Up @@ -1298,7 +1298,7 @@ mod tests {
];
assert_batches_sorted_eq!(expected, &result);

let merge = Arc::new(MergeExec::new(partial_aggregate));
let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate));

let final_group: Vec<Arc<dyn PhysicalExpr>> = (0..groups.len())
.map(|i| col(&groups[i].1, &input_schema))
Expand Down
4 changes: 2 additions & 2 deletions datafusion/src/physical_plan/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ use arrow::array::{

use super::expressions::Column;
use super::{
coalesce_partitions::CoalescePartitionsExec,
hash_utils::{build_join_schema, check_join_is_valid, JoinOn, JoinType},
merge::MergeExec,
};
use crate::error::{DataFusionError, Result};

Expand Down Expand Up @@ -260,7 +260,7 @@ impl ExecutionPlan for HashJoinExec {
let start = Instant::now();

// merge all left parts into a single stream
let merge = MergeExec::new(self.left.clone());
let merge = CoalescePartitionsExec::new(self.left.clone());
let stream = merge.execute(0).await?;

// This operation performs 2 steps at once:
Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/physical_plan/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,9 @@ mod tests {
use common::collect;

use super::*;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::common;
use crate::physical_plan::csv::{CsvExec, CsvReadOptions};
use crate::physical_plan::merge::MergeExec;
use crate::test;

#[tokio::test]
Expand All @@ -319,7 +319,8 @@ mod tests {
// input should have 4 partitions
assert_eq!(csv.output_partitioning().partition_count(), num_partitions);

let limit = GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv))), 7);
let limit =
GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7);

// the result should contain 4 batches (one per input partition)
let iter = limit.execute(0).await?;
Expand Down
Loading