Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable PhysicalOptimizerRule lazily (#4806) #4807

Merged
merged 1 commit into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1449,9 +1449,8 @@ impl SessionState {
}

// We need to take care of the rule ordering. They may influence each other.
let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> =
vec![Arc::new(AggregateStatistics::new())];

let physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
// - In order to increase the parallelism, it will change the output partitioning
// of some operators in the plan tree, which will influence other rules.
// Therefore, it should be run as soon as possible.
Expand All @@ -1460,48 +1459,44 @@ impl SessionState {
// - it's conflicted with some parts of the BasicEnforcement, since it will
// introduce additional repartitioning while the BasicEnforcement aims at
// reducing unnecessary repartitioning.
if config.options.optimizer.enable_round_robin_repartition {
physical_optimizers.push(Arc::new(Repartition::new()));
}
Arc::new(Repartition::new()),
//- Currently it will depend on the partition number to decide whether to change the
// single node sort to parallel local sort and merge. Therefore, it should be run
// after the Repartition.
// - Since it will change the output ordering of some operators, it should be run
// before JoinSelection and BasicEnforcement, which may depend on that.
physical_optimizers.push(Arc::new(GlobalSortSelection::new()));
Arc::new(GlobalSortSelection::new()),
// Statistics-base join selection will change the Auto mode to real join implementation,
// like collect left, or hash join, or future sort merge join, which will
// influence the BasicEnforcement to decide whether to add additional repartition
// and local sort to meet the distribution and ordering requirements.
// Therefore, it should be run before BasicEnforcement
physical_optimizers.push(Arc::new(JoinSelection::new()));
Arc::new(JoinSelection::new()),
// If the query is processing infinite inputs, the PipelineFixer rule applies the
// necessary transformations to make the query runnable (if it is not already runnable).
// If the query can not be made runnable, the rule emits an error with a diagnostic message.
// Since the transformations it applies may alter output partitioning properties of operators
// (e.g. by swapping hash join sides), this rule runs before BasicEnforcement.
physical_optimizers.push(Arc::new(PipelineFixer::new()));
Arc::new(PipelineFixer::new()),
// It's for adding essential repartition and local sorting operator to satisfy the
// required distribution and local sort.
// Please make sure that the whole plan tree is determined.
physical_optimizers.push(Arc::new(BasicEnforcement::new()));
Arc::new(BasicEnforcement::new()),
// `BasicEnforcement` stage conservatively inserts `SortExec`s to satisfy ordering requirements.
// However, a deeper analysis may sometimes reveal that such a `SortExec` is actually unnecessary.
// These cases typically arise when we have reversible `WindowAggExec`s or deep subqueries. The
// rule below performs this analysis and removes unnecessary `SortExec`s.
physical_optimizers.push(Arc::new(OptimizeSorts::new()));
Arc::new(OptimizeSorts::new()),
// It will not influence the distribution and ordering of the whole plan tree.
// Therefore, to avoid influencing other rules, it should be run at last.
if config.options.execution.coalesce_batches {
physical_optimizers.push(Arc::new(CoalesceBatches::new(
config.options.execution.batch_size,
)));
}
Arc::new(CoalesceBatches::new()),
// The PipelineChecker rule will reject non-runnable query plans that use
// pipeline-breaking operators on infinite input(s). The rule generates a
// diagnostic error message when this happens. It makes no changes to the
// given query plan; i.e. it only acts as a final gatekeeping rule.
physical_optimizers.push(Arc::new(PipelineChecker::new()));
Arc::new(PipelineChecker::new()),
];

SessionState {
session_id,
optimizer: Optimizer::new(),
Expand Down
17 changes: 9 additions & 8 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,25 @@ use std::sync::Arc;
/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
/// are produced by highly selective filters
#[derive(Default)]
pub struct CoalesceBatches {
/// Target batch size
target_batch_size: usize,
}
pub struct CoalesceBatches {}

impl CoalesceBatches {
#[allow(missing_docs)]
pub fn new(target_batch_size: usize) -> Self {
Self { target_batch_size }
pub fn new() -> Self {
Self::default()
}
}
impl PhysicalOptimizerRule for CoalesceBatches {
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
_config: &ConfigOptions,
config: &ConfigOptions,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
let target_batch_size = self.target_batch_size;
if !config.execution.coalesce_batches {
return Ok(plan);
}

let target_batch_size = config.execution.batch_size;
plan.transform_up(&|plan| {
let plan_any = plan.as_any();
// The goal here is to detect operators that could produce small batches and only
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ impl PhysicalOptimizerRule for Repartition {
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = config.execution.target_partitions;
let enabled = config.optimizer.enable_round_robin_repartition;
// Don't run optimizer if target_partitions == 1
if target_partitions == 1 {
if !enabled || target_partitions == 1 {
Ok(plan)
} else {
optimize_partitions(
Expand Down