Skip to content

Commit

Permalink
Deprecate duplicate function LogicalPlan::with_new_inputs (#8707)
Browse files Browse the repository at this point in the history
* Remove duplicate function with_new_inputs

* Make it as deprecated function
  • Loading branch information
viirya authored Jan 2, 2024
1 parent d2b3d1c commit bf0a39a
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 67 deletions.
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ impl LogicalPlanBuilder {
)
})
.collect::<Result<Vec<_>>>()?;
curr_plan.with_new_inputs(&new_inputs)
curr_plan.with_new_exprs(curr_plan.expressions(), &new_inputs)
}
}
}
Expand Down
47 changes: 5 additions & 42 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,35 +541,9 @@ impl LogicalPlan {
}

/// Returns a copy of this `LogicalPlan` with the new inputs
#[deprecated(since = "35.0.0", note = "please use `with_new_exprs` instead")]
pub fn with_new_inputs(&self, inputs: &[LogicalPlan]) -> Result<LogicalPlan> {
// with_new_inputs use original expression,
// so we don't need to recompute Schema.
match &self {
LogicalPlan::Projection(projection) => {
// Schema of the projection may change
// when its input changes. Hence we should use
// `try_new` method instead of `try_new_with_schema`.
Projection::try_new(projection.expr.to_vec(), Arc::new(inputs[0].clone()))
.map(LogicalPlan::Projection)
}
LogicalPlan::Window(Window { window_expr, .. }) => Ok(LogicalPlan::Window(
Window::try_new(window_expr.to_vec(), Arc::new(inputs[0].clone()))?,
)),
LogicalPlan::Aggregate(Aggregate {
group_expr,
aggr_expr,
..
}) => Aggregate::try_new(
// Schema of the aggregate may change
// when its input changes. Hence we should use
// `try_new` method instead of `try_new_with_schema`.
Arc::new(inputs[0].clone()),
group_expr.to_vec(),
aggr_expr.to_vec(),
)
.map(LogicalPlan::Aggregate),
_ => self.with_new_exprs(self.expressions(), inputs),
}
self.with_new_exprs(self.expressions(), inputs)
}

/// Returns a new `LogicalPlan` based on `self` with inputs and
Expand All @@ -591,10 +565,6 @@ impl LogicalPlan {
/// // create new plan using rewritten_exprs in same position
/// let new_plan = plan.new_with_exprs(rewritten_exprs, new_inputs);
/// ```
///
/// Note: sometimes [`Self::with_new_exprs`] will use schema of
/// original plan, it will not change the scheam. Such as
/// `Projection/Aggregate/Window`
pub fn with_new_exprs(
&self,
mut expr: Vec<Expr>,
Expand Down Expand Up @@ -706,17 +676,10 @@ impl LogicalPlan {
}))
}
},
LogicalPlan::Window(Window {
window_expr,
schema,
..
}) => {
LogicalPlan::Window(Window { window_expr, .. }) => {
assert_eq!(window_expr.len(), expr.len());
Ok(LogicalPlan::Window(Window {
input: Arc::new(inputs[0].clone()),
window_expr: expr,
schema: schema.clone(),
}))
Window::try_new(expr, Arc::new(inputs[0].clone()))
.map(LogicalPlan::Window)
}
LogicalPlan::Aggregate(Aggregate { group_expr, .. }) => {
// group exprs are the first expressions
Expand Down
2 changes: 1 addition & 1 deletion datafusion/expr/src/tree_node/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl TreeNode for LogicalPlan {
.zip(new_children.iter())
.any(|(c1, c2)| c1 != &c2)
{
self.with_new_inputs(new_children.as_slice())
self.with_new_exprs(self.expressions(), new_children.as_slice())
} else {
Ok(self)
}
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ impl OptimizerRule for EliminateOuterJoin {
schema: join.schema.clone(),
null_equals_null: join.null_equals_null,
});
let new_plan = plan.with_new_inputs(&[new_join])?;
let new_plan =
plan.with_new_exprs(plan.expressions(), &[new_join])?;
Ok(Some(new_plan))
}
_ => Ok(None),
Expand Down
3 changes: 2 additions & 1 deletion datafusion/optimizer/src/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ fn optimize_projections(
// `old_child` during construction:
.map(|(new_input, old_child)| new_input.unwrap_or_else(|| old_child.clone()))
.collect::<Vec<_>>();
plan.with_new_inputs(&new_inputs).map(Some)
plan.with_new_exprs(plan.expressions(), &new_inputs)
.map(Some)
}
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ impl Optimizer {
})
.collect::<Vec<_>>();

Ok(Some(plan.with_new_inputs(&new_inputs)?))
Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
}

