From 8929a34d128cfdca844813ba642765fea78c2fd9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 27 Jun 2021 09:50:01 -0600 Subject: [PATCH 1/2] Rename MergeExec to CoalescePartitionsExec --- ballista/rust/core/proto/ballista.proto | 4 ++-- .../core/src/serde/physical_plan/from_proto.rs | 4 ++-- .../core/src/serde/physical_plan/to_proto.rs | 6 +++--- ballista/rust/core/src/utils.rs | 10 +++++----- ballista/rust/scheduler/src/planner.rs | 14 +++++++------- ballista/rust/scheduler/src/test_utils.rs | 4 ++-- datafusion/src/execution/context.rs | 4 ++-- .../src/physical_optimizer/merge_exec.rs | 18 +++++++++--------- .../{merge.rs => coalesce_partitions.rs} | 12 ++++++------ datafusion/src/physical_plan/cross_join.rs | 4 ++-- datafusion/src/physical_plan/hash_aggregate.rs | 4 ++-- datafusion/src/physical_plan/hash_join.rs | 4 ++-- datafusion/src/physical_plan/limit.rs | 4 ++-- datafusion/src/physical_plan/mod.rs | 6 +++--- datafusion/src/physical_plan/sort.rs | 4 ++-- .../src/physical_plan/sort_preserving_merge.rs | 4 ++-- 16 files changed, 53 insertions(+), 53 deletions(-) rename datafusion/src/physical_plan/{merge.rs => coalesce_partitions.rs} (95%) diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index 365d8e9fd9a4..2aa6102c2180 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -414,7 +414,7 @@ message PhysicalPlanNode { SortExecNode sort = 11; CoalesceBatchesExecNode coalesce_batches = 12; FilterExecNode filter = 13; - MergeExecNode merge = 14; + CoalescePartitionsExecNode merge = 14; UnresolvedShuffleExecNode unresolved = 15; RepartitionExecNode repartition = 16; WindowAggExecNode window = 17; @@ -648,7 +648,7 @@ message CoalesceBatchesExecNode { uint32 target_batch_size = 2; } -message MergeExecNode { +message CoalescePartitionsExecNode { PhysicalPlanNode input = 1; } diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 4b87be4105be..cc74181e8b2e 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -39,7 +39,7 @@ use datafusion::logical_plan::{window_frames::WindowFrame, DFSchema, Expr}; use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction}; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::PartitionMode; -use datafusion::physical_plan::merge::MergeExec; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::planner::DefaultPhysicalPlanner; use datafusion::physical_plan::window_functions::{ BuiltInWindowFunction, WindowFunction, @@ -147,7 +147,7 @@ impl TryInto> for &protobuf::PhysicalPlanNode { } PhysicalPlanType::Merge(merge) => { let input: Arc = convert_box_required!(merge.input)?; - Ok(Arc::new(MergeExec::new(input))) + Ok(Arc::new(CoalescePartitionsExec::new(input))) } PhysicalPlanType::Repartition(repart) => { let input: Arc = convert_box_required!(repart.input)?; diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index cf5401b65019..24405bbbe799 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -60,7 +60,7 @@ use crate::serde::protobuf::repartition_exec_node::PartitionMethod; use crate::serde::scheduler::PartitionLocation; use crate::serde::{protobuf, BallistaError}; use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr}; -use datafusion::physical_plan::merge::MergeExec; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::repartition::RepartitionExec; impl TryInto for Arc { @@ -292,11 +292,11 @@ impl TryInto for Arc { }, )), }) - } else if let Some(exec) = plan.downcast_ref::() { + } else if let Some(exec) = plan.downcast_ref::() { let input: protobuf::PhysicalPlanNode = exec.input().to_owned().try_into()?; Ok(protobuf::PhysicalPlanNode { physical_plan_type: Some(PhysicalPlanType::Merge(Box::new( - protobuf::MergeExecNode { + protobuf::CoalescePartitionsExecNode { input: Some(Box::new(input)), }, ))), diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index b58be2800f7b..28a12430ffd9 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -40,7 +40,7 @@ use datafusion::arrow::{ use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; use datafusion::logical_plan::Operator; use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches; -use datafusion::physical_optimizer::merge_exec::AddMergeExec; +use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec; use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::csv::CsvExec; @@ -48,7 +48,7 @@ use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal}; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::hash_aggregate::HashAggregateExec; use datafusion::physical_plan::hash_join::HashJoinExec; -use datafusion::physical_plan::merge::MergeExec; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::parquet::ParquetExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::sort::SortExec; @@ -177,8 +177,8 @@ fn build_exec_plan_diagram( .is_some() { "CoalesceBatchesExec" - } else if plan.as_any().downcast_ref::().is_some() { - "MergeExec" + } else if plan.as_any().downcast_ref::().is_some() { + "CoalescePartitionsExec" } else { println!("Unknown: {:?}", plan); "Unknown" @@ -226,7 +226,7 @@ pub fn create_datafusion_context() -> ExecutionContext { // remove Repartition rule because that isn't supported yet let rules: Vec> = vec![ Arc::new(CoalesceBatches::new()), - Arc::new(AddMergeExec::new()), + Arc::new(AddCoalescePartitionsExec::new()), ]; let config = ExecutionConfig::new() .with_concurrency(1) diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 2ac9f6121e00..7e9432eb9bdd 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -30,11 +30,11 @@ use ballista_core::{ }; use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches; -use datafusion::physical_optimizer::merge_exec::AddMergeExec; +use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec; use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::HashJoinExec; -use datafusion::physical_plan::merge::MergeExec; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::windows::WindowAggExec; use datafusion::physical_plan::ExecutionPlan; use log::info; @@ -101,12 +101,12 @@ impl DistributedPlanner { // remove Repartition rule because that isn't supported yet let rules: Vec> = vec![ Arc::new(CoalesceBatches::new()), - Arc::new(AddMergeExec::new()), + Arc::new(AddCoalescePartitionsExec::new()), ]; let config = ExecutionConfig::new().with_physical_optimizer_rules(rules); let ctx = ExecutionContext::with_config(config); Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages)) - } else if let Some(merge) = execution_plan.as_any().downcast_ref::() { + } else if let Some(merge) = execution_plan.as_any().downcast_ref::() { let query_stage = create_query_stage( job_id, self.next_stage_id(), @@ -245,7 +245,7 @@ mod test { use datafusion::physical_plan::hash_aggregate::HashAggregateExec; use datafusion::physical_plan::sort::SortExec; use datafusion::physical_plan::{displayable, ExecutionPlan}; - use datafusion::physical_plan::{merge::MergeExec, projection::ProjectionExec}; + use datafusion::physical_plan::{coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec}; use std::convert::TryInto; use std::sync::Arc; use uuid::Uuid; @@ -284,7 +284,7 @@ mod test { HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"] CsvExec: testdata/lineitem; partitions=2 QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2 - MergeExec + CoalescePartitionsExec UnresolvedShuffleExec: stages=[1] QueryStageExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3 SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext @@ -309,7 +309,7 @@ mod test { assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]); let merge_exec = stages[1].children()[0].clone(); - let merge_exec = downcast_exec!(merge_exec, MergeExec); + let merge_exec = downcast_exec!(merge_exec, CoalescePartitionsExec); let unresolved_shuffle = merge_exec.children()[0].clone(); let unresolved_shuffle = diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs index 311f9a7a3de0..becb95f961ac 100644 --- a/ballista/rust/scheduler/src/test_utils.rs +++ b/ballista/rust/scheduler/src/test_utils.rs @@ -22,7 +22,7 @@ use ballista_core::error::Result; use datafusion::arrow::datatypes::{DataType, Field, Schema}; use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches; -use datafusion::physical_optimizer::merge_exec::AddMergeExec; +use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec; use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::csv::CsvReadOptions; @@ -33,7 +33,7 @@ pub const TPCH_TABLES: &[&str] = &[ pub fn datafusion_test_context(path: &str) -> Result { // remove Repartition rule because that isn't supported yet let rules: Vec> = vec![ - Arc::new(AddMergeExec::new()), + Arc::new(AddCoalescePartitionsExec::new()), Arc::new(CoalesceBatches::new()), ]; let config = ExecutionConfig::new() diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 8ce408de86a5..17625c92c696 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -61,7 +61,7 @@ use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::projection_push_down::ProjectionPushDown; use crate::optimizer::simplify_expressions::SimplifyExpressions; use crate::physical_optimizer::coalesce_batches::CoalesceBatches; -use crate::physical_optimizer::merge_exec::AddMergeExec; +use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec; use crate::physical_optimizer::repartition::Repartition; use crate::physical_plan::csv::CsvReadOptions; @@ -643,7 +643,7 @@ impl Default for ExecutionConfig { physical_optimizers: vec![ Arc::new(CoalesceBatches::new()), Arc::new(Repartition::new()), - Arc::new(AddMergeExec::new()), + Arc::new(AddCoalescePartitionsExec::new()), ], query_planner: Arc::new(DefaultQueryPlanner {}), default_catalog: "datafusion".to_owned(), diff --git a/datafusion/src/physical_optimizer/merge_exec.rs b/datafusion/src/physical_optimizer/merge_exec.rs index 877c0be00e1b..0127313bb1eb 100644 --- a/datafusion/src/physical_optimizer/merge_exec.rs +++ b/datafusion/src/physical_optimizer/merge_exec.rs @@ -15,27 +15,27 @@ // specific language governing permissions and limitations // under the License. -//! AddMergeExec adds MergeExec to merge plans -//! with more partitions into one partition when the node -//! needs a single partition +//! AddCoalescePartitionsExec adds CoalescePartitionsExec to plans +//! with more than one partition, to coalesce them into one partition +//! when the node needs a single partition use super::optimizer::PhysicalOptimizerRule; use crate::{ error::Result, - physical_plan::{merge::MergeExec, Distribution}, + physical_plan::{coalesce_partitions::CoalescePartitionsExec, Distribution}, }; use std::sync::Arc; -/// Introduces MergeExec -pub struct AddMergeExec {} +/// Introduces CoalescePartitionsExec +pub struct AddCoalescePartitionsExec {} -impl AddMergeExec { +impl AddCoalescePartitionsExec { #[allow(missing_docs)] pub fn new() -> Self { Self {} } } -impl PhysicalOptimizerRule for AddMergeExec { +impl PhysicalOptimizerRule for AddCoalescePartitionsExec { fn optimize( &self, plan: Arc, @@ -60,7 +60,7 @@ impl PhysicalOptimizerRule for AddMergeExec { if child.output_partitioning().partition_count() == 1 { child.clone() } else { - Arc::new(MergeExec::new(child.clone())) + Arc::new(CoalescePartitionsExec::new(child.clone())) } }) .collect(), diff --git a/datafusion/src/physical_plan/merge.rs b/datafusion/src/physical_plan/coalesce_partitions.rs similarity index 95% rename from datafusion/src/physical_plan/merge.rs rename to datafusion/src/physical_plan/coalesce_partitions.rs index a25f5c7909fd..94ff230b8125 100644 --- a/datafusion/src/physical_plan/merge.rs +++ b/datafusion/src/physical_plan/coalesce_partitions.rs @@ -40,15 +40,15 @@ use pin_project_lite::pin_project; /// Merge execution plan executes partitions in parallel and combines them into a single /// partition. No guarantees are made about the order of the resulting partition. #[derive(Debug)] -pub struct MergeExec { +pub struct CoalescePartitionsExec { /// Input execution plan input: Arc, } -impl MergeExec { +impl CoalescePartitionsExec { /// Create a new MergeExec pub fn new(input: Arc) -> Self { - MergeExec { input } + CoalescePartitionsExec { input } } /// Input execution plan @@ -58,7 +58,7 @@ impl MergeExec { } #[async_trait] -impl ExecutionPlan for MergeExec { +impl ExecutionPlan for CoalescePartitionsExec { /// Return a reference to Any that can be used for downcasting fn as_any(&self) -> &dyn Any { self @@ -82,7 +82,7 @@ impl ExecutionPlan for MergeExec { children: Vec>, ) -> Result> { match children.len() { - 1 => Ok(Arc::new(MergeExec::new(children[0].clone()))), + 1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))), _ => Err(DataFusionError::Internal( "MergeExec wrong number of children".to_string(), )), @@ -194,7 +194,7 @@ mod tests { // input should have 4 partitions assert_eq!(csv.output_partitioning().partition_count(), num_partitions); - let merge = MergeExec::new(Arc::new(csv)); + let merge = CoalescePartitionsExec::new(Arc::new(csv)); // output of MergeExec should have a single partition assert_eq!(merge.output_partitioning().partition_count(), 1); diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index f6f5da4cf8db..17a48a10407c 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -27,7 +27,7 @@ use arrow::record_batch::RecordBatch; use futures::{Stream, TryStreamExt}; -use super::{hash_utils::check_join_is_valid, merge::MergeExec}; +use super::{hash_utils::check_join_is_valid, coalesce_partitions::CoalescePartitionsExec}; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, @@ -144,7 +144,7 @@ impl ExecutionPlan for CrossJoinExec { let start = Instant::now(); // merge all left parts into a single stream - let merge = MergeExec::new(self.left.clone()); + let merge = CoalescePartitionsExec::new(self.left.clone()); let stream = merge.execute(0).await?; // Load all batches and count the rows diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 250ba2b08306..e157243dd8c2 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -1230,7 +1230,7 @@ mod tests { use crate::physical_plan::expressions::{col, Avg}; use crate::{assert_batches_sorted_eq, physical_plan::common}; - use crate::physical_plan::merge::MergeExec; + use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; /// some mock data to aggregates fn some_data() -> (Arc, Vec) { @@ -1298,7 +1298,7 @@ mod tests { ]; assert_batches_sorted_eq!(expected, &result); - let merge = Arc::new(MergeExec::new(partial_aggregate)); + let merge = Arc::new(CoalescePartitionsExec::new(partial_aggregate)); let final_group: Vec> = (0..groups.len()) .map(|i| col(&groups[i].1, &input_schema)) diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index ad356079387a..0fddce939330 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -55,7 +55,7 @@ use arrow::array::{ use super::expressions::Column; use super::{ hash_utils::{build_join_schema, check_join_is_valid, JoinOn, JoinType}, - merge::MergeExec, + coalesce_partitions::CoalescePartitionsExec, }; use crate::error::{DataFusionError, Result}; @@ -260,7 +260,7 @@ impl ExecutionPlan for HashJoinExec { let start = Instant::now(); // merge all left parts into a single stream - let merge = MergeExec::new(self.left.clone()); + let merge = CoalescePartitionsExec::new(self.left.clone()); let stream = merge.execute(0).await?; // This operation performs 2 steps at once: diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs index c56dbe141b2d..e5660b4df121 100644 --- a/datafusion/src/physical_plan/limit.rs +++ b/datafusion/src/physical_plan/limit.rs @@ -297,7 +297,7 @@ mod tests { use super::*; use crate::physical_plan::common; use crate::physical_plan::csv::{CsvExec, CsvReadOptions}; - use crate::physical_plan::merge::MergeExec; + use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::test; #[tokio::test] @@ -319,7 +319,7 @@ mod tests { // input should have 4 partitions assert_eq!(csv.output_partitioning().partition_count(), num_partitions); - let limit = GlobalLimitExec::new(Arc::new(MergeExec::new(Arc::new(csv))), 7); + let limit = GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7); // the result should contain 4 batches (one per input partition) let iter = limit.execute(0).await?; diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 7f9f7eace835..14ad92772ff9 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -17,7 +17,7 @@ //! Traits for physical query plan, supporting parallel execution for partitioned relations. -use self::{display::DisplayableExecutionPlan, merge::MergeExec}; +use self::{display::DisplayableExecutionPlan, coalesce_partitions::CoalescePartitionsExec}; use crate::execution::context::ExecutionContextState; use crate::logical_plan::LogicalPlan; use crate::physical_plan::expressions::PhysicalSortExpr; @@ -315,7 +315,7 @@ pub async fn collect(plan: Arc) -> Result> { } _ => { // merge into a single partition - let plan = MergeExec::new(plan.clone()); + let plan = CoalescePartitionsExec::new(plan.clone()); // MergeExec must produce a single partition assert_eq!(1, plan.output_partitioning().partition_count()); common::collect(plan.execute(0).await?).await @@ -613,7 +613,7 @@ pub mod json; pub mod limit; pub mod math_expressions; pub mod memory; -pub mod merge; +pub mod coalesce_partitions; pub mod parquet; pub mod planner; pub mod projection; diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs index 365097822cc7..0c64820c9b03 100644 --- a/datafusion/src/physical_plan/sort.rs +++ b/datafusion/src/physical_plan/sort.rs @@ -317,7 +317,7 @@ mod tests { use super::*; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::merge::MergeExec; + use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::{ collect, csv::{CsvExec, CsvReadOptions}, @@ -357,7 +357,7 @@ mod tests { options: SortOptions::default(), }, ], - Arc::new(MergeExec::new(Arc::new(csv))), + Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), )?); let result: Vec = collect(sort_exec).await?; diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index b8ca97cc5974..f8aa5f34dd05 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -545,7 +545,7 @@ mod tests { use crate::physical_plan::csv::CsvExec; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::merge::MergeExec; + use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::sort::SortExec; use crate::physical_plan::{collect, common}; use crate::test; @@ -639,7 +639,7 @@ mod tests { src: Arc, sort: Vec, ) -> RecordBatch { - let merge = Arc::new(MergeExec::new(src)); + let merge = Arc::new(CoalescePartitionsExec::new(src)); let sort_exec = Arc::new(SortExec::try_new(sort, merge).unwrap()); let mut result = collect(sort_exec).await.unwrap(); assert_eq!(result.len(), 1); From b3551a75848b4d08d66138e3cbc09e22eeba8a4c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 27 Jun 2021 10:34:23 -0600 Subject: [PATCH 2/2] format --- .../rust/core/src/serde/physical_plan/from_proto.rs | 2 +- .../rust/core/src/serde/physical_plan/to_proto.rs | 2 +- ballista/rust/core/src/utils.rs | 8 ++++++-- ballista/rust/scheduler/src/planner.rs | 11 ++++++++--- datafusion/src/physical_plan/cross_join.rs | 4 +++- datafusion/src/physical_plan/hash_join.rs | 2 +- datafusion/src/physical_plan/limit.rs | 5 +++-- datafusion/src/physical_plan/mod.rs | 6 ++++-- datafusion/src/physical_plan/sort.rs | 2 +- datafusion/src/physical_plan/sort_preserving_merge.rs | 2 +- 10 files changed, 29 insertions(+), 15 deletions(-) diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index cc74181e8b2e..83cbdb4ccec4 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -37,9 +37,9 @@ use datafusion::execution::context::{ }; use datafusion::logical_plan::{window_frames::WindowFrame, DFSchema, Expr}; use datafusion::physical_plan::aggregates::{create_aggregate_expr, AggregateFunction}; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::PartitionMode; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::planner::DefaultPhysicalPlanner; use datafusion::physical_plan::window_functions::{ BuiltInWindowFunction, WindowFunction, diff --git a/ballista/rust/core/src/serde/physical_plan/to_proto.rs b/ballista/rust/core/src/serde/physical_plan/to_proto.rs index 24405bbbe799..306abc166ad5 100644 --- a/ballista/rust/core/src/serde/physical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/to_proto.rs @@ -59,8 +59,8 @@ use crate::execution_plans::{ShuffleReaderExec, UnresolvedShuffleExec}; use crate::serde::protobuf::repartition_exec_node::PartitionMethod; use crate::serde::scheduler::PartitionLocation; use crate::serde::{protobuf, BallistaError}; -use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::functions::{BuiltinScalarFunction, ScalarFunctionExpr}; use datafusion::physical_plan::repartition::RepartitionExec; impl TryInto for Arc { diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index 28a12430ffd9..26bdb00f34fb 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -43,12 +43,12 @@ use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches; use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec; use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::csv::CsvExec; use datafusion::physical_plan::expressions::{BinaryExpr, Column, Literal}; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::hash_aggregate::HashAggregateExec; use datafusion::physical_plan::hash_join::HashJoinExec; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::parquet::ParquetExec; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::sort::SortExec; @@ -177,7 +177,11 @@ fn build_exec_plan_diagram( .is_some() { "CoalesceBatchesExec" - } else if plan.as_any().downcast_ref::().is_some() { + } else if plan + .as_any() + .downcast_ref::() + .is_some() + { "CoalescePartitionsExec" } else { println!("Unknown: {:?}", plan); diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 7e9432eb9bdd..32fc9a9e25eb 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -32,9 +32,9 @@ use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches; use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec; use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::HashJoinExec; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::windows::WindowAggExec; use datafusion::physical_plan::ExecutionPlan; use log::info; @@ -106,7 +106,10 @@ impl DistributedPlanner { let config = ExecutionConfig::new().with_physical_optimizer_rules(rules); let ctx = ExecutionContext::with_config(config); Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages)) - } else if let Some(merge) = execution_plan.as_any().downcast_ref::() { + } else if let Some(merge) = execution_plan + .as_any() + .downcast_ref::() + { let query_stage = create_query_stage( job_id, self.next_stage_id(), @@ -244,8 +247,10 @@ mod test { use ballista_core::serde::protobuf; use datafusion::physical_plan::hash_aggregate::HashAggregateExec; use datafusion::physical_plan::sort::SortExec; + use datafusion::physical_plan::{ + coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec, + }; use datafusion::physical_plan::{displayable, ExecutionPlan}; - use datafusion::physical_plan::{coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec}; use std::convert::TryInto; use std::sync::Arc; use uuid::Uuid; diff --git a/datafusion/src/physical_plan/cross_join.rs b/datafusion/src/physical_plan/cross_join.rs index 17a48a10407c..98ad3440aa4a 100644 --- a/datafusion/src/physical_plan/cross_join.rs +++ b/datafusion/src/physical_plan/cross_join.rs @@ -27,7 +27,9 @@ use arrow::record_batch::RecordBatch; use futures::{Stream, TryStreamExt}; -use super::{hash_utils::check_join_is_valid, coalesce_partitions::CoalescePartitionsExec}; +use super::{ + coalesce_partitions::CoalescePartitionsExec, hash_utils::check_join_is_valid, +}; use crate::{ error::{DataFusionError, Result}, scalar::ScalarValue, diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 0fddce939330..eb5ceaf0d949 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -54,8 +54,8 @@ use arrow::array::{ use super::expressions::Column; use super::{ - hash_utils::{build_join_schema, check_join_is_valid, JoinOn, JoinType}, coalesce_partitions::CoalescePartitionsExec, + hash_utils::{build_join_schema, check_join_is_valid, JoinOn, JoinType}, }; use crate::error::{DataFusionError, Result}; diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs index e5660b4df121..361e26e5e94e 100644 --- a/datafusion/src/physical_plan/limit.rs +++ b/datafusion/src/physical_plan/limit.rs @@ -295,9 +295,9 @@ mod tests { use common::collect; use super::*; + use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::common; use crate::physical_plan::csv::{CsvExec, CsvReadOptions}; - use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::test; #[tokio::test] @@ -319,7 +319,8 @@ mod tests { // input should have 4 partitions assert_eq!(csv.output_partitioning().partition_count(), num_partitions); - let limit = GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7); + let limit = + GlobalLimitExec::new(Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), 7); // the result should contain 4 batches (one per input partition) let iter = limit.execute(0).await?; diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index 14ad92772ff9..2122751abb60 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -17,7 +17,9 @@ //! Traits for physical query plan, supporting parallel execution for partitioned relations. -use self::{display::DisplayableExecutionPlan, coalesce_partitions::CoalescePartitionsExec}; +use self::{ + coalesce_partitions::CoalescePartitionsExec, display::DisplayableExecutionPlan, +}; use crate::execution::context::ExecutionContextState; use crate::logical_plan::LogicalPlan; use crate::physical_plan::expressions::PhysicalSortExpr; @@ -592,6 +594,7 @@ pub trait Accumulator: Send + Sync + Debug { pub mod aggregates; pub mod array_expressions; pub mod coalesce_batches; +pub mod coalesce_partitions; pub mod common; pub mod cross_join; #[cfg(feature = "crypto_expressions")] @@ -613,7 +616,6 @@ pub mod json; pub mod limit; pub mod math_expressions; pub mod memory; -pub mod coalesce_partitions; pub mod parquet; pub mod planner; pub mod projection; diff --git a/datafusion/src/physical_plan/sort.rs b/datafusion/src/physical_plan/sort.rs index 0c64820c9b03..faaa10d10936 100644 --- a/datafusion/src/physical_plan/sort.rs +++ b/datafusion/src/physical_plan/sort.rs @@ -315,9 +315,9 @@ impl RecordBatchStream for SortStream { #[cfg(test)] mod tests { use super::*; + use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::{ collect, csv::{CsvExec, CsvReadOptions}, diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index f8aa5f34dd05..316f0509960d 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -542,10 +542,10 @@ mod tests { use crate::arrow::array::{Int32Array, StringArray, TimestampNanosecondArray}; use crate::assert_batches_eq; use crate::datasource::CsvReadOptions; + use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::csv::CsvExec; use crate::physical_plan::expressions::col; use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::sort::SortExec; use crate::physical_plan::{collect, common}; use crate::test;