Skip to content

Commit

Permalink
[multistage] refactor traversals of stage nodes into visitor pattern (#…
Browse files Browse the repository at this point in the history
…9560)

This PR improves the traversal code around StageNode. It sets up a common pattern for visiting nodes, collecting information, rewriting nodes and making other changes. This PR lays the groundwork for a follow-up PR that will help us implement a global sort stage for LIMIT/OFFSET queries and support sort push down.

There are five main parts to look at:
1. I added the `StageNodeVisitor` interface and implemented `visit` in all of the Stage Node implementations
2. I refactored the `partitionKey` optimization (that removes a shuffle if not necessary) into a Visitor (`ShuffleRewriter`)
3. I refactored constructing the `QueryPlan` metadata into a visitor (this is in preparation for the next PR) (`QueryPlanGenerator`)
4. I refactored constructing the `Operator` into a visitor (`PhysicalPlanBuilder`)
5. I refactored `QueryPlan#explain` into a visitor, and also improved the functionality (see new plan explain below)

Lastly, I added some quality of life improvements in debug-ability and I identified a "bug" in nested loop joins - though I'll fix that one in a future PR (see `FIXME` comment)

Example of the improved explain (it now properly recognizes which nodes are executing what code):
```
[0]@localhost:51925 MAIL_RECEIVE(RANDOM_DISTRIBUTED)
├── [1]@localhost:51923 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:51925} (Subtree Omitted)
└── [1]@localhost:51924 MAIL_SEND(RANDOM_DISTRIBUTED)->{[0]@localhost:51925}
   └── [1]@localhost:51924 JOIN
      ├── [1]@localhost:51924 MAIL_RECEIVE(HASH_DISTRIBUTED)
      │   ├── [2]@localhost:51924 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
      │   │   └── [2]@localhost:51924 TABLE SCAN (a) {REALTIME=[a3]}
      │   └── [2]@localhost:51923 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
      │      └── [2]@localhost:51923 TABLE SCAN (a) {REALTIME=[a1, a2]}
      └── [1]@localhost:51924 MAIL_RECEIVE(HASH_DISTRIBUTED)
         └── [3]@localhost:51923 MAIL_SEND(HASH_DISTRIBUTED)->{[1]@localhost:51923,[1]@localhost:51924}
            └── [3]@localhost:51923 TABLE SCAN (b) {REALTIME=[b1]}
```
  • Loading branch information
agavra authored Oct 17, 2022
1 parent 0a442b9 commit 4935326
Show file tree
Hide file tree
Showing 29 changed files with 898 additions and 307 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.planner;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.planner.stage.AggregateNode;
import org.apache.pinot.query.planner.stage.FilterNode;
import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.StageNodeVisitor;
import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.planner.stage.ValueNode;


/**
* A visitor that converts a {@code QueryPlan} into a human-readable string representation.
*
* <p>It is currently not used programmatically and cannot be accessed by the user. Instead,
* it is intended for use in manual debugging (e.g. setting breakpoints and calling QueryPlan#explain()).
*/
public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder, ExplainPlanStageVisitor.Context> {

private final QueryPlan _queryPlan;

/**
* Explains the query plan.
*
* @see QueryPlan#explain()
* @param queryPlan the queryPlan to explain
* @return a String representation of the query plan tree
*/
public static String explain(QueryPlan queryPlan) {
if (queryPlan.getQueryStageMap().isEmpty()) {
return "EMPTY";
}

// the root of a query plan always only has a single node
ServerInstance rootServer = queryPlan.getStageMetadataMap().get(0).getServerInstances().get(0);
return explainFrom(queryPlan, queryPlan.getQueryStageMap().get(0), rootServer);
}

/**
* Explains the query plan from a specific point in the subtree, taking {@code rootServer}
* as the node that is executing this sub-tree. This is helpful for debugging what is happening
* at a given point in time (for example, printing the tree that will be executed on a
* local node right before it is executed).
*
* @param queryPlan the entire query plan, including non-executed portions
* @param node the node to begin traversal
* @param rootServer the server instance that is executing this plan (should execute {@code node})
*
* @return a query plan associated with
*/
public static String explainFrom(QueryPlan queryPlan, StageNode node, ServerInstance rootServer) {
final ExplainPlanStageVisitor visitor = new ExplainPlanStageVisitor(queryPlan);
return node
.visit(visitor, new Context(rootServer, "", "", new StringBuilder()))
.toString();
}

private ExplainPlanStageVisitor(QueryPlan queryPlan) {
_queryPlan = queryPlan;
}

private StringBuilder appendInfo(StageNode node, Context context) {
int stage = node.getStageId();
context._builder
.append(context._prefix)
.append('[')
.append(stage)
.append("]@")
.append(context._host.getHostname())
.append(':')
.append(context._host.getPort())
.append(' ')
.append(node.explain());
return context._builder;
}

private StringBuilder visitSimpleNode(StageNode node, Context context) {
appendInfo(node, context).append('\n');
return node.getInputs().get(0).visit(this, context.next(false, context._host));
}

@Override
public StringBuilder visitAggregate(AggregateNode node, Context context) {
return visitSimpleNode(node, context);
}

@Override
public StringBuilder visitFilter(FilterNode node, Context context) {
return visitSimpleNode(node, context);
}

@Override
public StringBuilder visitJoin(JoinNode node, Context context) {
appendInfo(node, context).append('\n');
node.getInputs().get(0).visit(this, context.next(true, context._host));
node.getInputs().get(1).visit(this, context.next(false, context._host));
return context._builder;
}

@Override
public StringBuilder visitMailboxReceive(MailboxReceiveNode node, Context context) {
appendInfo(node, context).append('\n');

MailboxSendNode sender = (MailboxSendNode) node.getSender();
int senderStageId = node.getSenderStageId();
StageMetadata metadata = _queryPlan.getStageMetadataMap().get(senderStageId);
Map<ServerInstance, Map<String, List<String>>> segments = metadata.getServerInstanceToSegmentsMap();

Iterator<ServerInstance> iterator = metadata.getServerInstances().iterator();
while (iterator.hasNext()) {
ServerInstance serverInstance = iterator.next();
if (segments.containsKey(serverInstance)) {
// always print out leaf stages
sender.visit(this, context.next(iterator.hasNext(), serverInstance));
} else {
if (!iterator.hasNext()) {
// always print out the last one
sender.visit(this, context.next(false, serverInstance));
} else {
// only print short version of the sender node
appendMailboxSend(sender, context.next(true, serverInstance))
.append(" (Subtree Omitted)")
.append('\n');
}
}
}
return context._builder;
}

@Override
public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) {
appendMailboxSend(node, context).append('\n');
return node.getInputs().get(0).visit(this, context.next(false, context._host));
}

