-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix push_down_filter
for pushing filters on grouping columns rather than aggregate columns
#4447
Conversation
|| !columns | ||
.intersection(&used_columns) | ||
.collect::<HashSet<_>>() | ||
.is_empty() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original performance was bad
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I mentioned in the last PR, I think we do not need to check the aggregate Exprs, but just check the group by Exprs. In some cases, the same column can exist in both aggregate Exprs and group by Exprs, for example select count(distinct col_a), col_a from table group by col_a;
. If there is a Filter applied to col_a, the Filter can still be pushed down even it is referred by the agg Exprs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic should check all the columns used by the Filter predicate is the subset of the group by Exprs output Columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. For push_down_filter through Agg, we can push Expr
in groupby_expr
.
Has add it.
@@ -910,11 +922,9 @@ mod tests { | |||
// rewrite to CNF | |||
// (c = 1 OR c = 1) [can pushDown] AND (c = 1 OR b > 3) AND (b > 2 OR C = 1) AND (b > 2 OR b > 3) | |||
|
|||
let expected = "\ | |||
Filter: (test.c = Int64(1) OR b > Int64(3)) AND (b > Int64(2) OR test.c = Int64(1)) AND (b > Int64(2) OR b > Int64(3))\ | |||
let expected = "Filter: (test.c = Int64(1) OR test.c = Int64(1)) AND (test.c = Int64(1) OR b > Int64(3)) AND (b > Int64(2) OR test.c = Int64(1)) AND (b > Int64(2) OR b > Int64(3))\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @Ted-Jiang
let expected = "\ | ||
Filter: (test.c = Int64(1) OR b > Int64(3)) AND (b > Int64(2) OR test.c = Int64(1)) AND (b > Int64(2) OR b > Int64(3))\ | ||
let expected = "Filter: (test.c = Int64(1) OR test.c = Int64(1)) AND (test.c = Int64(1) OR b > Int64(3)) AND (b > Int64(2) OR test.c = Int64(1)) AND (b > Int64(2) OR b > Int64(3))\ | ||
\n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b) AS b]]\ | ||
\n Filter: test.c = Int64(1) OR test.c = Int64(1)\ | ||
\n TableScan: test"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Original plan is wrong.😂
I think we need to delete this wrong UT.
Filter include column that not in output of Aggregate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it was my fault.
col_c
should not exist in filter. Need delete it 😂
@jackwener
LGTM. |
@alamb @Dandandan PTAL |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great to me -- thank you @jackwener
} | ||
} | ||
|
||
let child = match conjunction(push_predicates) { | ||
// As for plan Filter: Column(a+b) > 0 -- Agg: groupby:[Column(a)+Column(b)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice -- this is getting quite sophisticated.
// So we need create a replace_map, add {`a+b` --> Expr(Column(a)+Column(b))} | ||
let mut replace_map = HashMap::new(); | ||
for expr in &agg.group_expr { | ||
replace_map.insert(expr.display_name()?, expr.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Double checked that display_name
is the right one: https://docs.rs/datafusion/14.0.0/datafusion/prelude/enum.Expr.html#method.display_name 👍
\n TableScan: test"; | ||
let expected = | ||
"Aggregate: groupBy=[[test.b + test.a]], aggr=[[SUM(test.a), test.b]]\ | ||
\n Filter: test.b + test.a > Int64(10)\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 very nice
\n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b) AS b]]\ | ||
\n Filter: test.c = Int64(1) OR test.c = Int64(1)\ | ||
\n TableScan: test"; | ||
Filter: b > Int64(10)\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree this new plan is correct
let expected = "Projection: c, COUNT(UInt8(1))\ | ||
\n Projection: test.col_int32 + test.col_uint32 AS c, COUNT(UInt8(1))\ | ||
\n Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[COUNT(UInt8(1))]]\ | ||
\n Filter: test.col_int32 + CAST(test.col_uint32 AS Int32) > Int32(3)\ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
push_down_filter
push Expr
instead of column
.push_down_filter
for pushing filters on grouping columns rather than aggregate columns
Benchmark runs are scheduled for baseline = 8db99d2 and contender = 0509692. 0509692 is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Closes #4401.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?