From 6f4cfd856f33892e3dabf6f87cdbe894b0930887 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Wed, 30 Jun 2021 06:30:54 -0600
Subject: [PATCH 1/2] Refactor Ballista planner to support RepartitionExec

---
 ballista/rust/core/src/utils.rs           |  11 +--
 ballista/rust/scheduler/src/planner.rs    | 106 ++++++++--------
 ballista/rust/scheduler/src/test_utils.rs |  18 +---
 3 files changed, 41 insertions(+), 94 deletions(-)

diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index d043763dc6f1..8a510f480876 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -227,16 +227,7 @@ fn build_exec_plan_diagram(
 
 /// Create a DataFusion context that is compatible with Ballista
 pub fn create_datafusion_context() -> ExecutionContext {
-    // remove Repartition rule because that isn't supported yet
-    let rules: Vec<Arc<dyn PhysicalOptimizerRule>> = vec![
-        Arc::new(CoalesceBatches::new()),
-        Arc::new(AddCoalescePartitionsExec::new()),
-    ];
-    let config = ExecutionConfig::new()
-        .with_concurrency(1)
-        .with_repartition_joins(false)
-        .with_repartition_aggregations(false)
-        .with_physical_optimizer_rules(rules);
+    let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is a hack to enable partitioned joins
     ExecutionContext::with_config(config)
 }
 
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 70d90a4a07d0..258530c2040a 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -28,15 +28,11 @@ use ballista_core::{
     execution_plans::{ShuffleReaderExec, ShuffleWriterExec, UnresolvedShuffleExec},
     serde::scheduler::PartitionLocation,
 };
-use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
-use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
-use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::execution::context::ExecutionContext;
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
-use datafusion::physical_plan::hash_join::HashJoinExec;
+use datafusion::physical_plan::repartition::RepartitionExec;
 use datafusion::physical_plan::windows::WindowAggExec;
-use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_plan::{ExecutionPlan, Partitioning};
 use log::info;
 
 type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -71,13 +67,18 @@ impl DistributedPlanner {
         info!("planning query stages");
         let (new_plan, mut stages) =
             self.plan_query_stages_internal(job_id, execution_plan)?;
-        stages.push(create_query_stage(job_id, self.next_stage_id(), new_plan)?);
+        stages.push(create_shuffle_writer(
+            job_id,
+            self.next_stage_id(),
+            new_plan,
+            None,
+        )?);
         Ok(stages)
     }
 
     /// Returns a potentially modified version of the input execution_plan along with the resulting query stages.
     /// This function is needed because the input execution_plan might need to be modified, but it might not hold a
-    /// compelte query stage (its parent might also belong to the same stage)
+    /// complete query stage (its parent might also belong to the same stage)
     fn plan_query_stages_internal(
         &mut self,
         job_id: &str,
@@ -98,22 +99,17 @@ impl DistributedPlanner {
         }
 
         if let Some(adapter) = execution_plan.as_any().downcast_ref::<DfTableAdapter>() {
-            // remove Repartition rule because that isn't supported yet
-            let rules: Vec<Arc<dyn PhysicalOptimizerRule>> = vec![
-                Arc::new(CoalesceBatches::new()),
-                Arc::new(AddCoalescePartitionsExec::new()),
-            ];
-            let config = ExecutionConfig::new().with_physical_optimizer_rules(rules);
-            let ctx = ExecutionContext::with_config(config);
+            let ctx = ExecutionContext::new();
             Ok((ctx.create_physical_plan(&adapter.logical_plan)?, stages))
-        } else if let Some(merge) = execution_plan
+        } else if let Some(coalesce) = execution_plan
             .as_any()
             .downcast_ref::<CoalescePartitionsExec>()
         {
-            let query_stage = create_query_stage(
+            let query_stage = create_shuffle_writer(
                 job_id,
                 self.next_stage_id(),
-                merge.children()[0].clone(),
+                coalesce.children()[0].clone(),
+                None,
             )?;
             let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
                 vec![query_stage.stage_id()],
@@ -121,35 +117,26 @@ impl DistributedPlanner {
                 query_stage.output_partitioning().partition_count(),
             ));
             stages.push(query_stage);
-            Ok((merge.with_new_children(vec![unresolved_shuffle])?, stages))
+            Ok((
+                coalesce.with_new_children(vec![unresolved_shuffle])?,
+                stages,
+            ))
-        } else if let Some(agg) =
-            execution_plan.as_any().downcast_ref::<HashAggregateExec>()
+        } else if let Some(repart) =
+            execution_plan.as_any().downcast_ref::<RepartitionExec>()
         {
-            //TODO should insert query stages in more generic way based on partitioning metadata
-            // and not specifically for this operator
-            match agg.mode() {
-                AggregateMode::Final | AggregateMode::FinalPartitioned => {
-                    let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
-                    for child in &children {
-                        let new_stage = create_query_stage(
-                            job_id,
-                            self.next_stage_id(),
-                            child.clone(),
-                        )?;
-                        new_children.push(Arc::new(UnresolvedShuffleExec::new(
-                            vec![new_stage.stage_id()],
-                            new_stage.schema().clone(),
-                            new_stage.output_partitioning().partition_count(),
-                        )));
-                        stages.push(new_stage);
-                    }
-                    Ok((agg.with_new_children(new_children)?, stages))
-                }
-                AggregateMode::Partial => Ok((agg.with_new_children(children)?, stages)),
-            }
-        } else if let Some(join) = execution_plan.as_any().downcast_ref::<HashJoinExec>()
-        {
-            Ok((join.with_new_children(children)?, stages))
+            let query_stage = create_shuffle_writer(
+                job_id,
+                self.next_stage_id(),
+                repart.children()[0].clone(),
+                Some(repart.partitioning().to_owned()),
+            )?;
+            let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
+                vec![query_stage.stage_id()],
+                query_stage.schema(),
+                query_stage.output_partitioning().partition_count(),
+            ));
+            stages.push(query_stage);
+            Ok((unresolved_shuffle, stages))
         } else if let Some(window) =
             execution_plan.as_any().downcast_ref::<WindowAggExec>()
         {
@@ -158,25 +145,7 @@ impl DistributedPlanner {
                 window
             )))
         } else {
-            // TODO check for compatible partitioning schema, not just count
-            if execution_plan.output_partitioning().partition_count()
-                != children[0].output_partitioning().partition_count()
-            {
-                let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
-                for child in &children {
-                    let new_stage =
-                        create_query_stage(job_id, self.next_stage_id(), child.clone())?;
-                    new_children.push(Arc::new(UnresolvedShuffleExec::new(
-                        vec![new_stage.stage_id()],
-                        new_stage.schema().clone(),
-                        new_stage.output_partitioning().partition_count(),
-                    )));
-                    stages.push(new_stage);
-                }
-                Ok((execution_plan.with_new_children(new_children)?, stages))
-            } else {
-                Ok((execution_plan.with_new_children(children)?, stages))
-            }
+            Ok((execution_plan.with_new_children(children)?, stages))
         }
     }
 
