From d12d5ffdb479341e8c23c636137337267d55dd6e Mon Sep 17 00:00:00 2001 From: Almog Gavra Date: Sun, 9 Oct 2022 17:29:26 -0700 Subject: [PATCH] also refactor QueryPlan#explan --- .../planner/ExplainPlanStageVisitor.java | 201 ++++++++++++++++++ .../apache/pinot/query/planner/QueryPlan.java | 51 +---- 2 files changed, 202 insertions(+), 50 deletions(-) create mode 100644 pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java new file mode 100644 index 000000000000..6cafb974d8cc --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.query.planner.stage.AggregateNode; +import org.apache.pinot.query.planner.stage.FilterNode; +import org.apache.pinot.query.planner.stage.JoinNode; +import org.apache.pinot.query.planner.stage.MailboxReceiveNode; +import org.apache.pinot.query.planner.stage.MailboxSendNode; +import org.apache.pinot.query.planner.stage.ProjectNode; +import org.apache.pinot.query.planner.stage.SortNode; +import org.apache.pinot.query.planner.stage.StageNode; +import org.apache.pinot.query.planner.stage.StageNodeVisitor; +import org.apache.pinot.query.planner.stage.TableScanNode; +import org.apache.pinot.query.planner.stage.ValueNode; + + +public class ExplainPlanStageVisitor implements StageNodeVisitor { + + private final QueryPlan _queryPlan; + + private ExplainPlanStageVisitor(QueryPlan queryPlan) { + _queryPlan = queryPlan; + } + + public static String explain(QueryPlan queryPlan) { + if (queryPlan.getQueryStageMap().isEmpty()) { + return "EMPTY"; + } + + // the root of a query plan always only has a single node + ServerInstance rootServer = queryPlan.getStageMetadataMap().get(0).getServerInstances().get(0); + return explainFrom(queryPlan, queryPlan.getQueryStageMap().get(0), rootServer); + } + + public static String explainFrom(QueryPlan queryPlan, StageNode node, ServerInstance rootServer) { + final ExplainPlanStageVisitor visitor = new ExplainPlanStageVisitor(queryPlan); + return node + .visit(visitor, new Context(rootServer, "", "", new StringBuilder())) + .toString(); + } + + private StringBuilder appendInfo(StageNode node, Context context) { + int stage = node.getStageId(); + context._builder + .append(context._prefix) + .append('[') + .append(stage) + .append(']') + .append('@') + .append(context._host.getHostname()) + .append(':') + .append(context._host.getPort()) + .append(' ') + .append(node.explain()); + return context._builder; + } + + private StringBuilder visitSimpleNode(StageNode node, Context context) { + appendInfo(node, context).append('\n'); + return node.getInputs().get(0).visit(this, context.next(false, context._host)); + } + + @Override + public StringBuilder visitAggregate(AggregateNode node, Context context) { + return visitSimpleNode(node, context); + } + + @Override + public StringBuilder visitFilter(FilterNode node, Context context) { + return visitSimpleNode(node, context); + } + + @Override + public StringBuilder visitJoin(JoinNode node, Context context) { + appendInfo(node, context).append('\n'); + node.getInputs().get(0).visit(this, context.next(true, context._host)); + node.getInputs().get(1).visit(this, context.next(false, context._host)); + return context._builder; + } + + @Override + public StringBuilder visitMailboxReceive(MailboxReceiveNode node, Context context) { + appendInfo(node, context).append('\n'); + + MailboxSendNode sender = (MailboxSendNode) node.getSender(); + int senderStageId = node.getSenderStageId(); + StageMetadata metadata = _queryPlan.getStageMetadataMap().get(senderStageId); + Map>> segments = metadata.getServerInstanceToSegmentsMap(); + + Iterator iterator = metadata.getServerInstances().iterator(); + while (iterator.hasNext()) { + ServerInstance serverInstance = iterator.next(); + if (segments.containsKey(serverInstance)) { + // always print out leaf stages + sender.visit(this, context.next(iterator.hasNext(), serverInstance)); + } else { + if (!iterator.hasNext()) { + // always print out the last one + sender.visit(this, context.next(false, serverInstance)); + } else { + // only print short version of the sender node + appendMailboxSend(sender, context.next(true, serverInstance)) + .append(" (Subtree Omitted)") + .append('\n'); + } + } + } + return context._builder; + } + + @Override + public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) { + appendMailboxSend(node, context).append('\n'); + return node.getInputs().get(0).visit(this, context.next(false, context._host)); + } + + private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) { + appendInfo(node, context); + + int receiverStageId = node.getReceiverStageId(); + List servers = _queryPlan.getStageMetadataMap().get(receiverStageId).getServerInstances(); + context._builder.append("->"); + String receivers = servers.stream() + .map(s -> s.getHostname() + ':' + s.getPort()) + .map(s -> "[" + receiverStageId + "]@" + s) + .collect(Collectors.joining(",", "{", "}")); + return context._builder.append(receivers); + } + + @Override + public StringBuilder visitProject(ProjectNode node, Context context) { + return visitSimpleNode(node, context); + } + + @Override + public StringBuilder visitSort(SortNode node, Context context) { + return visitSimpleNode(node, context); + } + + @Override + public StringBuilder visitTableScan(TableScanNode node, Context context) { + return appendInfo(node, context) + .append(' ') + .append(_queryPlan.getStageMetadataMap() + .get(node.getStageId()) + .getServerInstanceToSegmentsMap() + .get(context._host)) + .append('\n'); + } + + @Override + public StringBuilder visitValue(ValueNode node, Context context) { + return appendInfo(node, context); + } + + public static class Context { + final ServerInstance _host; + final String _prefix; + final String _childPrefix; + + public Context(ServerInstance host, String prefix, String childPrefix, StringBuilder builder) { + _host = host; + _prefix = prefix; + _childPrefix = childPrefix; + _builder = builder; + } + + StringBuilder _builder; + + public Context next(boolean hasMoreChildren, ServerInstance host) { + return new Context( + host, + hasMoreChildren ? _childPrefix + "├── " : _childPrefix + "└── ", + hasMoreChildren ? _childPrefix + "│ " : _childPrefix + " ", + _builder + ); + } + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java index d8770ed3bfe8..166278a30e2f 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java @@ -18,14 +18,11 @@ */ package org.apache.pinot.query.planner; -import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.calcite.util.Pair; import org.apache.pinot.query.planner.logical.LogicalPlanner; -import org.apache.pinot.query.planner.stage.MailboxReceiveNode; import org.apache.pinot.query.planner.stage.StageNode; -import org.apache.pinot.query.planner.stage.TableScanNode; /** @@ -76,52 +73,6 @@ public List> getQueryResultFields() { } public String explain() { - if (_queryStageMap.isEmpty()) { - return "EMPTY"; - } - - StringBuilder builder = new StringBuilder(); - explain( - builder, - _queryStageMap.get(0), - "", - ""); - return builder.toString(); - } - - private void explain( - StringBuilder builder, - StageNode root, - String prefix, - String childPrefix - ) { - int stage = root.getStageId(); - - builder - .append(prefix) - .append("[").append(stage).append("] ") - .append(root.explain()); - - if (root instanceof TableScanNode) { - builder.append(' '); - builder.append(_stageMetadataMap.get(root.getStageId()).getServerInstanceToSegmentsMap()); - } - - builder.append('\n'); - - if (root instanceof MailboxReceiveNode) { - int senderStage = ((MailboxReceiveNode) root).getSenderStageId(); - StageNode sender = _queryStageMap.get(senderStage); - explain(builder, sender, childPrefix + "└── ", childPrefix + " "); - } else { - for (Iterator iterator = root.getInputs().iterator(); iterator.hasNext();) { - StageNode input = iterator.next(); - if (iterator.hasNext()) { - explain(builder, input, childPrefix + "├── ", childPrefix + "│ "); - } else { - explain(builder, input, childPrefix + "└── ", childPrefix + " "); - } - } - } + return ExplainPlanStageVisitor.explain(this); } }