Skip to content

Commit

Permalink
[multistage] default enable dynamic broadcast for SEMI join (#11696)
Browse files Browse the repository at this point in the history
Switch dynamic broadcast to default enable unless otherwise specified with join_strategy

Enable dynamic broadcast for SEMI joins generally are more efficient

Exceptions:
- the situation where the right table is so large and data cannot fit in memory
- the left table is partitioned but we haven't implement logic to partition the right table based on the partition function. 
in either case we have a fallback option to use hash table join: `joinOptions(join_strategy = 'hash_table')`

---------

Co-authored-by: Rong Rong <[email protected]>
  • Loading branch information
walterddr and Rong Rong authored Sep 27, 2023
1 parent b16cdec commit 0e24863
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public static class AggregateOptions {

public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
public static final String HASH_TABLE_JOIN_STRATEGY = "hash_table";
/**
* Max rows allowed to build the right table hash collection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@
public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
public static final PinotJoinToDynamicBroadcastRule INSTANCE =
new PinotJoinToDynamicBroadcastRule(PinotRuleUtils.PINOT_REL_FACTORY);
private static final String DYNAMIC_BROADCAST_HINT_OPTION_VALUE = "dynamic_broadcast";

public PinotJoinToDynamicBroadcastRule(RelBuilderFactory factory) {
super(operand(LogicalJoin.class, any()), factory, null);
Expand All @@ -134,17 +133,18 @@ public boolean matches(RelOptRuleCall call) {
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
List<String> joinStrategies = joinStrategyString != null ? StringUtils.split(joinStrategyString, ",")
: Collections.emptyList();
if (!joinStrategies.contains(DYNAMIC_BROADCAST_HINT_OPTION_VALUE)) {
return false;
}
boolean explicitOtherStrategy = joinStrategies.size() > 0
&& !joinStrategies.contains(PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY);

JoinInfo joinInfo = join.analyzeCondition();
RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) join.getLeft()).getCurrentRel()
: join.getLeft();
RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
: join.getRight();
return left instanceof Exchange && right instanceof Exchange
&& PinotRuleUtils.noExchangeInSubtree(left.getInput(0))
&& (join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
// default enable dynamic broadcast for SEMI join unless other join strategy were specified
&& (!explicitOtherStrategy && join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@
},
{
"description": "Semi join with IN clause",
"sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b)",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'hash_table') */ col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b)",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
Expand All @@ -239,7 +239,7 @@
},
{
"description": "Semi join with multiple IN clause",
"sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col2 = 'test' AND col3 IN (SELECT col3 FROM b WHERE col1='foo') AND col3 IN (SELECT col3 FROM b WHERE col1='bar') AND col3 IN (SELECT col3 FROM b WHERE col1='foobar')",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'hash_table') */ col1, col2 FROM a WHERE col2 = 'test' AND col3 IN (SELECT col3 FROM b WHERE col1='foo') AND col3 IN (SELECT col3 FROM b WHERE col1='bar') AND col3 IN (SELECT col3 FROM b WHERE col1='foobar')",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
"sql": "EXPLAIN PLAN FOR SELECT a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
"output": [
"Execution Plan",
"\nPinotLogicalExchange(distribution=[hash[0]])",
Expand All @@ -38,7 +38,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy then group-by on same key",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptionsInternal(agg_type='DIRECT') */ a.col1, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptionsInternal(agg_type='DIRECT') */ a.col1, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
Expand All @@ -55,7 +55,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy then group-by on different key",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ a.col2, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"sql": "EXPLAIN PLAN FOR SELECT a.col2, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
Expand Down

0 comments on commit 0e24863

Please sign in to comment.