@@ -224,17 +193,18 @@ pub fn remove_unresolved_shuffles(
     Ok(stage.with_new_children(new_children)?)
 }
 
-fn create_query_stage(
+fn create_shuffle_writer(
     job_id: &str,
     stage_id: usize,
     plan: Arc<dyn ExecutionPlan>,
+    partitioning: Option<Partitioning>,
 ) -> Result<Arc<ShuffleWriterExec>> {
     Ok(Arc::new(ShuffleWriterExec::try_new(
         job_id.to_owned(),
        stage_id,
         plan,
         "".to_owned(), // executor will decide on the work_dir path
-        None,
+        partitioning,
     )?))
 }
 
diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index becb95f961ac..2a35b7c17816 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -15,15 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
-
 use ballista_core::error::Result;
 
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
-use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
-use datafusion::physical_optimizer::coalesce_batches::CoalesceBatches;
-use datafusion::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
-use datafusion::physical_optimizer::optimizer::PhysicalOptimizerRule;
+use datafusion::execution::context::ExecutionContext;
 use datafusion::physical_plan::csv::CsvReadOptions;
 
 pub const TPCH_TABLES: &[&str] = &[
@@ -31,16 +26,7 @@ pub const TPCH_TABLES: &[&str] = &[
 ];
 
 pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
-    // remove Repartition rule because that isn't supported yet
-    let rules: Vec<Arc<dyn PhysicalOptimizerRule>> = vec![
-        Arc::new(AddCoalescePartitionsExec::new()),
-        Arc::new(CoalesceBatches::new()),
-    ];
-    let config = ExecutionConfig::new()
-        .with_physical_optimizer_rules(rules)
-        .with_repartition_aggregations(false);
-    let mut ctx = ExecutionContext::with_config(config);
-
+    let mut ctx = ExecutionContext::new();
     for table in TPCH_TABLES {
         let schema = get_tpch_schema(table);
         let options = CsvReadOptions::new()

From b54a3518175829bba4283525fd8d1c4031b7ac99 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 3 Jul 2021 09:26:27 -0600
Subject: [PATCH 2/2] Improve tests and replace MergeExec with CoalescePartitionsExec in query plan output

---
 .../src/execution_plans/shuffle_writer.rs     | 20 ++++-
 .../src/execution_plans/unresolved_shuffle.rs | 15 +++-
 ballista/rust/scheduler/src/planner.rs        | 83 +++++++++++++------
 ballista/rust/scheduler/src/test_utils.rs     |  5 +-
 datafusion/src/lib.rs                         |  2 +-
 .../src/physical_plan/coalesce_partitions.rs  | 14 ++--
 .../src/physical_plan/hash_aggregate.rs       |  2 +-
 datafusion/src/physical_plan/limit.rs         |  2 +-
 datafusion/src/physical_plan/mod.rs           |  2 +-
 datafusion/src/physical_plan/planner.rs       |  2 +-
 datafusion/tests/sql.rs                       |  2 +-
 11 files changed, 106 insertions(+), 43 deletions(-)

diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 2d8d78324d2d..7fffaba13217 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -42,7 +42,9 @@ use datafusion::arrow::ipc::writer::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::hash_join::create_hashes;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning, RecordBatchStream};
+use datafusion::physical_plan::{
+    DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
+};
 use futures::StreamExt;
 use log::info;
 use std::fs::File;
@@ -307,6 +309,22 @@ impl ExecutionPlan for ShuffleWriterExec {
             )),
         }
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "ShuffleWriterExec: {:?}",
+                    self.shuffle_output_partitioning
+                )
+            }
+        }
+    }
 }
 
 fn result_schema() -> SchemaRef {

diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 9c53bc7a1d43..49b4f7a0992c 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -23,12 +23,13 @@ use crate::serde::scheduler::PartitionLocation;
 
 use async_trait::async_trait;
 use datafusion::arrow::datatypes::SchemaRef;
-use datafusion::physical_plan::{ExecutionPlan, Partitioning};
+use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
 use datafusion::{
     error::{DataFusionError, Result},
     physical_plan::RecordBatchStream,
 };
 use log::info;
+use std::fmt::Formatter;
 
 /// UnresolvedShuffleExec represents a dependency on the results of several ShuffleWriterExec nodes which haven't been computed yet.
 ///
@@ -97,4 +98,16 @@ impl ExecutionPlan for UnresolvedShuffleExec {
             "Ballista UnresolvedShuffleExec does not support execution".to_owned(),
         ))
     }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "UnresolvedShuffleExec")
