Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: Add PullUpCorrelatedExpr::new and improve documentation #10500

Merged
merged 5 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 53 additions & 7 deletions datafusion/optimizer/src/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,71 @@ use datafusion_physical_expr::execution_props::ExecutionProps;
/// 'Filter'. It adds the inner reference columns to the 'Projection' or
/// 'Aggregate' of the subquery if they are missing, so that they can be
/// evaluated by the parent operator as the join condition.
#[derive(Debug)]
pub struct PullUpCorrelatedExpr {
pub join_filters: Vec<Expr>,
// mapping from the plan to its holding correlated columns
/// mapping from the plan to its holding correlated columns
pub correlated_subquery_cols_map: HashMap<LogicalPlan, BTreeSet<Column>>,
pub in_predicate_opt: Option<Expr>,
// indicate whether it is Exists(Not Exists) SubQuery
/// Is this an Exists(Not Exists) SubQuery. Defaults to **FALSE**
pub exists_sub_query: bool,
// indicate whether the correlated expressions can pull up or not
/// Can the correlated expressions be pulled up. Defaults to **TRUE**
pub can_pull_up: bool,
// indicate whether need to handle the Count bug during the pull up process
/// Do we need to handle [the Count bug] during the pull up process
///
/// [the Count bug]: https://github.com/apache/datafusion/pull/10500
pub need_handle_count_bug: bool,
// mapping from the plan to its expressions' evaluation result on empty batch
/// mapping from the plan to its expressions' evaluation result on empty batch
pub collected_count_expr_map: HashMap<LogicalPlan, ExprResultMap>,
// pull up having expr, which must be evaluated after the Join
/// pull up having expr, which must be evaluated after the Join
pub pull_up_having_expr: Option<Expr>,
}

impl Default for PullUpCorrelatedExpr {
fn default() -> Self {
Self::new()
}
}

impl PullUpCorrelatedExpr {
pub fn new() -> Self {
Self {
join_filters: vec![],
correlated_subquery_cols_map: HashMap::new(),
in_predicate_opt: None,
exists_sub_query: false,
can_pull_up: true,
need_handle_count_bug: false,
collected_count_expr_map: HashMap::new(),
pull_up_having_expr: None,
}
}

/// Set if we need to handle [the Count bug] during the pull up process
///
/// [the Count bug]: https://github.com/apache/datafusion/pull/10500
pub fn with_need_handle_count_bug(mut self, need_handle_count_bug: bool) -> Self {
self.need_handle_count_bug = need_handle_count_bug;
self
}

/// Set the in_predicate_opt
pub fn with_in_predicate_opt(mut self, in_predicate_opt: Option<Expr>) -> Self {
self.in_predicate_opt = in_predicate_opt;
self
}

/// Set if this is an Exists(Not Exists) SubQuery
pub fn with_exists_sub_query(mut self, exists_sub_query: bool) -> Self {
self.exists_sub_query = exists_sub_query;
self
}
}

/// Used to indicate the unmatched rows from the inner(subquery) table after the left out Join
/// This is used to handle the Count bug
/// This is used to handle [the Count bug]
///
/// [the Count bug]: https://github.com/apache/datafusion/pull/10500
pub const UN_MATCHED_ROW_INDICATOR: &str = "__always_true";

/// Mapping from expr display name to its evaluation result on empty record
Expand Down
14 changes: 4 additions & 10 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,10 @@ fn build_join(
let subquery = query_info.query.subquery.as_ref();
let subquery_alias = alias.next("__correlated_sq");

let mut pull_up = PullUpCorrelatedExpr {
join_filters: vec![],
correlated_subquery_cols_map: Default::default(),
in_predicate_opt: in_predicate_opt.clone(),
exists_sub_query: in_predicate_opt.is_none(),
can_pull_up: true,
need_handle_count_bug: false,
collected_count_expr_map: Default::default(),
pull_up_having_expr: None,
};
let mut pull_up = PullUpCorrelatedExpr::new()
.with_in_predicate_opt(in_predicate_opt.clone())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these were the two fields that are set differently which I think is much clearer now

.with_exists_sub_query(in_predicate_opt.is_none());

let new_plan = subquery.clone().rewrite(&mut pull_up).data()?;
if !pull_up.can_pull_up {
return Ok(None);
Expand Down
11 changes: 1 addition & 10 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,16 +305,7 @@ fn build_join(
subquery_alias: &str,
) -> Result<Option<(LogicalPlan, HashMap<String, Expr>)>> {
let subquery_plan = subquery.subquery.as_ref();
let mut pull_up = PullUpCorrelatedExpr {
join_filters: vec![],
correlated_subquery_cols_map: Default::default(),
in_predicate_opt: None,
exists_sub_query: false,
can_pull_up: true,
need_handle_count_bug: true,
collected_count_expr_map: Default::default(),
pull_up_having_expr: None,
};
let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true);
let new_plan = subquery_plan.clone().rewrite(&mut pull_up).data()?;
if !pull_up.can_pull_up {
return Ok(None);
Expand Down
Loading