[multistage] refactor traversals of stage nodes into visitor pattern #9560

Merged: 9 commits, Oct 17, 2022

Contributor

@agavra agavra commented Oct 8, 2022

This PR improves the traversal code around StageNode. It establishes a common pattern for visiting nodes, collecting information, rewriting the tree, and making other changes. It also lays the groundwork for a follow-up PR that will help us implement a global sort stage for LIMIT/OFFSET queries and support sort push-down.

There are five main parts to look at:

  1. I added the StageNodeVisitor interface and implemented visit in all of the Stage Node implementations
  2. I refactored the partitionKey optimization (which removes a shuffle when it is not necessary) into a visitor (ShuffleRewriter)
  3. I refactored constructing the QueryPlan metadata into a visitor (QueryPlanGenerator), in preparation for the next PR
  4. I refactored constructing the Operator into a visitor (PhysicalPlanBuilder)
  5. I refactored QueryPlan#explain into a visitor, and also improved the functionality (see new plan explain below)
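
For readers unfamiliar with the pattern, here is a minimal sketch of a two-parameter visitor (a result type plus a context threaded through the traversal). It is not the code from this PR: `VisitorSketch`, `NodeVisitor`, `Node`, `TableScanNode`, `JoinNode`, and `TableCollector` are hypothetical, simplified stand-ins for the real StageNode classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; none of these names come from the Pinot codebase.
final class VisitorSketch {
  /** T is the result type, C is a context object passed down the traversal. */
  interface NodeVisitor<T, C> {
    T visitTableScan(TableScanNode node, C context);
    T visitJoin(JoinNode node, C context);
  }

  interface Node {
    List<Node> inputs();
    // Each node dispatches to the visitor method that matches its own type.
    <T, C> T visit(NodeVisitor<T, C> visitor, C context);
  }

  static final class TableScanNode implements Node {
    final String table;
    TableScanNode(String table) { this.table = table; }
    @Override public List<Node> inputs() { return List.of(); }
    @Override public <T, C> T visit(NodeVisitor<T, C> visitor, C context) {
      return visitor.visitTableScan(this, context);
    }
  }

  static final class JoinNode implements Node {
    final Node left;
    final Node right;
    JoinNode(Node left, Node right) { this.left = left; this.right = right; }
    @Override public List<Node> inputs() { return List.of(left, right); }
    @Override public <T, C> T visit(NodeVisitor<T, C> visitor, C context) {
      return visitor.visitJoin(this, context);
    }
  }

  /** Example visitor: collects scanned table names into the context list. */
  static final class TableCollector implements NodeVisitor<Void, List<String>> {
    @Override public Void visitTableScan(TableScanNode node, List<String> context) {
      context.add(node.table);
      return null;
    }
    @Override public Void visitJoin(JoinNode node, List<String> context) {
      for (Node input : node.inputs()) {
        input.visit(this, context);
      }
      return null;
    }
  }

  static List<String> scannedTables(Node root) {
    List<String> tables = new ArrayList<>();
    root.visit(new TableCollector(), tables);
    return tables;
  }

  public static void main(String[] args) {
    Node plan = new JoinNode(new TableScanNode("a"), new TableScanNode("b"));
    System.out.println(scannedTables(plan)); // prints [a, b]
  }
}
```

The point of the shape is that a new traversal (rewriting, metadata collection, operator construction, explain) becomes a new visitor class rather than another `instanceof` chain over node types.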

Lastly, I added some quality-of-life improvements for debuggability, and I identified a "bug" in nested loop joins, which I'll fix in a future PR (see the FIXME comment).

Example of the improved explain (it now properly recognizes which nodes are executing what code):

[0]@localhost:51925 MAIL_RECEIVE(RANDOM_DISTRIBUTED)
├── [1]@localhost:51923 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:51925} (Subtree Omitted)
└── [1]@localhost:51924 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:51925}
   └── [1]@localhost:51924 JOIN
      ├── [1]@localhost:51924 MAIL_RECEIVE(HASH_DISTRIBUTED)
      │   ├── [2]@localhost:51924 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
      │   │   └── [2]@localhost:51924 TABLE SCAN (a) {REALTIME=[a3]}
      │   └── [2]@localhost:51923 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
      │      └── [2]@localhost:51923 TABLE SCAN (a) {REALTIME=[a1, a2]}
      └── [1]@localhost:51924 MAIL_RECEIVE(HASH_DISTRIBUTED)
         └── [3]@localhost:51923 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
            └── [3]@localhost:51923 TABLE SCAN (b) {REALTIME=[b1]}
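
As an illustration of how a tree like the one above can be rendered, here is a small sketch that tracks an indentation prefix while recursing. It uses plain recursion rather than the visitor from this PR, and `ExplainSketch`, `PlanNode`, and `explain` are hypothetical names, not the actual ExplainPlanStageVisitor code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of prefix-based tree rendering; not the PR's implementation.
final class ExplainSketch {
  static final class PlanNode {
    final String label;
    final List<PlanNode> inputs;
    PlanNode(String label, PlanNode... inputs) {
      this.label = label;
      this.inputs = Arrays.asList(inputs);
    }
  }

  static String explain(PlanNode root) {
    StringBuilder out = new StringBuilder();
    out.append(root.label).append('\n');
    appendChildren(root, "", out);
    return out.toString();
  }

  private static void appendChildren(PlanNode node, String prefix, StringBuilder out) {
    for (int i = 0; i < node.inputs.size(); i++) {
      PlanNode child = node.inputs.get(i);
      boolean last = i == node.inputs.size() - 1;
      // The last child gets a corner; earlier children get a tee.
      out.append(prefix).append(last ? "└── " : "├── ").append(child.label).append('\n');
      // Below a non-last child, keep a vertical bar so later siblings stay connected.
      appendChildren(child, prefix + (last ? "    " : "│   "), out);
    }
  }

  public static void main(String[] args) {
    PlanNode plan = new PlanNode("JOIN",
        new PlanNode("MAIL_RECEIVE", new PlanNode("TABLE SCAN (a)")),
        new PlanNode("MAIL_RECEIVE", new PlanNode("TABLE SCAN (b)")));
    System.out.print(explain(plan));
  }
}
```

Running `main` prints a five-line tree with `JOIN` at the root, two `MAIL_RECEIVE` children, and one `TABLE SCAN` under each.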

agavra added 2 commits October 7, 2022 20:53
This PR improves the traversal code around StageNode. It sets up a
common pattern for visiting nodes, collecting information, rewriting and
making other changes.

This PR is setup for one that will help us implement a global sort stage
for LIMIT/OFFSET queries.

codecov-commenter commented Oct 11, 2022

Codecov Report

Merging #9560 (f390492) into master (8cdae92) will increase coverage by 40.25%.
The diff coverage is 63.67%.

@@              Coverage Diff              @@
##             master    #9560       +/-   ##
=============================================
+ Coverage     28.28%   68.54%   +40.25%     
- Complexity       53     4920     +4867     
=============================================
  Files          1917     1938       +21     
  Lines        102594   103383      +789     
  Branches      15586    15683       +97     
=============================================
+ Hits          29022    70867    +41845     
+ Misses        70735    27497    -43238     
- Partials       2837     5019     +2182     
Flag Coverage Δ
integration1 ?
integration2 24.71% <0.00%> (+0.13%) ⬆️
unittests1 67.34% <63.67%> (?)
unittests2 15.60% <63.67%> (?)

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...e/pinot/query/planner/ExplainPlanStageVisitor.java 0.00% <0.00%> (ø)
...java/org/apache/pinot/query/planner/QueryPlan.java 88.88% <0.00%> (+88.88%) ⬆️
.../org/apache/pinot/query/planner/StageMetadata.java 95.00% <0.00%> (+95.00%) ⬆️
...e/pinot/query/planner/stage/AbstractStageNode.java 100.00% <ø> (+100.00%) ⬆️
...apache/pinot/query/mailbox/GrpcMailboxService.java 94.11% <0.00%> (+94.11%) ⬆️
.../pinot/query/runtime/blocks/TransferableBlock.java 69.56% <0.00%> (+69.56%) ⬆️
...inot/query/runtime/operator/AggregateOperator.java 84.21% <ø> (+84.21%) ⬆️
...pinot/query/runtime/operator/HashJoinOperator.java 82.60% <ø> (+82.60%) ⬆️
...query/runtime/operator/MailboxReceiveOperator.java 79.31% <ø> (+79.31%) ⬆️
...ot/query/runtime/operator/MailboxSendOperator.java 90.42% <ø> (+90.42%) ⬆️
... and 1423 more

Contributor

@walterddr walterddr left a comment

partial review on the planner side. will do another pass on the runtime a bit later

@@ -87,7 +87,7 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
 _serverExecutor = new ServerQueryExecutorV1Impl();
 _serverExecutor.init(config, instanceDataManager, serverMetrics);
 _workerExecutor = new WorkerQueryExecutor();
-_workerExecutor.init(config, serverMetrics, _mailboxService, _hostname, _port);
+_workerExecutor.init(_mailboxService, _hostname, _port);
Contributor

why reduce the init method signature?

Contributor Author

these weren't used - so just some cleanup. I can remove it if I'm missing something

Contributor

please do not remove them for now. config is useful for overrides and metrics we will be using in the future for sure

Contributor

this isn't fixed

Contributor Author

oops! sorry I thought I had done that but looks like I only fixed the other code path 😬 shame on me

Contributor

could you revert this change?

Contributor

@walterddr walterddr left a comment

looks good to me overall. please see the comments and otherwise good to go

@@ -87,7 +87,7 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
 _serverExecutor = new ServerQueryExecutorV1Impl();
 _serverExecutor.init(config, instanceDataManager, serverMetrics);
 _workerExecutor = new WorkerQueryExecutor();
-_workerExecutor.init(config, serverMetrics, _mailboxService, _hostname, _port);
+_workerExecutor.init(_mailboxService, _hostname, _port);
Contributor

this isn't fixed

Comment on lines +83 to +87
// FIXME: there's a bug where singletonInstance may be null in the case of a JOIN where
// one side is BROADCAST and the other is SINGLETON (this is the case with nested loop
// joins for inequality conditions). This causes NPEs in the logs, but actually works
// because the side that hits the NPE doesn't expect to get any data anyway (that's the
// side that gets the broadcast from one side but nothing from the SINGLETON)
Contributor

can we create an issue and link it here?

Contributor Author

#9592 - added to comment as well

Contributor

@walterddr walterddr left a comment

lgtm.

Comment on lines 41 to 42
* inspecting whether all data required by a specific subtree already resides on
* a single host. It gathers the information recursively by checking which partitioned
Contributor

Suggested change
- * inspecting whether all data required by a specific subtree already resides on
- * a single host. It gathers the information recursively by checking which partitioned
+ * inspecting whether all data required by a specific subtree are already colocated.
+ * It gathers the information recursively by checking which partitioned

import org.apache.pinot.query.runtime.operator.TransformOperator;


public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<TransferableBlock>, Void> {
Contributor

javadoc

@walterddr walterddr merged commit 4935326 into apache:master Oct 17, 2022