+            }
+        }
+    }
 }

diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 258530c2040a..319526142bf9 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -215,7 +215,7 @@ mod test {
     use ballista_core::error::BallistaError;
     use ballista_core::execution_plans::UnresolvedShuffleExec;
     use ballista_core::serde::protobuf;
-    use datafusion::physical_plan::hash_aggregate::HashAggregateExec;
+    use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
     use datafusion::physical_plan::sort::SortExec;
     use datafusion::physical_plan::{
         coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec,
@@ -232,7 +232,7 @@ mod test {
     }
 
     #[test]
-    fn test() -> Result<(), BallistaError> {
+    fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
         let mut ctx = datafusion_test_context("testdata")?;
 
         // simplified form of TPC-H query 1
@@ -255,41 +255,72 @@ mod test {
         }
 
         /* Expected result:
-        ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=1
-         HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
-          CsvExec: testdata/lineitem; partitions=2
-        ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=2
-         CoalescePartitionsExec
-          UnresolvedShuffleExec: stages=[1]
-        ShuffleWriterExec: job=f011432e-e424-4016-915d-e3d8b84f6dbd, stage=3
-         SortExec { input: ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_ext
-          ProjectionExec { expr: [(Column { name: "l_returnflag" }, "l_returnflag"), (Column { name: "SUM(l_extendedprice Multip
-           HashAggregateExec: groupBy=["l_returnflag"], aggrExpr=["SUM(l_extendedprice Multiply Int64(1)) [\"l_extendedprice * CAST(1 AS Float64)\"]"]
-            UnresolvedShuffleExec: stages=[2]
+
+        ShuffleWriterExec: Some(Hash([Column { name: "l_returnflag", index: 0 }], 2))
+          HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+            CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false
+
+        ShuffleWriterExec: None
+          ProjectionExec: expr=[l_returnflag@0 as l_returnflag, SUM(lineitem.l_extendedprice Multiply Int64(1))@1 as sum_disc_price]
+            HashAggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+              CoalesceBatchesExec: target_batch_size=4096
+                RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }], 2)
+                  HashAggregateExec: mode=Partial, gby=[l_returnflag@1 as l_returnflag], aggr=[SUM(l_extendedprice Multiply Int64(1))]
+                    CsvExec: source=Path(testdata/lineitem: [testdata/lineitem/partition0.tbl,testdata/lineitem/partition1.tbl]), has_header=false
+
+        ShuffleWriterExec: None
+          SortExec: [l_returnflag@0 ASC]
+            CoalescePartitionsExec
+              UnresolvedShuffleExec
         */
 
-        let sort = stages[2].children()[0].clone();
-        let sort = downcast_exec!(sort, SortExec);
+        assert_eq!(3, stages.len());
 
-        let projection = sort.children()[0].clone();
-        println!("{:?}", projection);
-        let projection = downcast_exec!(projection, ProjectionExec);
+        // verify stage 0
+        let stage0 = stages[0].children()[0].clone();
+        let partial_hash = downcast_exec!(stage0, HashAggregateExec);
+        assert!(*partial_hash.mode() == AggregateMode::Partial);
 
+        // verify stage 1
+        let stage1 = stages[1].children()[0].clone();
+        let projection = downcast_exec!(stage1, ProjectionExec);
         let final_hash = projection.children()[0].clone();
         let final_hash = downcast_exec!(final_hash, HashAggregateExec);
-
-        let unresolved_shuffle = final_hash.children()[0].clone();
+        assert!(*final_hash.mode() == AggregateMode::FinalPartitioned);
+
+        // verify stage 2
+        let stage2 = stages[2].children()[0].clone();
+        let sort = downcast_exec!(stage2, SortExec);
+        let coalesce_partitions = sort.children()[0].clone();
+        let coalesce_partitions =
+            downcast_exec!(coalesce_partitions, CoalescePartitionsExec);
+        let unresolved_shuffle = coalesce_partitions.children()[0].clone();
         let unresolved_shuffle =
             downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
         assert_eq!(unresolved_shuffle.query_stage_ids, vec![2]);
 
-        let merge_exec = stages[1].children()[0].clone();
-        let merge_exec = downcast_exec!(merge_exec, CoalescePartitionsExec);
+        Ok(())
+    }
 
-        let unresolved_shuffle = merge_exec.children()[0].clone();
-        let unresolved_shuffle =
-            downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
-        assert_eq!(unresolved_shuffle.query_stage_ids, vec![1]);
+    #[test]
+    fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> {
+        let mut ctx = datafusion_test_context("testdata")?;
+
+        // simplified form of TPC-H query 1
+        let df = ctx.sql(
+            "select l_returnflag, sum(l_extendedprice * 1) as sum_disc_price
+            from lineitem
+            group by l_returnflag
+            order by l_returnflag",
+        )?;
+
+        let plan = df.to_logical_plan();
+        let plan = ctx.optimize(&plan)?;
+        let plan = ctx.create_physical_plan(&plan)?;
+
+        let mut planner = DistributedPlanner::new();
+        let job_uuid = Uuid::new_v4();
+        let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?;
 
         let partial_hash = stages[0].children()[0].clone();
         let partial_hash_serde = roundtrip_operator(partial_hash.clone())?;

diff --git a/ballista/rust/scheduler/src/test_utils.rs b/ballista/rust/scheduler/src/test_utils.rs
index 2a35b7c17816..aa1e2b2575aa 100644
--- a/ballista/rust/scheduler/src/test_utils.rs
+++ b/ballista/rust/scheduler/src/test_utils.rs
@@ -18,7 +18,7 @@
 use ballista_core::error::Result;
 
 use datafusion::arrow::datatypes::{DataType, Field, Schema};
-use datafusion::execution::context::ExecutionContext;
+use datafusion::execution::context::{ExecutionConfig, ExecutionContext};
 use datafusion::physical_plan::csv::CsvReadOptions;
 
 pub const TPCH_TABLES: &[&str] = &[
@@ -26,7 +26,8 @@ pub const TPCH_TABLES: &[&str] = &[
 ];
 
 pub fn datafusion_test_context(path: &str) -> Result<ExecutionContext> {
-    let mut ctx = ExecutionContext::new();
+    let config = ExecutionConfig::new().with_concurrency(2); // TODO: this is a hack to enable partitioned joins
+    let mut ctx = ExecutionContext::with_config(config);
     for table in TPCH_TABLES {
         let schema = get_tpch_schema(table);
         let options = CsvReadOptions::new()

diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs
index 64cc0a1349a2..5f07c171ad7c 100644
--- a/datafusion/src/lib.rs
+++ b/datafusion/src/lib.rs
@@ -167,7 +167,7 @@
 //! * Filter: [`FilterExec`](physical_plan::filter::FilterExec)
 //! * Hash and Grouped aggregations: [`HashAggregateExec`](physical_plan::hash_aggregate::HashAggregateExec)
 //! * Sort: [`SortExec`](physical_plan::sort::SortExec)
-//! * Merge (partitions): [`MergeExec`](physical_plan::merge::MergeExec)
+//! * Coalesce partitions: [`CoalescePartitionsExec`](physical_plan::coalesce_partitions::CoalescePartitionsExec)
 //! * Limit: [`LocalLimitExec`](physical_plan::limit::LocalLimitExec) and [`GlobalLimitExec`](physical_plan::limit::GlobalLimitExec)
 //! * Scan a CSV: [`CsvExec`](physical_plan::csv::CsvExec)
 //! * Scan a Parquet: [`ParquetExec`](physical_plan::parquet::ParquetExec)

diff --git a/datafusion/src/physical_plan/coalesce_partitions.rs b/datafusion/src/physical_plan/coalesce_partitions.rs
index 94ff230b8125..4c040651cd0f 100644
--- a/datafusion/src/physical_plan/coalesce_partitions.rs
+++ b/datafusion/src/physical_plan/coalesce_partitions.rs
@@ -46,7 +46,7 @@ pub struct CoalescePartitionsExec {
 }
 
 impl CoalescePartitionsExec {
-    /// Create a new MergeExec
+    /// Create a new CoalescePartitionsExec
     pub fn new(input: Arc<dyn ExecutionPlan>) -> Self {
         CoalescePartitionsExec { input }
     }
@@ -84,16 +84,16 @@ impl ExecutionPlan for CoalescePartitionsExec {
         match children.len() {
             1 => Ok(Arc::new(CoalescePartitionsExec::new(children[0].clone()))),
             _ => Err(DataFusionError::Internal(
-                "MergeExec wrong number of children".to_string(),
+                "CoalescePartitionsExec wrong number of children".to_string(),
             )),
         }
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        // MergeExec produces a single partition
+        // CoalescePartitionsExec produces a single partition
         if 0 != partition {
             return Err(DataFusionError::Internal(format!(
-                "MergeExec invalid partition {}",
+                "CoalescePartitionsExec invalid partition {}",
                 partition
             )));
         }
@@ -101,7 +101,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
         let input_partitions = self.input.output_partitioning().partition_count();
         match input_partitions {
             0 => Err(DataFusionError::Internal(
-                "MergeExec requires at least one input partition".to_owned(),
+                "CoalescePartitionsExec requires at least one input partition".to_owned(),
             )),
             1 => {
                 // bypass any threading if there is a single partition
@@ -135,7 +135,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
     ) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default => {
-                write!(f, "MergeExec")
+                write!(f, "CoalescePartitionsExec")
             }
         }
     }
@@ -196,7 +196,7 @@ mod tests {
 
         let merge = CoalescePartitionsExec::new(Arc::new(csv));
 
-        // output of MergeExec should have a single partition
+        // output of CoalescePartitionsExec should have a single partition
         assert_eq!(merge.output_partitioning().partition_count(), 1);
 
         // the result should contain 4 batches (one per input partition)

diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index e157243dd8c2..b4b7c224024d 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -74,7 +74,7 @@ use super::{
 };
 
 /// Hash aggregate modes
-#[derive(Debug, Copy, Clone)]
+#[derive(Debug, Copy, Clone, PartialEq, Eq)]
 pub enum AggregateMode {
     /// Partial aggregate that can be applied in parallel across input partitions
     Partial,

diff --git a/datafusion/src/physical_plan/limit.rs b/datafusion/src/physical_plan/limit.rs
index 361e26e5e94e..9f4744291c49 100644
--- a/datafusion/src/physical_plan/limit.rs
+++ b/datafusion/src/physical_plan/limit.rs
@@ -49,7 +49,7 @@ pub struct GlobalLimitExec {
 }
 
 impl GlobalLimitExec {
-    /// Create a new MergeExec
+    /// Create a new GlobalLimitExec
    pub fn new(input: Arc<dyn ExecutionPlan>, limit: usize) -> Self {
         GlobalLimitExec { input, limit }
     }

diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 307fff619478..a940cbe7963a 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -308,7 +308,7 @@ pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
         _ => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(plan.clone());
-            // MergeExec must produce a single partition
+            // CoalescePartitionsExec must produce a single partition
             assert_eq!(1, plan.output_partitioning().partition_count());
             common::collect(plan.execute(0).await?).await
         }

diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 5b43ec12bbf0..effdefcfabad 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -736,7 +736,7 @@ impl DefaultPhysicalPlanner {
                     input
                 } else {
                     // Apply a LocalLimitExec to each partition. The optimizer will also insert
-                    // a MergeExec between the GlobalLimitExec and LocalLimitExec
+                    // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec
                     Arc::new(LocalLimitExec::new(input, limit))
                 };

diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 82c12ce217c9..bd73cb15610a 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -3812,7 +3812,7 @@ async fn test_physical_plan_display_indent() {
     let expected = vec![
         "GlobalLimitExec: limit=10",
         "  SortExec: [the_min@2 DESC]",
-        "    MergeExec",
+        "    CoalescePartitionsExec",
         "      ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(c12), MIN(aggregate_test_100.c12)@2 as the_min]",
         "        HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(c12), MIN(c12)]",
        "          CoalesceBatchesExec: target_batch_size=4096",