Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple physical optimizer from SessionConfig (#3887) #4749

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
//! Utilizing exact statistics from sources to avoid scanning data
use std::sync::Arc;

use crate::config::ConfigOptions;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;

use crate::execution::context::SessionConfig;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode};
use crate::physical_plan::empty::EmptyExec;
use crate::physical_plan::projection::ProjectionExec;
Expand Down Expand Up @@ -51,7 +51,7 @@ impl PhysicalOptimizerRule for AggregateStatistics {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &SessionConfig,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
if let Some(partial_agg_exec) = take_optimizable(&*plan) {
let partial_agg_exec = partial_agg_exec
Expand Down Expand Up @@ -307,9 +307,10 @@ mod tests {
agg: TestAggregate,
) -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let plan = Arc::new(plan) as _;
let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &conf)?;
let optimized = AggregateStatistics::new()
.optimize(Arc::clone(&plan), state.config_options())?;

// A ProjectionExec is a sign that the count optimization was applied
assert!(optimized.as_any().is::<ProjectionExec>());
Expand Down Expand Up @@ -548,7 +549,7 @@ mod tests {
Arc::clone(&schema),
)?;

let conf = SessionConfig::new();
let conf = ConfigOptions::new();
let optimized =
AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;

Expand Down Expand Up @@ -591,7 +592,7 @@ mod tests {
Arc::clone(&schema),
)?;

let conf = SessionConfig::new();
let conf = ConfigOptions::new();
let optimized =
AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
//! CoalesceBatches optimizer that groups batches together rows
//! in bigger batches to avoid overhead with small batches

use crate::config::ConfigOptions;
use crate::{
error::Result,
physical_optimizer::PhysicalOptimizerRule,
Expand Down Expand Up @@ -46,7 +47,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
fn optimize(
&self,
plan: Arc<dyn crate::physical_plan::ExecutionPlan>,
_config: &crate::execution::context::SessionConfig,
_config: &ConfigOptions,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
let target_batch_size = self.target_batch_size;
plan.transform_up(&|plan| {
Expand Down
16 changes: 9 additions & 7 deletions datafusion/core/src/physical_optimizer/enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
//! Enforcement optimizer rules are used to make sure the plan's Distribution and Ordering
//! requirements are met by inserting necessary [[RepartitionExec]] and [[SortExec]].
//!
use crate::config::OPT_TOP_DOWN_JOIN_KEY_REORDERING;
use crate::config::{
ConfigOptions, OPT_TARGET_PARTITIONS, OPT_TOP_DOWN_JOIN_KEY_REORDERING,
};
use crate::error::Result;
use crate::physical_optimizer::utils::{add_sort_above_child, ordering_satisfy};
use crate::physical_optimizer::PhysicalOptimizerRule;
Expand All @@ -34,7 +36,6 @@ use crate::physical_plan::sorts::sort::SortOptions;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
use crate::prelude::SessionConfig;
use arrow::datatypes::SchemaRef;
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
Expand Down Expand Up @@ -69,11 +70,10 @@ impl PhysicalOptimizerRule for BasicEnforcement {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &SessionConfig,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = config.target_partitions();
let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap();
let top_down_join_key_reordering = config
.config_options()
.get_bool(OPT_TOP_DOWN_JOIN_KEY_REORDERING)
.unwrap_or_default();
let new_plan = if top_down_join_key_reordering {
Expand Down Expand Up @@ -1135,10 +1135,12 @@ mod tests {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.set_usize(OPT_TARGET_PARTITIONS, 10);

// run optimizer
let optimizer = BasicEnforcement {};
let optimized = optimizer
.optimize($PLAN, &SessionConfig::new().with_target_partitions(10))?;
let optimized = optimizer.optimize($PLAN, &config)?;

// Now format correctly
let plan = displayable(optimized.as_ref()).indent().to_string();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@

use std::sync::Arc;

use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::ExecutionPlan;
use crate::prelude::SessionConfig;

/// Currently for a sort operator, if
/// - there are more than one input partitions
Expand All @@ -48,7 +48,7 @@ impl PhysicalOptimizerRule for GlobalSortSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &SessionConfig,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
plan.transform_up(&|plan| {
Ok(plan
Expand Down
20 changes: 9 additions & 11 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ use std::sync::Arc;

use arrow::datatypes::Schema;

use crate::config::OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD;
use crate::execution::context::SessionConfig;
use crate::config::{ConfigOptions, OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD};
use crate::logical_expr::JoinType;
use crate::physical_plan::expressions::Column;
use crate::physical_plan::joins::{
Expand Down Expand Up @@ -211,10 +210,9 @@ impl PhysicalOptimizerRule for JoinSelection {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
session_config: &SessionConfig,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let collect_left_threshold: usize = session_config
.config_options()
let collect_left_threshold: usize = config
.get_u64(OPT_HASH_JOIN_SINGLE_PARTITION_THRESHOLD)
.unwrap_or_default()
.try_into()
Expand Down Expand Up @@ -508,7 +506,7 @@ mod tests {
.unwrap();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &SessionConfig::new())
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

let swapping_projection = optimized_join
Expand Down Expand Up @@ -556,7 +554,7 @@ mod tests {
.unwrap();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &SessionConfig::new())
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

let swapping_projection = optimized_join
Expand Down Expand Up @@ -609,7 +607,7 @@ mod tests {
let original_schema = join.schema();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &SessionConfig::new())
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

let swapped_join = optimized_join
Expand Down Expand Up @@ -638,7 +636,7 @@ mod tests {
$EXPECTED_LINES.iter().map(|s| *s).collect::<Vec<&str>>();

let optimized = JoinSelection::new()
.optimize(Arc::new($PLAN), &SessionConfig::new())
.optimize(Arc::new($PLAN), &ConfigOptions::new())
.unwrap();

let plan = displayable(optimized.as_ref()).indent().to_string();
Expand Down Expand Up @@ -725,7 +723,7 @@ mod tests {
.unwrap();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &SessionConfig::new())
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

let swapped_join = optimized_join
Expand Down Expand Up @@ -950,7 +948,7 @@ mod tests {
.unwrap();

let optimized_join = JoinSelection::new()
.optimize(Arc::new(join), &SessionConfig::new())
.optimize(Arc::new(join), &ConfigOptions::new())
.unwrap();

if !is_swapped {
Expand Down
24 changes: 12 additions & 12 deletions datafusion/core/src/physical_optimizer/optimize_sorts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
//! " SortExec: [non_nullable_col@1 ASC]",
//! in the physical plan. The first sort is unnecessary since its result is overwritten
//! by another SortExec. Therefore, this rule removes it from the physical plan.
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
add_sort_above_child, ordering_satisfy, ordering_satisfy_concrete,
Expand All @@ -34,7 +35,6 @@ use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use crate::prelude::SessionConfig;
use arrow::datatypes::SchemaRef;
use datafusion_common::{reverse_sort_options, DataFusionError};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
Expand Down Expand Up @@ -122,7 +122,7 @@ impl PhysicalOptimizerRule for OptimizeSorts {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &SessionConfig,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
// Execute a post-order traversal to adjust input key ordering:
let plan_requirements = PlanWithCorrespondingSort::new(plan);
Expand Down Expand Up @@ -557,7 +557,7 @@ mod tests {
#[tokio::test]
async fn test_remove_unnecessary_sort() -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
Expand Down Expand Up @@ -589,7 +589,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
OptimizeSorts::new().optimize(physical_plan, &conf)?;
OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
Expand All @@ -608,7 +608,7 @@ mod tests {
#[tokio::test]
async fn test_remove_unnecessary_sort_window_multilayer() -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
Expand Down Expand Up @@ -690,7 +690,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
OptimizeSorts::new().optimize(physical_plan, &conf)?;
OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
Expand All @@ -715,7 +715,7 @@ mod tests {
#[tokio::test]
async fn test_add_required_sort() -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
Expand All @@ -736,7 +736,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
OptimizeSorts::new().optimize(physical_plan, &conf)?;
OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
Expand All @@ -760,7 +760,7 @@ mod tests {
#[tokio::test]
async fn test_remove_unnecessary_sort1() -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
Expand Down Expand Up @@ -803,7 +803,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
OptimizeSorts::new().optimize(physical_plan, &conf)?;
OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
Expand All @@ -827,7 +827,7 @@ mod tests {
#[tokio::test]
async fn test_change_wrong_sorting() -> Result<()> {
let session_ctx = SessionContext::new();
let conf = session_ctx.copied_config();
let state = session_ctx.state();
let schema = create_test_schema()?;
let source = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?)
as Arc<dyn ExecutionPlan>;
Expand Down Expand Up @@ -865,7 +865,7 @@ mod tests {
expected, actual
);
let optimized_physical_plan =
OptimizeSorts::new().optimize(physical_plan, &conf)?;
OptimizeSorts::new().optimize(physical_plan, state.config_options())?;
let formatted = displayable(optimized_physical_plan.as_ref())
.indent()
.to_string();
Expand Down
7 changes: 3 additions & 4 deletions datafusion/core/src/physical_optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@

use std::sync::Arc;

use crate::{
error::Result, execution::context::SessionConfig, physical_plan::ExecutionPlan,
};
use crate::config::ConfigOptions;
use crate::{error::Result, physical_plan::ExecutionPlan};

/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
/// computes the same results, but in a potentially more efficient
Expand All @@ -31,7 +30,7 @@ pub trait PhysicalOptimizerRule {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &SessionConfig,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>>;

/// A human readable name for this optimizer rule
Expand Down
14 changes: 9 additions & 5 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@
use std::sync::Arc;

use super::optimizer::PhysicalOptimizerRule;
use crate::config::{ConfigOptions, OPT_TARGET_PARTITIONS};
use crate::error::Result;
use crate::physical_plan::Partitioning::*;
use crate::physical_plan::{
repartition::RepartitionExec, with_new_children_if_necessary, ExecutionPlan,
};
use crate::{error::Result, execution::context::SessionConfig};

/// Optimizer that introduces repartition to introduce more
/// parallelism in the plan
Expand Down Expand Up @@ -207,14 +208,15 @@ impl PhysicalOptimizerRule for Repartition {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &SessionConfig,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let target_partitions = config.get_usize(OPT_TARGET_PARTITIONS).unwrap();
// Don't run optimizer if target_partitions == 1
if config.target_partitions() == 1 {
if target_partitions == 1 {
Ok(plan)
} else {
optimize_partitions(
config.target_partitions(),
target_partitions,
plan.clone(),
plan.output_ordering().is_none(),
false,
Expand Down Expand Up @@ -360,8 +362,10 @@ mod tests {
($EXPECTED_LINES: expr, $PLAN: expr) => {
let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect();

let mut config = ConfigOptions::new();
config.set_usize(OPT_TARGET_PARTITIONS, 10);

// run optimizer
let config = SessionConfig::new().with_target_partitions(10);
let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(Repartition::new()),
// The `BasicEnforcement` is an essential rule to be applied.
Expand Down
Loading