Support join-filter pushdown for semi/anti join #4923
Is this PR ready for review now? I'm working on another issue that might be related to what you are working on. When decorrelating WHERE EXISTS/IN subqueries into joins, there is a bug: the filter is added to the wrong side of the plan.
cc: @mingmwang @alamb |
Thanks @ygf11
@@ -0,0 +1,62 @@ | |||
# Licensed to the Apache Software Foundation (ASF) under one |
❤️
// filter_push_down does not yet support SEMI/ANTI joins with join conditions | ||
Ok((false, false)) | ||
} | ||
JoinType::LeftSemi | JoinType::RightSemi => Ok((true, true)), |
I thought a Semi join had the same semantics as a Left join -- namely that the left/right side was preserved but the other wasn't.
So I would expect this to be something like
- JoinType::LeftSemi | JoinType::RightSemi => Ok((true, true)),
+ JoinType::LeftSemi => Ok((false, true)),
+ JoinType::RightSemi => Ok((true, false)),
Maybe I am misunderstanding the semi join.
I played around with it in postgres:
postgres=# create table bar as values (2, 'a'), (3, 'b');
SELECT 2
postgres=# create table foo as values (1, 'X'), (2, 'Y'), (3, 'Z');
SELECT 3
postgres=# explain select * from foo SEMI JOIN bar USING (column1) where bar.column2 != 'b' ;
QUERY PLAN
-------------------------------------------------------------------------
Merge Join (cost=179.17..305.88 rows=8026 width=68)
Merge Cond: (bar.column1 = semi.column1)
-> Sort (cost=90.99..94.15 rows=1264 width=36)
Sort Key: bar.column1
-> Seq Scan on bar (cost=0.00..25.88 rows=1264 width=36)
Filter: (column2 <> 'b'::text)
-> Sort (cost=88.17..91.35 rows=1270 width=36)
Sort Key: semi.column1
-> Seq Scan on foo semi (cost=0.00..22.70 rows=1270 width=36)
(9 rows)
postgres=# select * from foo SEMI JOIN bar USING (column1);
column1 | column2 | column2
---------+---------+---------
2 | Y | a
3 | Z | b
(2 rows)
postgres=# select * from foo SEMI JOIN bar USING (column1) where bar.column2 != 'b' ;
column1 | column2 | column2
---------+---------+---------
2 | Y | a
(1 row)
And this looks correct to me
Update 🤦: in the postgres query above, foo SEMI JOIN bar is being interpreted as though SEMI is a table alias for foo, not a different join type.
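For comparison, here is a minimal sketch of what an actual left semi join would return on the same foo/bar rows. The function name left_semi_join and the tuple layout are illustrative assumptions, not DataFusion or postgres code. It shows the two properties discussed in this thread: only rows from the left input are returned, and each one appears at most once, however many right rows match.

```rust
use std::collections::HashSet;

/// Left semi join on the first tuple field (hypothetical helper, for illustration only).
/// Keeps each left row that has at least one matching key on the right,
/// and outputs the left columns only.
fn left_semi_join<'a>(
    left: &[(i32, &'a str)],
    right: &[(i32, &'a str)],
) -> Vec<(i32, &'a str)> {
    // Only the right-side keys matter; right-side columns never reach the output.
    let right_keys: HashSet<i32> = right.iter().map(|(k, _)| *k).collect();
    left.iter()
        .filter(|(k, _)| right_keys.contains(k))
        .copied()
        .collect()
}

fn main() {
    // Same rows as the foo/bar tables in the postgres session above.
    let foo = [(1, "X"), (2, "Y"), (3, "Z")];
    let bar = [(2, "a"), (3, "b")];

    // Unlike the inner join postgres actually ran, bar.column2 is not in the output.
    let out = left_semi_join(&foo, &bar);
    println!("{:?}", out); // prints [(2, "Y"), (3, "Z")]
}
```

Because bar's columns are not part of the result, a WHERE predicate on bar.column2 placed after a real semi join has nothing to refer to.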
\n Filter: test1.b > UInt32(1)\ | ||
\n TableScan: test1\ | ||
\n Projection: test2.a, test2.b\ | ||
\n Filter: test2.b > UInt32(2)\ |
As above, I don't think this is correct -- but I think we would need to review the implementation in Spark.
I think this method's name, on_lr_is_preserved(), is very misleading. It indicates whether a join condition can be converted into a filter condition and pushed down. For example, for a Full Outer Join both sides are preserved in the join result, but the join conditions cannot be converted into filters and pushed down, so the method returns (false, false).
Per my understanding, LeftSemi/RightSemi joins are similar to the Inner join case, so the implementation should return (true, true).
Just sharing SparkSQL's result with you:
explain extended
SELECT t1_id, t1_name FROM t1 LEFT SEMI JOIN t2 ON (t1_id = t2_id and t2_id >= 100 and t1_id >= 100);
== Optimized Logical Plan ==
Join LeftSemi, (t1_id#1987050 = t2_id#1987346), Statistics(sizeInBytes=1.0 B)
:- Filter (isnotnull(t1_id#1987050) AND (t1_id#1987050 >= 100)), Statistics(sizeInBytes=1.0 B)
: +- Relation access_views.t1[t1_id#1987050,t1_name#1987051] parquet, Statistics(sizeInBytes=0.0 B)
+- Project [t2_id#1987346], Statistics(sizeInBytes=1.0 B)
+- Filter (isnotnull(t2_id#1987346) AND (t2_id#1987346 >= 100)), Statistics(sizeInBytes=1.0 B)
+- Relation access_views.t2[t2_id#1987346,t2_name#1987347] parquet, Statistics(sizeInBytes=0.0 B)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [t1_id#1987050], [t2_id#1987346], LeftSemi, BuildRight, false
:- Project [t1_id#1987050, t1_name#1987051]
: +- Filter (isnotnull(t1_id#1987050) AND (t1_id#1987050 >= 100))
: +- FileScan parquet access_views.t1[t1_id#1987050,t1_name#1987051] Batched: true, DataFilters: [isnotnull(t1_id#1987050), (t1_id#1987050 >= 100)], Format: Parquet, Location: InMemoryFileIndex[viewfs://hermes-rno/tmp/spark/[email protected]/temp-d6be7dc2-736c-4205-8b..., PartitionFilters: [], PushedFilters: [IsNotNull(t1_id), GreaterThanOrEqual(t1_id,100)], ReadSchema: struct<t1_id:int,t1_name:string>, UsedIndexes: []
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#301830]
+- Project [t2_id#1987346]
+- Filter (isnotnull(t2_id#1987346) AND (t2_id#1987346 >= 100))
+- FileScan parquet access_views.t2[t2_id#1987346] Batched: true, DataFilters: [isnotnull(t2_id#1987346), (t2_id#1987346 >= 100)], Format: Parquet, Location: InMemoryFileIndex[viewfs://hermes-rno/tmp/spark/[email protected]/temp-d6be7dc2-736c-4205-8b..., PartitionFilters: [], PushedFilters: [IsNotNull(t2_id), GreaterThanOrEqual(t2_id,100)], ReadSchema: struct<t2_id:int>, UsedIndexes: []
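The contract described in this comment can be sketched as a small lookup, shown here as an illustration rather than the code in this PR. JoinKind and on_condition_pushable are hypothetical names. The Inner, Full and semi-join entries follow the comment above; the Left and Right entries are my assumption based on standard outer-join semantics.

```rust
/// Hypothetical stand-in for the planner's join type enum (not DataFusion's own type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    RightSemi,
}

/// Returns (left, right): whether an ON-clause predicate that references only
/// that side may be converted into a filter below the join.
fn on_condition_pushable(kind: JoinKind) -> (bool, bool) {
    match kind {
        // From the comment above: semi joins behave like the Inner join case.
        JoinKind::Inner | JoinKind::LeftSemi | JoinKind::RightSemi => (true, true),
        // From the comment above: both sides are preserved, nothing can be pushed.
        JoinKind::Full => (false, false),
        // Assumption: for an outer join, rows of the preserved side must still be
        // output when the ON condition fails, so only the other side can be filtered.
        JoinKind::Left => (false, true),
        JoinKind::Right => (true, false),
    }
}

fn main() {
    for kind in [
        JoinKind::Inner,
        JoinKind::Left,
        JoinKind::Right,
        JoinKind::Full,
        JoinKind::LeftSemi,
        JoinKind::RightSemi,
    ] {
        println!("{:?} -> {:?}", kind, on_condition_pushable(kind));
    }
}
```

The (true, true) entry for LeftSemi matches the Spark plan above, where both the t1_id >= 100 and t2_id >= 100 predicates from the ON clause end up as filters below the join.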
Except for the method name |
@alamb I guess you were misled by the title of this PR. I also ran this in Spark and got the same result as @mingmwang.
@ygf11 You can give these a try:
-- left semi join, t2.id filter push down, t1 filter push down
explain extended
SELECT t1.id, t1.name FROM t1 LEFT SEMI JOIN t2 ON (t1.id = t2.id and t2.id > 11);
-- left semi join, t1 filter push down, t2.id filter not push down ???
explain extended
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE t1.id = t2.id and t1.id > 11);
-- left semi join, t2.id filter push down, t1 filter not push down ???
explain extended
SELECT t1.id, t1.name FROM t1 WHERE EXISTS (SELECT 1 FROM t2 WHERE t1.id = t2.id and t2.id > 11); |
If @mingmwang is happy with these semantics I think it would be fine to merge.
I think my confusion stems from the fact that a SEMI join semantically only outputs columns from one relation and also the
So a query like this makes sense:
SELECT t1.id FROM t1 SEMI JOIN t2 ON (t1.id = t2.id) WHERE t1.value = 5
But a query like this (predicate on t2 after the semi join)
SELECT t1.id FROM t1 SEMI JOIN t2 ON (t1.id = t2.id) WHERE t2.value = 5
does not make sense, as t2.value does not appear in the query output.
Maybe we can add some error cases showing that.
It would also be good to cover such a predicate in the ON clause (which does make sense and could be pushed down, I think):
SELECT t1.id FROM t1 SEMI JOIN t2 ON (t1.id = t2.id AND t2.value = 5)
Thanks for this PR @ygf11
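To make the ON-clause case concrete, here is a small sketch (not code from this PR) that evaluates the last query two ways: with t2.value = 5 kept inside the join condition, and with it pushed down as a filter on t2 before joining on the key alone. The Row type, the sample rows, and both function names are assumptions made up for the illustration.

```rust
use std::collections::HashSet;

/// A row of a hypothetical table with columns (id, value).
type Row = (i32, i32);

/// Left semi join whose ON clause is `t1.id = t2.id AND <predicate on t2 only>`.
fn semi_join_on(t1: &[Row], t2: &[Row], t2_pred: impl Fn(&Row) -> bool) -> Vec<i32> {
    t1.iter()
        .filter(|l| t2.iter().any(|r| l.0 == r.0 && t2_pred(r)))
        .map(|l| l.0)
        .collect()
}

/// The same query after pushdown: filter t2 first, then semi join on the key alone.
fn semi_join_pushed(t1: &[Row], t2: &[Row], t2_pred: impl Fn(&Row) -> bool) -> Vec<i32> {
    let keys: HashSet<i32> = t2.iter().filter(|r| t2_pred(*r)).map(|r| r.0).collect();
    t1.iter()
        .filter(|l| keys.contains(&l.0))
        .map(|l| l.0)
        .collect()
}

fn main() {
    let t1 = [(1, 10), (2, 20), (3, 30)];
    let t2 = [(1, 5), (2, 7), (3, 5), (3, 9)];

    // SELECT t1.id FROM t1 SEMI JOIN t2 ON (t1.id = t2.id AND t2.value = 5)
    let in_on_clause = semi_join_on(&t1, &t2, |r: &Row| r.1 == 5);
    let pushed_down = semi_join_pushed(&t1, &t2, |r: &Row| r.1 == 5);

    assert_eq!(in_on_clause, pushed_down);
    println!("{:?}", pushed_down); // prints [1, 3]
}
```

Both forms return the same ids, which is why a t2-only predicate in the ON clause of a semi join is a candidate for pushdown.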
Added. |
Looks good -- thank you @ygf11
(44, 'x', 3), | ||
(55, 'w', 3); | ||
|
||
# left semi with wrong where clause |
👍
Benchmark runs are scheduled for baseline = 19f6f19 and contender = dde23ef. dde23ef is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Related to #4413
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?