private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {
appendInfo(node, context);

int receiverStageId = node.getReceiverStageId();
List<ServerInstance> servers = _queryPlan.getStageMetadataMap().get(receiverStageId).getServerInstances();
context._builder.append("->");
String receivers = servers.stream()
.map(s -> s.getHostname() + ':' + s.getPort())
.map(s -> "[" + receiverStageId + "]@" + s)
.collect(Collectors.joining(",", "{", "}"));
return context._builder.append(receivers);
}

@Override
public StringBuilder visitProject(ProjectNode node, Context context) {
return visitSimpleNode(node, context);
}

@Override
public StringBuilder visitSort(SortNode node, Context context) {
return visitSimpleNode(node, context);
}

@Override
public StringBuilder visitTableScan(TableScanNode node, Context context) {
return appendInfo(node, context)
.append(' ')
.append(_queryPlan.getStageMetadataMap()
.get(node.getStageId())
.getServerInstanceToSegmentsMap()
.get(context._host))
.append('\n');
}

@Override
public StringBuilder visitValue(ValueNode node, Context context) {
return appendInfo(node, context);
}

static class Context {
final ServerInstance _host;
final String _prefix;
final String _childPrefix;
final StringBuilder _builder;

Context(ServerInstance host, String prefix, String childPrefix, StringBuilder builder) {
_host = host;
_prefix = prefix;
_childPrefix = childPrefix;
_builder = builder;
}

Context next(boolean hasMoreChildren, ServerInstance host) {
return new Context(
host,
hasMoreChildren ? _childPrefix + "├── " : _childPrefix + "└── ",
hasMoreChildren ? _childPrefix + "│ " : _childPrefix + " ",
_builder
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
*/
package org.apache.pinot.query.planner;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.calcite.util.Pair;
import org.apache.pinot.query.planner.logical.LogicalPlanner;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.StageNode;
import org.apache.pinot.query.planner.stage.TableScanNode;


/**
Expand Down Expand Up @@ -75,53 +72,16 @@ public List<Pair<Integer, String>> getQueryResultFields() {
return _queryResultFields;
}

/**
* Explains the {@code QueryPlan}
*
* @return a human-readable tree explaining the query plan
* @see ExplainPlanStageVisitor#explain(QueryPlan)
* @apiNote this is <b>NOT</b> identical to the SQL {@code EXPLAIN PLAN FOR} functionality
* and is instead intended to be used by developers debugging during feature
* development
*/
public String explain() {
if (_queryStageMap.isEmpty()) {
return "EMPTY";
}

StringBuilder builder = new StringBuilder();
explain(
builder,
_queryStageMap.get(0),
"",
"");
return builder.toString();
}

private void explain(
StringBuilder builder,
StageNode root,
String prefix,
String childPrefix
) {
int stage = root.getStageId();

builder
.append(prefix)
.append("[").append(stage).append("] ")
.append(root.explain());

if (root instanceof TableScanNode) {
builder.append(' ');
builder.append(_stageMetadataMap.get(root.getStageId()).getServerInstanceToSegmentsMap());
}

builder.append('\n');

if (root instanceof MailboxReceiveNode) {
int senderStage = ((MailboxReceiveNode) root).getSenderStageId();
StageNode sender = _queryStageMap.get(senderStage);
explain(builder, sender, childPrefix + "└── ", childPrefix + " ");
} else {
for (Iterator<StageNode> iterator = root.getInputs().iterator(); iterator.hasNext();) {
StageNode input = iterator.next();
if (iterator.hasNext()) {
explain(builder, input, childPrefix + "├── ", childPrefix + "│ ");
} else {
explain(builder, input, childPrefix + "└── ", childPrefix + " ");
}
}
}
return ExplainPlanStageVisitor.explain(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,11 @@ public TimeBoundaryInfo getTimeBoundaryInfo() {
public void setTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo) {
  // overwrite the stored time boundary info with the supplied value
  this._timeBoundaryInfo = timeBoundaryInfo;
}

/**
 * Renders every field of this metadata object in the form {@code StageMetadata{name=value, ...}}.
 */
@Override
public String toString() {
  StringBuilder text = new StringBuilder("StageMetadata{");
  text.append("_scannedTables=").append(_scannedTables);
  text.append(", _serverInstances=").append(_serverInstances);
  text.append(", _serverInstanceToSegmentsMap=").append(_serverInstanceToSegmentsMap);
  text.append(", _timeBoundaryInfo=").append(_timeBoundaryInfo);
  text.append('}');
  return text.toString();
}
}
Loading

0 comments on commit 4935326

Please sign in to comment.