Skip to content

Commit

Permalink
Reorder the physical plan optimizer rules, extract `GlobalSortSelecti…
Browse files Browse the repository at this point in the history
…on`, make `Repartition` optional (#4714)

* Extract the global sort algorithm selection from the BasicEnforcement to be a separate rule, GlobalSortSelection

* Make the optimizer rule of Repartition optional

* Fix EmptyExec data() method

* Reorder the physical plan optimizer rules

* Refine the UT for the repartition rule by adding the essential BasicEnforcement rule

* Fix UT failure for window function

* Fix example testing

Co-authored-by: yangzhong <[email protected]>
  • Loading branch information
yahoNanJing and kyotoYaho authored Dec 27, 2022
1 parent 981a9bb commit 899c86a
Show file tree
Hide file tree
Showing 16 changed files with 344 additions and 177 deletions.
2 changes: 1 addition & 1 deletion datafusion-examples/examples/custom_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,6 @@ impl ExecutionPlan for CustomExec {
}

fn statistics(&self) -> Statistics {
todo!()
Statistics::default()
}
}
9 changes: 9 additions & 0 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ pub const OPT_PREFER_HASH_JOIN: &str = "datafusion.optimizer.prefer_hash_join";
pub const OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD: &str =
"datafusion.optimizer.hash_join_single_partition_threshold";

/// Configuration option "datafusion.optimizer.enable_round_robin_repartition"
pub const OPT_ENABLE_ROUND_ROBIN_REPARTITION: &str =
"datafusion.optimizer.enable_round_robin_repartition";

/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identify this configuration option
Expand Down Expand Up @@ -409,6 +413,11 @@ impl BuiltInConfigs {
"The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition",
1024 * 1024,
),
ConfigDefinition::new_bool(
OPT_ENABLE_ROUND_ROBIN_REPARTITION,
"When set to true, the physical plan optimizer will try to add round robin repartition to increase parallelism to leverage more CPU cores",
true,
),
]
}
}
Expand Down
58 changes: 43 additions & 15 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ use crate::physical_optimizer::repartition::Repartition;

use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
OPT_ENABLE_ROUND_ROBIN_REPARTITION, OPT_FILTER_NULL_JOIN_KEYS,
OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::enforcement::BasicEnforcement;
Expand All @@ -98,6 +99,7 @@ use crate::catalog::information_schema::{InformationSchemaProvider, INFORMATION_
use crate::catalog::listing_schema::ListingSchemaProvider;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::memory_pool::MemoryPool;
use crate::physical_optimizer::global_sort_selection::GlobalSortSelection;
use crate::physical_optimizer::optimize_sorts::OptimizeSorts;
use uuid::Uuid;

Expand Down Expand Up @@ -1530,11 +1532,47 @@ impl SessionState {
);
}

let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
Arc::new(JoinSelection::new()),
];
// We need to take care of the rule ordering. They may influence each other.
let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> =
vec![Arc::new(AggregateStatistics::new())];
// - The Repartition rule increases parallelism by changing the output partitioning
// of some operators in the plan tree, which influences other rules.
// Therefore, it should run as early as possible.
// - It is optional because:
// - it is not used by the distributed engine, Ballista;
// - it conflicts with parts of BasicEnforcement, since it introduces
// additional repartitioning while BasicEnforcement aims to reduce
// unnecessary repartitioning.
if config
.config_options
.get_bool(OPT_ENABLE_ROUND_ROBIN_REPARTITION)
.unwrap_or_default()
{
physical_optimizers.push(Arc::new(Repartition::new()));
}
// - GlobalSortSelection currently depends on the partition count to decide whether to
// change a single-node sort into parallel local sorts plus a merge. Therefore, it should
// run after Repartition.
// - Since it changes the output ordering of some operators, it should run
// before JoinSelection and BasicEnforcement, which may depend on that ordering.
physical_optimizers.push(Arc::new(GlobalSortSelection::new()));
// Statistics-based join selection changes the Auto mode to a concrete join implementation,
// such as collect left, hash join, or a future sort merge join, which
// influences whether BasicEnforcement adds additional repartitioning
// and local sorts to meet the distribution and ordering requirements.
// Therefore, it should run before BasicEnforcement.
physical_optimizers.push(Arc::new(JoinSelection::new()));
// BasicEnforcement adds the essential repartition and local sort operators to satisfy the
// required distribution and local ordering.
// Please make sure that the rest of the plan tree is fully determined before it runs.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
physical_optimizers.push(Arc::new(OptimizeSorts::new()));
// The batch-coalescing rule does not influence the distribution or ordering of the plan tree.
// Therefore, to avoid interfering with other rules, it should run last.
if config
.config_options
.get_bool(OPT_COALESCE_BATCHES)
Expand All @@ -1549,16 +1587,6 @@ impl SessionState {
.unwrap(),
)));
}
physical_optimizers.push(Arc::new(Repartition::new()));
// Repartition rule could introduce additional RepartitionExec with RoundRobin partitioning.
// To make sure the SinglePartition is satisfied, run the BasicEnforcement again, originally it was the AddCoalescePartitionsExec here.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));

// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
physical_optimizers.push(Arc::new(OptimizeSorts::new()));