/// Use a rule to optimize the whole plan.
Expand Down
28 changes: 19 additions & 9 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,9 +691,11 @@ impl OptimizerRule for PushDownFilter {
| LogicalPlan::Distinct(_)
| LogicalPlan::Sort(_) => {
// commutable
let new_filter =
plan.with_new_inputs(&[child_plan.inputs()[0].clone()])?;
child_plan.with_new_inputs(&[new_filter])?
let new_filter = plan.with_new_exprs(
plan.expressions(),
&[child_plan.inputs()[0].clone()],
)?;
child_plan.with_new_exprs(child_plan.expressions(), &[new_filter])?
}
LogicalPlan::SubqueryAlias(subquery_alias) => {
let mut replace_map = HashMap::new();
Expand All @@ -716,7 +718,7 @@ impl OptimizerRule for PushDownFilter {
new_predicate,
subquery_alias.input.clone(),
)?);
child_plan.with_new_inputs(&[new_filter])?
child_plan.with_new_exprs(child_plan.expressions(), &[new_filter])?
}
LogicalPlan::Projection(projection) => {
// A projection is filter-commutable if it do not contain volatile predicates or contain volatile
Expand Down Expand Up @@ -760,10 +762,15 @@ impl OptimizerRule for PushDownFilter {
)?);

match conjunction(keep_predicates) {
None => child_plan.with_new_inputs(&[new_filter])?,
None => child_plan.with_new_exprs(
child_plan.expressions(),
&[new_filter],
)?,
Some(keep_predicate) => {
let child_plan =
child_plan.with_new_inputs(&[new_filter])?;
let child_plan = child_plan.with_new_exprs(
child_plan.expressions(),
&[new_filter],
)?;
LogicalPlan::Filter(Filter::try_new(
keep_predicate,
Arc::new(child_plan),
Expand Down Expand Up @@ -837,7 +844,9 @@ impl OptimizerRule for PushDownFilter {
)?),
None => (*agg.input).clone(),
};
let new_agg = filter.input.with_new_inputs(&vec![child])?;
let new_agg = filter
.input
.with_new_exprs(filter.input.expressions(), &vec![child])?;
match conjunction(keep_predicates) {
Some(predicate) => LogicalPlan::Filter(Filter::try_new(
predicate,
Expand Down Expand Up @@ -942,7 +951,8 @@ impl OptimizerRule for PushDownFilter {
None => extension_plan.node.inputs().into_iter().cloned().collect(),
};
// extension with new inputs.
let new_extension = child_plan.with_new_inputs(&new_children)?;
let new_extension =
child_plan.with_new_exprs(child_plan.expressions(), &new_children)?;

match conjunction(keep_predicates) {
Some(predicate) => LogicalPlan::Filter(Filter::try_new(
Expand Down
23 changes: 13 additions & 10 deletions datafusion/optimizer/src/push_down_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl OptimizerRule for PushDownLimit {
fetch: scan.fetch.map(|x| min(x, limit)).or(Some(limit)),
projected_schema: scan.projected_schema.clone(),
});
Some(plan.with_new_inputs(&[new_input])?)
Some(plan.with_new_exprs(plan.expressions(), &[new_input])?)
}
}
LogicalPlan::Union(union) => {
Expand All @@ -145,7 +145,7 @@ impl OptimizerRule for PushDownLimit {
inputs: new_inputs,
schema: union.schema.clone(),
});
Some(plan.with_new_inputs(&[union])?)
Some(plan.with_new_exprs(plan.expressions(), &[union])?)
}

LogicalPlan::CrossJoin(cross_join) => {
Expand All @@ -166,15 +166,16 @@ impl OptimizerRule for PushDownLimit {
right: Arc::new(new_right),
schema: plan.schema().clone(),
});
Some(plan.with_new_inputs(&[new_cross_join])?)
Some(plan.with_new_exprs(plan.expressions(), &[new_cross_join])?)
}

LogicalPlan::Join(join) => {
let new_join = push_down_join(join, fetch + skip);
match new_join {
Some(new_join) => {
Some(plan.with_new_inputs(&[LogicalPlan::Join(new_join)])?)
}
Some(new_join) => Some(plan.with_new_exprs(
plan.expressions(),
&[LogicalPlan::Join(new_join)],
)?),
None => None,
}
}
Expand All @@ -192,14 +193,16 @@ impl OptimizerRule for PushDownLimit {
input: Arc::new((*sort.input).clone()),
fetch: new_fetch,
});
Some(plan.with_new_inputs(&[new_sort])?)
Some(plan.with_new_exprs(plan.expressions(), &[new_sort])?)
}
}
LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_) => {
// commute
let new_limit =
plan.with_new_inputs(&[child_plan.inputs()[0].clone()])?;
Some(child_plan.with_new_inputs(&[new_limit])?)
let new_limit = plan.with_new_exprs(
plan.expressions(),
&[child_plan.inputs()[0].clone()],
)?;
Some(child_plan.with_new_exprs(child_plan.expressions(), &[new_limit])?)
}
_ => None,
};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub fn optimize_children(
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
}
if plan_is_changed {
Ok(Some(plan.with_new_inputs(&new_inputs)?))
Ok(Some(plan.with_new_exprs(plan.expressions(), &new_inputs)?))
} else {
Ok(None)
}
Expand Down

0 comments on commit bf0a39a

Please sign in to comment.