Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multistage] default enable dynamic broadcast for SEMI #11696

Merged
merged 2 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public static class AggregateOptions {

public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
public static final String HASH_TABLE_JOIN_STRATEGY = "hash_table";
/**
* Max rows allowed to build the right table hash collection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@
public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
public static final PinotJoinToDynamicBroadcastRule INSTANCE =
new PinotJoinToDynamicBroadcastRule(PinotRuleUtils.PINOT_REL_FACTORY);
private static final String DYNAMIC_BROADCAST_HINT_OPTION_VALUE = "dynamic_broadcast";

public PinotJoinToDynamicBroadcastRule(RelBuilderFactory factory) {
super(operand(LogicalJoin.class, any()), factory, null);
Expand All @@ -134,17 +133,18 @@ public boolean matches(RelOptRuleCall call) {
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
List<String> joinStrategies = joinStrategyString != null ? StringUtils.split(joinStrategyString, ",")
: Collections.emptyList();
if (!joinStrategies.contains(DYNAMIC_BROADCAST_HINT_OPTION_VALUE)) {
return false;
}
boolean explicitOtherStrategy = joinStrategies.size() > 0
&& !joinStrategies.contains(PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY);

JoinInfo joinInfo = join.analyzeCondition();
RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) join.getLeft()).getCurrentRel()
: join.getLeft();
RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
: join.getRight();
return left instanceof Exchange && right instanceof Exchange
&& PinotRuleUtils.noExchangeInSubtree(left.getInput(0))
&& (join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
// default enable dynamic broadcast for SEMI join unless other join strategy were specified
&& (!explicitOtherStrategy && join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is dynamic broadcast useful for non-SEMI join? Currently it won't be applied even if we explicitly hint about it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be in the future. currently it will only apply to SEMI join

}

@Override
Expand Down
4 changes: 2 additions & 2 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@
},
{
"description": "Semi join with IN clause",
"sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b)",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'hash_table') */ col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b)",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
Expand All @@ -239,7 +239,7 @@
},
{
"description": "Semi join with multiple IN clause",
"sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col2 = 'test' AND col3 IN (SELECT col3 FROM b WHERE col1='foo') AND col3 IN (SELECT col3 FROM b WHERE col1='bar') AND col3 IN (SELECT col3 FROM b WHERE col1='foobar')",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'hash_table') */ col1, col2 FROM a WHERE col2 = 'test' AND col3 IN (SELECT col3 FROM b WHERE col1='foo') AND col3 IN (SELECT col3 FROM b WHERE col1='bar') AND col3 IN (SELECT col3 FROM b WHERE col1='foobar')",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
"sql": "EXPLAIN PLAN FOR SELECT a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
"output": [
"Execution Plan",
"\nPinotLogicalExchange(distribution=[hash[0]])",
Expand All @@ -38,7 +38,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy then group-by on same key",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptionsInternal(agg_type='DIRECT') */ a.col1, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptionsInternal(agg_type='DIRECT') */ a.col1, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
Expand All @@ -55,7 +55,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy then group-by on different key",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ a.col2, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"sql": "EXPLAIN PLAN FOR SELECT a.col2, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
Expand Down