Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop most copying LogicalPlan and Exprs in ScalarSubqueryToJoin #10489

Merged
merged 1 commit into from
May 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 54 additions & 29 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use datafusion_common::alias::AliasGenerator;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
};
use datafusion_common::{plan_err, Column, Result, ScalarValue};
use datafusion_common::{internal_err, plan_err, Column, Result, ScalarValue};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
use datafusion_expr::utils::conjunction;
Expand All @@ -50,7 +50,7 @@ impl ScalarSubqueryToJoin {
/// # Arguments
/// * `predicate` - A conjunction to split and search
///
/// Returns a tuple (subqueries, rewrite expression)
/// Returns a tuple (subqueries, alias)
fn extract_subquery_exprs(
&self,
predicate: &Expr,
Expand All @@ -71,19 +71,36 @@ impl ScalarSubqueryToJoin {
impl OptimizerRule for ScalarSubqueryToJoin {
fn try_optimize(
&self,
plan: &LogicalPlan,
config: &dyn OptimizerConfig,
_plan: &LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
internal_err!("Should have called ScalarSubqueryToJoin::rewrite")
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
// Optimization: skip the rest of the rule and its copies if
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note there is a still a bunch of copying in the actual rewrite logic itself below, but I could not figure out a way to remove it easily as there are several paths that return the original (unmodified) plan for unsupported subqueries

I think we would have to split the check for "is rewrite supported" and the "do the rewrite" logic or something

Given this isn't a tall pole in planning, I don't have any more time to spend on this particular rule so I want to call this good enough

// there are no scalar subqueries
if !contains_scalar_subquery(&filter.predicate) {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}

let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs(
&filter.predicate,
config.alias_generator(),
)?;

if subqueries.is_empty() {
// regular filter, no subquery exists clause here
return Ok(None);
return internal_err!("Expected subqueries not found in filter");
}

// iterate through all subqueries in predicate, turning each into a left join
Expand All @@ -94,16 +111,13 @@ impl OptimizerRule for ScalarSubqueryToJoin {
{
if !expr_check_map.is_empty() {
rewrite_expr = rewrite_expr
.clone()
.transform_up(|expr| {
if let Expr::Column(col) = &expr {
if let Some(map_expr) =
expr_check_map.get(&col.name)
{
Ok(Transformed::yes(map_expr.clone()))
} else {
Ok(Transformed::no(expr))
}
// replace column references with entry in map, if it exists
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove some indenting using the api added in #10448, but the logic is the same

if let Some(map_expr) = expr
.try_as_col()
.and_then(|col| expr_check_map.get(&col.name))
{
Ok(Transformed::yes(map_expr.clone()))
} else {
Ok(Transformed::no(expr))
}
Expand All @@ -113,15 +127,21 @@ impl OptimizerRule for ScalarSubqueryToJoin {
cur_input = optimized_subquery;
} else {
// if we can't handle all of the subqueries then bail for now
return Ok(None);
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}
}
let new_plan = LogicalPlanBuilder::from(cur_input)
.filter(rewrite_expr)?
.build()?;
Ok(Some(new_plan))
Ok(Transformed::yes(new_plan))
}
LogicalPlan::Projection(projection) => {
// Optimization: skip the rest of the rule and its copies if
// there are no scalar subqueries
if !projection.expr.iter().any(contains_scalar_subquery) {
return Ok(Transformed::no(LogicalPlan::Projection(projection)));
}

let mut all_subqueryies = vec![];
let mut expr_to_rewrite_expr_map = HashMap::new();
let mut subquery_to_expr_map = HashMap::new();
Expand All @@ -135,8 +155,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
expr_to_rewrite_expr_map.insert(expr, rewrite_exprs);
}
if all_subqueryies.is_empty() {
// regular projection, no subquery exists clause here
return Ok(None);
return internal_err!("Expected subqueries not found in projection");
}
// iterate through all subqueries in predicate, turning each into a left join
let mut cur_input = projection.input.as_ref().clone();
Expand All @@ -153,14 +172,13 @@ impl OptimizerRule for ScalarSubqueryToJoin {
let new_expr = rewrite_expr
.clone()
.transform_up(|expr| {
if let Expr::Column(col) = &expr {
if let Some(map_expr) =
// replace column references with entry in map, if it exists
if let Some(map_expr) =
expr.try_as_col().and_then(|col| {
expr_check_map.get(&col.name)
{
Ok(Transformed::yes(map_expr.clone()))
} else {
Ok(Transformed::no(expr))
}
})
{
Ok(Transformed::yes(map_expr.clone()))
} else {
Ok(Transformed::no(expr))
}
Expand All @@ -172,7 +190,7 @@ impl OptimizerRule for ScalarSubqueryToJoin {
}
} else {
// if we can't handle all of the subqueries then bail for now
return Ok(None);
return Ok(Transformed::no(LogicalPlan::Projection(projection)));
}
}

Expand All @@ -190,10 +208,10 @@ impl OptimizerRule for ScalarSubqueryToJoin {
let new_plan = LogicalPlanBuilder::from(cur_input)
.project(proj_exprs)?
.build()?;
Ok(Some(new_plan))
Ok(Transformed::yes(new_plan))
}

_ => Ok(None),
plan => Ok(Transformed::no(plan)),
}
}

Expand All @@ -206,6 +224,13 @@ impl OptimizerRule for ScalarSubqueryToJoin {
}
}

/// Returns true if the expression has a scalar subquery somewhere in it
/// false otherwise
fn contains_scalar_subquery(expr: &Expr) -> bool {
expr.exists(|expr| Ok(matches!(expr, Expr::ScalarSubquery(_))))
.expect("Inner is always Ok")
}

struct ExtractScalarSubQuery {
sub_query_info: Vec<(Subquery, String)>,
alias_gen: Arc<AliasGenerator>,
Expand Down