SessionState {
session_id,
Expand Down
13 changes: 11 additions & 2 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{
coalesce_batches::CoalesceBatchesExec, filter::FilterExec, joins::HashJoinExec,
repartition::RepartitionExec, rewrite::TreeNodeRewritable,
repartition::RepartitionExec, rewrite::TreeNodeRewritable, Partitioning,
},
};
use std::sync::Arc;
Expand Down Expand Up @@ -57,7 +57,16 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// See https://github.com/apache/arrow-datafusion/issues/139
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();
// Don't need to add CoalesceBatchesExec after a round robin RepartitionExec
|| plan_any
.downcast_ref::<RepartitionExec>()
.map(|repart_exec| {
!matches!(
repart_exec.partitioning().clone(),
Partitioning::RoundRobinBatch(_)
)
})
.unwrap_or(false);
if wrap_in_coalesce {
Ok(Some(Arc::new(CoalesceBatchesExec::new(
plan.clone(),
Expand Down
33 changes: 1 addition & 32 deletions datafusion/core/src/physical_optimizer/enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ use crate::physical_plan::joins::{
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::{SortExec, SortOptions};
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
Expand Down Expand Up @@ -844,36 +843,6 @@ fn ensure_distribution_and_ordering(
if plan.children().is_empty() {
return Ok(plan);
}
// It's mainly for changing the single node global SortExec to
// the SortPreservingMergeExec with multiple local SortExec.
// What's more, if limit exists, it can also be pushed down to the local sort
let plan = plan
.as_any()
.downcast_ref::<SortExec>()
.and_then(|sort_exec| {
// There are three situations that there's no need for this optimization
// - There's only one input partition;
// - It's already preserving the partitioning so that it can be regarded as a local sort
// - There's no limit pushed down to the local sort (It's still controversial)
if sort_exec.input().output_partitioning().partition_count() > 1
&& !sort_exec.preserve_partitioning()
&& sort_exec.fetch().is_some()
{
let sort = SortExec::new_with_partitioning(
sort_exec.expr().to_vec(),
sort_exec.input().clone(),
true,
sort_exec.fetch(),
);
Some(Arc::new(SortPreservingMergeExec::new(
sort_exec.expr().to_vec(),
Arc::new(sort),
)))
} else {
None
}
})
.map_or(plan, |new_plan| new_plan);

let required_input_distributions = plan.required_input_distribution();
let required_input_orderings = plan.required_input_ordering();
Expand Down
89 changes: 89 additions & 0 deletions datafusion/core/src/physical_optimizer/global_sort_selection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Select the efficient global sort implementation based on sort details.
use std::sync::Arc;

use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::SessionConfig;

/// Physical optimizer rule that selects the global sort implementation.
///
/// A [`SortExec`] is rewritten when all of the following hold:
/// - its input has more than one partition,
/// - it carries a limit (`fetch`) that can be pushed down to each input partition,
/// - it is not already preserving partitioning (i.e. it is not a local sort).
///
/// In that case a [`SortPreservingMergeExec`] over a partition-preserving local
/// [`SortExec`] (with the limit pushed down) is preferred; otherwise the plain
/// global [`SortExec`] is kept.
///
/// Later, more intelligent statistics-based decisions can also be introduced.
/// For example, for a small data set the global sort may be efficient enough.
#[derive(Debug, Default)]
pub struct GlobalSortSelection {}

impl GlobalSortSelection {
    /// Creates a new [`GlobalSortSelection`] rule.
    pub fn new() -> Self {
        Self {}
    }
}

impl PhysicalOptimizerRule for GlobalSortSelection {
    /// Walks the plan bottom-up and replaces every eligible global `SortExec`
    /// with a `SortPreservingMergeExec` on top of a partition-preserving
    /// `SortExec` that keeps the original sort expressions and fetch limit.
    ///
    /// Nodes that are not a `SortExec`, or that do not meet the conditions
    /// below, are left untouched (the rewrite closure yields `None` for them).
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &SessionConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        plan.transform_up(&|node| {
            // Only sort operators are candidates for this rewrite.
            let sort_exec = match node.as_any().downcast_ref::<SortExec>() {
                Some(sort_exec) => sort_exec,
                None => return Ok(None),
            };

            // Keep the node as-is when the input has a single partition, when
            // there is no limit to push down, or when the sort is already
            // preserving the partitioning and can be regarded as a local sort.
            if sort_exec.input().output_partitioning().partition_count() <= 1
                || sort_exec.fetch().is_none()
                || sort_exec.preserve_partitioning()
            {
                return Ok(None);
            }

            // Local sort per partition, with the limit pushed down.
            let local_sort = SortExec::new_with_partitioning(
                sort_exec.expr().to_vec(),
                sort_exec.input().clone(),
                true,
                sort_exec.fetch(),
            );
            // Merge the sorted partitions back into a single ordered output.
            let merged: Arc<dyn ExecutionPlan> = Arc::new(SortPreservingMergeExec::new(
                sort_exec.expr().to_vec(),
                Arc::new(local_sort),
            ));
            Ok(Some(merged))
        })
    }

    /// Returns the identifier of this rule.
    fn name(&self) -> &str {
        "global_sort_selection"
    }

    /// Always returns `false`.
    // NOTE(review): presumably this tells the optimizer driver to skip the
    // schema check after this rule runs — confirm against `PhysicalOptimizerRule`.
    fn schema_check(&self) -> bool {
        false
    }
}
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
pub mod aggregate_statistics;
pub mod coalesce_batches;
pub mod enforcement;
pub mod global_sort_selection;
pub mod join_selection;
pub mod optimize_sorts;
pub mod optimizer;
Expand Down
Loading

0 comments on commit 899c86a

Please sign in to comment.