diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
new file mode 100644
index 000000000000..e0345590dc98
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/ExplainPlanStageVisitor.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+
+
+/**
+ * A visitor that converts a {@code QueryPlan} into a human-readable string representation.
+ *
+ * It is currently not used programmatically and cannot be accessed by the user. Instead,
+ * it is intended for use in manual debugging (e.g. setting breakpoints and calling QueryPlan#explain()).
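+ *
+ * <p>For illustration, a debugging session could evaluate something along these lines (the
+ * {@code queryPlan} variable is whatever plan instance happens to be in scope at the breakpoint):
+ * <pre>{@code
+ * String plan = ExplainPlanStageVisitor.explain(queryPlan);
+ * System.out.println(plan);
+ * }</pre>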
+ */
+public class ExplainPlanStageVisitor implements StageNodeVisitor<StringBuilder, ExplainPlanStageVisitor.Context> {
+
+ private final QueryPlan _queryPlan;
+
+ /**
+ * Explains the query plan.
+ *
+ * @see QueryPlan#explain()
+ * @param queryPlan the queryPlan to explain
+ * @return a String representation of the query plan tree
+ */
+ public static String explain(QueryPlan queryPlan) {
+ if (queryPlan.getQueryStageMap().isEmpty()) {
+ return "EMPTY";
+ }
+
+ // the root of a query plan always only has a single node
+ ServerInstance rootServer = queryPlan.getStageMetadataMap().get(0).getServerInstances().get(0);
+ return explainFrom(queryPlan, queryPlan.getQueryStageMap().get(0), rootServer);
+ }
+
+ /**
+ * Explains the query plan from a specific point in the subtree, taking {@code rootServer}
+ * as the node that is executing this sub-tree. This is helpful for debugging what is happening
+ * at a given point in time (for example, printing the tree that will be executed on a
+ * local node right before it is executed).
+ *
+ * @param queryPlan the entire query plan, including non-executed portions
+ * @param node the node to begin traversal
+ * @param rootServer the server instance that is executing this plan (should execute {@code node})
+ *
+ * @return a human-readable explanation of the query plan rooted at {@code node}
+ */
+ public static String explainFrom(QueryPlan queryPlan, StageNode node, ServerInstance rootServer) {
+ final ExplainPlanStageVisitor visitor = new ExplainPlanStageVisitor(queryPlan);
+ return node
+ .visit(visitor, new Context(rootServer, "", "", new StringBuilder()))
+ .toString();
+ }
+
+ private ExplainPlanStageVisitor(QueryPlan queryPlan) {
+ _queryPlan = queryPlan;
+ }
+
+ private StringBuilder appendInfo(StageNode node, Context context) {
+ int stage = node.getStageId();
+ context._builder
+ .append(context._prefix)
+ .append('[')
+ .append(stage)
+ .append("]@")
+ .append(context._host.getHostname())
+ .append(':')
+ .append(context._host.getPort())
+ .append(' ')
+ .append(node.explain());
+ return context._builder;
+ }
+
+ private StringBuilder visitSimpleNode(StageNode node, Context context) {
+ appendInfo(node, context).append('\n');
+ return node.getInputs().get(0).visit(this, context.next(false, context._host));
+ }
+
+ @Override
+ public StringBuilder visitAggregate(AggregateNode node, Context context) {
+ return visitSimpleNode(node, context);
+ }
+
+ @Override
+ public StringBuilder visitFilter(FilterNode node, Context context) {
+ return visitSimpleNode(node, context);
+ }
+
+ @Override
+ public StringBuilder visitJoin(JoinNode node, Context context) {
+ appendInfo(node, context).append('\n');
+ node.getInputs().get(0).visit(this, context.next(true, context._host));
+ node.getInputs().get(1).visit(this, context.next(false, context._host));
+ return context._builder;
+ }
+
+ @Override
+ public StringBuilder visitMailboxReceive(MailboxReceiveNode node, Context context) {
+ appendInfo(node, context).append('\n');
+
+ MailboxSendNode sender = (MailboxSendNode) node.getSender();
+ int senderStageId = node.getSenderStageId();
+ StageMetadata metadata = _queryPlan.getStageMetadataMap().get(senderStageId);
+ Map<ServerInstance, Map<String, List<String>>> segments = metadata.getServerInstanceToSegmentsMap();
+
+ Iterator<ServerInstance> iterator = metadata.getServerInstances().iterator();
+ while (iterator.hasNext()) {
+ ServerInstance serverInstance = iterator.next();
+ if (segments.containsKey(serverInstance)) {
+ // always print out leaf stages
+ sender.visit(this, context.next(iterator.hasNext(), serverInstance));
+ } else {
+ if (!iterator.hasNext()) {
+ // always print out the last one
+ sender.visit(this, context.next(false, serverInstance));
+ } else {
+ // only print short version of the sender node
+ appendMailboxSend(sender, context.next(true, serverInstance))
+ .append(" (Subtree Omitted)")
+ .append('\n');
+ }
+ }
+ }
+ return context._builder;
+ }
+
+ @Override
+ public StringBuilder visitMailboxSend(MailboxSendNode node, Context context) {
+ appendMailboxSend(node, context).append('\n');
+ return node.getInputs().get(0).visit(this, context.next(false, context._host));
+ }
+
+ private StringBuilder appendMailboxSend(MailboxSendNode node, Context context) {
+ appendInfo(node, context);
+
+ int receiverStageId = node.getReceiverStageId();
+ List<ServerInstance> servers = _queryPlan.getStageMetadataMap().get(receiverStageId).getServerInstances();
+ context._builder.append("->");
+ String receivers = servers.stream()
+ .map(s -> s.getHostname() + ':' + s.getPort())
+ .map(s -> "[" + receiverStageId + "]@" + s)
+ .collect(Collectors.joining(",", "{", "}"));
+ return context._builder.append(receivers);
+ }
+
+ @Override
+ public StringBuilder visitProject(ProjectNode node, Context context) {
+ return visitSimpleNode(node, context);
+ }
+
+ @Override
+ public StringBuilder visitSort(SortNode node, Context context) {
+ return visitSimpleNode(node, context);
+ }
+
+ @Override
+ public StringBuilder visitTableScan(TableScanNode node, Context context) {
+ return appendInfo(node, context)
+ .append(' ')
+ .append(_queryPlan.getStageMetadataMap()
+ .get(node.getStageId())
+ .getServerInstanceToSegmentsMap()
+ .get(context._host))
+ .append('\n');
+ }
+
+ @Override
+ public StringBuilder visitValue(ValueNode node, Context context) {
+ return appendInfo(node, context);
+ }
+
+ static class Context {
+ final ServerInstance _host;
+ final String _prefix;
+ final String _childPrefix;
+ final StringBuilder _builder;
+
+ Context(ServerInstance host, String prefix, String childPrefix, StringBuilder builder) {
+ _host = host;
+ _prefix = prefix;
+ _childPrefix = childPrefix;
+ _builder = builder;
+ }
+
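+ // derives the tree-drawing prefixes for a child line: a branch marker while more siblings
+ // remain at this level, and a closing corner with plain indentation for the last child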
+ Context next(boolean hasMoreChildren, ServerInstance host) {
+ return new Context(
+ host,
+ hasMoreChildren ? _childPrefix + "├── " : _childPrefix + "└── ",
+ hasMoreChildren ? _childPrefix + "│ " : _childPrefix + " ",
+ _builder
+ );
+ }
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
index d8770ed3bfe8..651d259979fd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java
@@ -18,14 +18,11 @@
*/
package org.apache.pinot.query.planner;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.calcite.util.Pair;
import org.apache.pinot.query.planner.logical.LogicalPlanner;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
/**
@@ -75,53 +72,16 @@ public List<Pair<Integer, String>> getQueryResultFields() {
return _queryResultFields;
}
+ /**
+ * Explains the {@code QueryPlan}
+ *
+ * @return a human-readable tree explaining the query plan
+ * @see ExplainPlanStageVisitor#explain(QueryPlan)
+ * @apiNote this is NOT identical to the SQL {@code EXPLAIN PLAN FOR} functionality
+ * and is instead intended to be used by developers debugging during feature
+ * development
+ */
public String explain() {
- if (_queryStageMap.isEmpty()) {
- return "EMPTY";
- }
-
- StringBuilder builder = new StringBuilder();
- explain(
- builder,
- _queryStageMap.get(0),
- "",
- "");
- return builder.toString();
- }
-
- private void explain(
- StringBuilder builder,
- StageNode root,
- String prefix,
- String childPrefix
- ) {
- int stage = root.getStageId();
-
- builder
- .append(prefix)
- .append("[").append(stage).append("] ")
- .append(root.explain());
-
- if (root instanceof TableScanNode) {
- builder.append(' ');
- builder.append(_stageMetadataMap.get(root.getStageId()).getServerInstanceToSegmentsMap());
- }
-
- builder.append('\n');
-
- if (root instanceof MailboxReceiveNode) {
- int senderStage = ((MailboxReceiveNode) root).getSenderStageId();
- StageNode sender = _queryStageMap.get(senderStage);
- explain(builder, sender, childPrefix + "└── ", childPrefix + " ");
- } else {
- for (Iterator<StageNode> iterator = root.getInputs().iterator(); iterator.hasNext();) {
- StageNode input = iterator.next();
- if (iterator.hasNext()) {
- explain(builder, input, childPrefix + "├── ", childPrefix + "│ ");
- } else {
- explain(builder, input, childPrefix + "└── ", childPrefix + " ");
- }
- }
- }
+ return ExplainPlanStageVisitor.explain(this);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
index fe2531f9b6c2..2f21a64c275f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/StageMetadata.java
@@ -97,4 +97,11 @@ public TimeBoundaryInfo getTimeBoundaryInfo() {
public void setTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo) {
_timeBoundaryInfo = timeBoundaryInfo;
}
+
+ @Override
+ public String toString() {
+ return "StageMetadata{" + "_scannedTables=" + _scannedTables + ", _serverInstances=" + _serverInstances
+ + ", _serverInstanceToSegmentsMap=" + _serverInstanceToSegmentsMap + ", _timeBoundaryInfo=" + _timeBoundaryInfo
+ + '}';
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
new file mode 100644
index 000000000000..58adfc96d135
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
+import org.apache.pinot.query.planner.partitioning.KeySelector;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+
+
+/**
+ * {@code ShuffleRewriteVisitor} removes unnecessary shuffles from a stage node plan by
+ * inspecting whether all data required by a specific subtree are already colocated on
+ * a single host. It gathers the information recursively by checking which partitioned
+ * data is selected by each node in the tree.
+ *
+ * The only method that should be used externally is {@link #optimizeShuffles(StageNode)};
+ * the other public methods are used only by {@link StageNode#visit(StageNodeVisitor, Object)}.
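+ *
+ * <p>For example, {@link StagePlanner} runs the rewrite on the stage tree root after the tree
+ * has been built and before the root exchange is attached:
+ * <pre>{@code
+ * ShuffleRewriteVisitor.optimizeShuffles(globalStageRoot);
+ * }</pre>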
+ */
+public class ShuffleRewriteVisitor implements StageNodeVisitor<Set<Integer>, Void> {
+
+ /**
+ * This method rewrites {@code root} in place, removing any unnecessary shuffles
+ * by replacing the distribution type with {@link RelDistribution.Type#SINGLETON}.
+ *
+ * @param root the root node of the tree to rewrite
+ */
+ public static void optimizeShuffles(StageNode root) {
+ root.visit(new ShuffleRewriteVisitor(), null);
+ }
+
+ /**
+ * Access to this class should only be used via {@link #optimizeShuffles(StageNode)}
+ */
+ private ShuffleRewriteVisitor() {
+ }
+
+ @Override
+ public Set<Integer> visitAggregate(AggregateNode node, Void context) {
+ Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
+
+ // any input reference directly carries over in group set of aggregation
+ // should still be a partition key
+ Set<Integer> partitionKeys = new HashSet<>();
+ for (int i = 0; i < node.getGroupSet().size(); i++) {
+ RexExpression rex = node.getGroupSet().get(i);
+ if (rex instanceof RexExpression.InputRef) {
+ if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
+ partitionKeys.add(i);
+ }
+ }
+ }
+
+ return partitionKeys;
+ }
+
+ @Override
+ public Set<Integer> visitFilter(FilterNode node, Void context) {
+ // filters don't change partition keys
+ return node.getInputs().get(0).visit(this, context);
+ }
+
+ @Override
+ public Set<Integer> visitJoin(JoinNode node, Void context) {
+ Set<Integer> leftPKs = node.getInputs().get(0).visit(this, context);
+ Set<Integer> rightPks = node.getInputs().get(1).visit(this, context);
+
+ // Currently, JOIN criteria are guaranteed to have only one FieldSelectionKeySelector
+ FieldSelectionKeySelector leftJoinKey = (FieldSelectionKeySelector) node.getJoinKeys().getLeftJoinKeySelector();
+ FieldSelectionKeySelector rightJoinKey = (FieldSelectionKeySelector) node.getJoinKeys().getRightJoinKeySelector();
+
+ int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
+ Set<Integer> partitionKeys = new HashSet<>();
+ for (int i = 0; i < leftJoinKey.getColumnIndices().size(); i++) {
+ int leftIdx = leftJoinKey.getColumnIndices().get(i);
+ int rightIdx = rightJoinKey.getColumnIndices().get(i);
+ if (leftPKs.contains(leftIdx)) {
+ partitionKeys.add(leftIdx);
+ }
+ if (rightPks.contains(rightIdx)) {
+ // combined schema will have all the left fields before the right fields
+ // so add the leftDataSchemaSize before adding the key
+ partitionKeys.add(leftDataSchemaSize + rightIdx);
+ }
+ }
+
+ return partitionKeys;
+ }
+
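+ // a mailbox receive either (1) skips the shuffle entirely when the sender's output is already
+ // partitioned compatibly, (2) clears the partition keys when there is no key selector, or
+ // (3) adopts the selector's columns as the partition keys that hold after the shuffle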
+ @Override
+ public Set<Integer> visitMailboxReceive(MailboxReceiveNode node, Void context) {
+ Set<Integer> oldPartitionKeys = node.getSender().visit(this, context);
+ KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
+
+ if (canSkipShuffle(oldPartitionKeys, selector)) {
+ node.setExchangeType(RelDistribution.Type.SINGLETON);
+ return oldPartitionKeys;
+ } else if (selector == null) {
+ return new HashSet<>();
+ } else {
+ return new HashSet<>(((FieldSelectionKeySelector) selector).getColumnIndices());
+ }
+ }
+
+ @Override
+ public Set<Integer> visitMailboxSend(MailboxSendNode node, Void context) {
+ Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
+ KeySelector<Object[], Object[]> selector = node.getPartitionKeySelector();
+
+ if (canSkipShuffle(oldPartitionKeys, selector)) {
+ node.setExchangeType(RelDistribution.Type.SINGLETON);
+ return oldPartitionKeys;
+ } else {
+ // reset the context partitionKeys since we've determined that
+ // a shuffle is necessary (the MailboxReceiveNode that reads from
+ // this sender will necessarily be the result of a shuffle and
+ // will reset the partition keys based on its selector)
+ return new HashSet<>();
+ }
+ }
+
+ @Override
+ public Set<Integer> visitProject(ProjectNode node, Void context) {
+ Set<Integer> oldPartitionKeys = node.getInputs().get(0).visit(this, context);
+
+ // all inputs carry over if they're still in the projection result
+ Set<Integer> partitionKeys = new HashSet<>();
+ for (int i = 0; i < node.getProjects().size(); i++) {
+ RexExpression rex = node.getProjects().get(i);
+ if (rex instanceof RexExpression.InputRef) {
+ if (oldPartitionKeys.contains(((RexExpression.InputRef) rex).getIndex())) {
+ partitionKeys.add(i);
+ }
+ }
+ }
+
+ return partitionKeys;
+ }
+
+ @Override
+ public Set<Integer> visitSort(SortNode node, Void context) {
+ // sort doesn't change the partition keys
+ return node.getInputs().get(0).visit(this, context);
+ }
+
+ @Override
+ public Set<Integer> visitTableScan(TableScanNode node, Void context) {
+ // TODO: add table partition in table config as partition keys - this info is not yet available
+ return new HashSet<>();
+ }
+
+ @Override
+ public Set<Integer> visitValue(ValueNode node, Void context) {
+ return new HashSet<>();
+ }
+
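+ // a shuffle can be skipped when the data is already hash-partitioned on a subset of the
+ // requested keys: any two rows that agree on every requested key also agree on the current
+ // partition keys, so matching rows are already colocated and no data movement is needed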
+ private static boolean canSkipShuffle(Set<Integer> partitionKeys, KeySelector<Object[], Object[]> keySelector) {
+ if (!partitionKeys.isEmpty() && keySelector != null) {
+ Set<Integer> targetSet = new HashSet<>(((FieldSelectionKeySelector) keySelector).getColumnIndices());
+ return targetSet.containsAll(partitionKeys);
+ }
+ return false;
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java
new file mode 100644
index 000000000000..cf76134c8a88
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.logical;
+
+import java.util.HashMap;
+import java.util.List;
+import org.apache.calcite.util.Pair;
+import org.apache.pinot.query.planner.QueryPlan;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+
+
+/**
+ * {@code StageMetadataVisitor} computes the {@link StageMetadata} for a {@link StageNode}
+ * tree and attaches it in the form of a {@link QueryPlan}.
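+ *
+ * <p>For example, {@link StagePlanner} assembles the final plan with:
+ * <pre>{@code
+ * QueryPlan queryPlan = StageMetadataVisitor.attachMetadata(relRoot.fields, globalReceiverNode);
+ * }</pre>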
+ */
+public class StageMetadataVisitor implements StageNodeVisitor<Void, QueryPlan> {
+
+ public static QueryPlan attachMetadata(List<Pair<Integer, String>> fields, StageNode root) {
+ QueryPlan queryPlan = new QueryPlan(fields, new HashMap<>(), new HashMap<>());
+ root.visit(new StageMetadataVisitor(), queryPlan);
+ return queryPlan;
+ }
+
+ /**
+ * Usage of this class should only come through {@link #attachMetadata(List, StageNode)}.
+ */
+ private StageMetadataVisitor() {
+ }
+
+ private void visit(StageNode node, QueryPlan queryPlan) {
+ queryPlan
+ .getStageMetadataMap()
+ .computeIfAbsent(node.getStageId(), (id) -> new StageMetadata())
+ .attach(node);
+ }
+
+ @Override
+ public Void visitAggregate(AggregateNode node, QueryPlan context) {
+ node.getInputs().get(0).visit(this, context);
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitFilter(FilterNode node, QueryPlan context) {
+ node.getInputs().get(0).visit(this, context);
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitJoin(JoinNode node, QueryPlan context) {
+ node.getInputs().forEach(join -> join.visit(this, context));
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitMailboxReceive(MailboxReceiveNode node, QueryPlan context) {
+ node.getSender().visit(this, context);
+ visit(node, context);
+
+ // special case for the global mailbox receive node
+ if (node.getStageId() == 0) {
+ context.getQueryStageMap().put(0, node);
+ }
+
+ return null;
+ }
+
+ @Override
+ public Void visitMailboxSend(MailboxSendNode node, QueryPlan context) {
+ node.getInputs().get(0).visit(this, context);
+ visit(node, context);
+
+ context.getQueryStageMap().put(node.getStageId(), node);
+ return null;
+ }
+
+ @Override
+ public Void visitProject(ProjectNode node, QueryPlan context) {
+ node.getInputs().get(0).visit(this, context);
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitSort(SortNode node, QueryPlan context) {
+ node.getInputs().get(0).visit(this, context);
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitTableScan(TableScanNode node, QueryPlan context) {
+ visit(node, context);
+ return null;
+ }
+
+ @Override
+ public Void visitValue(ValueNode node, QueryPlan context) {
+ visit(node, context);
+ return null;
+ }
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
index 8c940294efe0..323e7f506b0e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java
@@ -18,11 +18,8 @@
*/
package org.apache.pinot.query.planner.logical;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
@@ -32,14 +29,9 @@
import org.apache.pinot.query.planner.StageMetadata;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelector;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.TableScanNode;
import org.apache.pinot.query.routing.WorkerManager;
@@ -51,9 +43,6 @@
public class StagePlanner {
private final PlannerContext _plannerContext;
private final WorkerManager _workerManager;
-
- private Map<Integer, StageNode> _queryStageMap;
- private Map<Integer, StageMetadata> _stageMetadataMap;
private int _stageIdCounter;
public StagePlanner(PlannerContext plannerContext, WorkerManager workerManager) {
@@ -69,176 +58,68 @@ public StagePlanner(PlannerContext plannerContext, WorkerManager workerManager)
*/
public QueryPlan makePlan(RelRoot relRoot) {
RelNode relRootNode = relRoot.rel;
- // clear the state
- _queryStageMap = new HashMap<>();
- _stageMetadataMap = new HashMap<>();
// Stage ID starts with 1, 0 will be reserved for ROOT stage.
_stageIdCounter = 1;
// walk the plan and create stages.
StageNode globalStageRoot = walkRelPlan(relRootNode, getNewStageId());
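+ // drop exchanges that are provably unnecessary (the data is already colocated) before the
+ // root sender/receiver pair is wired up below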
+ ShuffleRewriteVisitor.optimizeShuffles(globalStageRoot);
// global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one
// receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default.
- StageNode globalReceiverNode =
- new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
- RelDistribution.Type.RANDOM_DISTRIBUTED, null);
StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), globalStageRoot.getDataSchema(),
- globalReceiverNode.getStageId(), RelDistribution.Type.RANDOM_DISTRIBUTED, null);
+ 0, RelDistribution.Type.RANDOM_DISTRIBUTED, null);
globalSenderNode.addInput(globalStageRoot);
- _queryStageMap.put(globalSenderNode.getStageId(), globalSenderNode);
- StageMetadata stageMetadata = _stageMetadataMap.get(globalSenderNode.getStageId());
- stageMetadata.attach(globalSenderNode);
- _queryStageMap.put(globalReceiverNode.getStageId(), globalReceiverNode);
- StageMetadata globalReceivingStageMetadata = new StageMetadata();
- globalReceivingStageMetadata.attach(globalReceiverNode);
- _stageMetadataMap.put(globalReceiverNode.getStageId(), globalReceivingStageMetadata);
+ StageNode globalReceiverNode =
+ new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
+ RelDistribution.Type.RANDOM_DISTRIBUTED, null, globalSenderNode);
+
+ QueryPlan queryPlan = StageMetadataVisitor.attachMetadata(relRoot.fields, globalReceiverNode);
// assign workers to each stage.
- for (Map.Entry<Integer, StageMetadata> e : _stageMetadataMap.entrySet()) {
+ for (Map.Entry<Integer, StageMetadata> e : queryPlan.getStageMetadataMap().entrySet()) {
_workerManager.assignWorkerToStage(e.getKey(), e.getValue());
}
- return new QueryPlan(relRoot.fields, _queryStageMap, _stageMetadataMap);
+ return queryPlan;
}
// non-threadsafe
// TODO: add dataSchema (extracted from RelNode schema) to the StageNode.
private StageNode walkRelPlan(RelNode node, int currentStageId) {
if (isExchangeNode(node)) {
- // 1. exchangeNode always have only one input, get its input converted as a new stage root.
StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
RelDistribution distribution = ((LogicalExchange) node).getDistribution();
- List<Integer> distributionKeys = distribution.getKeys();
- RelDistribution.Type exchangeType = distribution.getType();
-
- // 2. make an exchange sender and receiver node pair
- // only HASH_DISTRIBUTED requires a partition key selector; so all other types (SINGLETON and BROADCAST)
- // of exchange will not carry a partition key selector.
- KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
- ? new FieldSelectionKeySelector(distributionKeys) : null;
-
- StageNode mailboxReceiver;
- StageNode mailboxSender;
- if (canSkipShuffle(nextStageRoot, keySelector)) {
- // Use SINGLETON exchange type indicates a LOCAL-to-LOCAL data transfer between execution threads.
- // TODO: actually implement the SINGLETON exchange without going through the over-the-wire GRPC mailbox
- // sender and receiver.
- mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
- nextStageRoot.getStageId(), RelDistribution.Type.SINGLETON, keySelector);
- mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
- mailboxReceiver.getStageId(), RelDistribution.Type.SINGLETON, keySelector);
- } else {
- mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
- nextStageRoot.getStageId(), exchangeType, keySelector);
- mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
- mailboxReceiver.getStageId(), exchangeType, keySelector);
- }
- mailboxSender.addInput(nextStageRoot);
-
- // 3. put the sender side as a completed stage.
- _queryStageMap.put(mailboxSender.getStageId(), mailboxSender);
-
- // 4. update stage metadata.
- updateStageMetadata(mailboxSender.getStageId(), mailboxSender, _stageMetadataMap);
- updateStageMetadata(mailboxReceiver.getStageId(), mailboxReceiver, _stageMetadataMap);
-
- // 5. return the receiver, this is considered as a "virtual table scan" node for its parent.
- return mailboxReceiver;
+ return createSendReceivePair(nextStageRoot, distribution, currentStageId);
} else {
StageNode stageNode = RelToStageConverter.toStageNode(node, currentStageId);
List<RelNode> inputs = node.getInputs();
for (RelNode input : inputs) {
stageNode.addInput(walkRelPlan(input, currentStageId));
}
- updateStageMetadata(currentStageId, stageNode, _stageMetadataMap);
return stageNode;
}
}
- private boolean canSkipShuffle(StageNode stageNode, KeySelector<Object[], Object[]> keySelector) {
- Set<Integer> originSet = stageNode.getPartitionKeys();
- if (!originSet.isEmpty() && keySelector != null) {
- Set<Integer> targetSet = new HashSet<>(((FieldSelectionKeySelector) keySelector).getColumnIndices());
- return targetSet.containsAll(originSet);
- }
- return false;
- }
- private static void updateStageMetadata(int stageId, StageNode node, Map<Integer, StageMetadata> stageMetadataMap) {
- updatePartitionKeys(node);
- StageMetadata stageMetadata = stageMetadataMap.computeIfAbsent(stageId, (id) -> new StageMetadata());
- stageMetadata.attach(node);
- }
+ private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution distribution, int currentStageId) {
+ List<Integer> distributionKeys = distribution.getKeys();
+ RelDistribution.Type exchangeType = distribution.getType();
- private static void updatePartitionKeys(StageNode node) {
- if (node instanceof ProjectNode) {
- // any input reference directly carry over should still be a partition key.
- Set<Integer> previousPartitionKeys = node.getInputs().get(0).getPartitionKeys();
- Set<Integer> newPartitionKeys = new HashSet<>();
- ProjectNode projectNode = (ProjectNode) node;
- for (int i = 0; i < projectNode.getProjects().size(); i++) {
- RexExpression rexExpression = projectNode.getProjects().get(i);
- if (rexExpression instanceof RexExpression.InputRef
- && previousPartitionKeys.contains(((RexExpression.InputRef) rexExpression).getIndex())) {
- newPartitionKeys.add(i);
- }
- }
- projectNode.setPartitionKeys(newPartitionKeys);
- } else if (node instanceof FilterNode) {
- // filter node doesn't change partition keys.
- node.setPartitionKeys(node.getInputs().get(0).getPartitionKeys());
- } else if (node instanceof AggregateNode) {
- // any input reference directly carry over in group set of aggregation should still be a partition key.
- Set<Integer> previousPartitionKeys = node.getInputs().get(0).getPartitionKeys();
- Set<Integer> newPartitionKeys = new HashSet<>();
- AggregateNode aggregateNode = (AggregateNode) node;
- for (int i = 0; i < aggregateNode.getGroupSet().size(); i++) {
- RexExpression rexExpression = aggregateNode.getGroupSet().get(i);
- if (rexExpression instanceof RexExpression.InputRef
- && previousPartitionKeys.contains(((RexExpression.InputRef) rexExpression).getIndex())) {
- newPartitionKeys.add(i);
- }
- }
- aggregateNode.setPartitionKeys(newPartitionKeys);
- } else if (node instanceof JoinNode) {
- int leftDataSchemaSize = node.getInputs().get(0).getDataSchema().size();
- Set<Integer> leftPartitionKeys = node.getInputs().get(0).getPartitionKeys();
- Set<Integer> rightPartitionKeys = node.getInputs().get(1).getPartitionKeys();
- // Currently, JOIN criteria guarantee to only have one FieldSelectionKeySelector.
- FieldSelectionKeySelector leftJoinKeySelector =
- (FieldSelectionKeySelector) ((JoinNode) node).getJoinKeys().getLeftJoinKeySelector();
- FieldSelectionKeySelector rightJoinKeySelector =
- (FieldSelectionKeySelector) ((JoinNode) node).getJoinKeys().getRightJoinKeySelector();
- Set<Integer> newPartitionKeys = new HashSet<>();
- for (int i = 0; i < leftJoinKeySelector.getColumnIndices().size(); i++) {
- int leftIndex = leftJoinKeySelector.getColumnIndices().get(i);
- int rightIndex = rightJoinKeySelector.getColumnIndices().get(i);
- if (leftPartitionKeys.contains(leftIndex)) {
- newPartitionKeys.add(leftIndex);
- }
- if (rightPartitionKeys.contains(rightIndex)) {
- newPartitionKeys.add(leftDataSchemaSize + rightIndex);
- }
- }
- node.setPartitionKeys(newPartitionKeys);
- } else if (node instanceof TableScanNode) {
- // TODO: add table partition in table config as partition keys. we dont have that information yet.
- } else if (node instanceof MailboxReceiveNode) {
- // hash distribution key is partition key.
- FieldSelectionKeySelector keySelector = (FieldSelectionKeySelector)
- ((MailboxReceiveNode) node).getPartitionKeySelector();
- if (keySelector != null) {
- node.setPartitionKeys(new HashSet<>(keySelector.getColumnIndices()));
- }
- } else if (node instanceof MailboxSendNode) {
- FieldSelectionKeySelector keySelector = (FieldSelectionKeySelector)
- ((MailboxSendNode) node).getPartitionKeySelector();
- if (keySelector != null) {
- node.setPartitionKeys(new HashSet<>(keySelector.getColumnIndices()));
- }
- }
+ // make an exchange sender and receiver node pair
+ // only HASH_DISTRIBUTED requires a partition key selector; so all other types (SINGLETON and BROADCAST)
+ // of exchange will not carry a partition key selector.
+ KeySelector<Object[], Object[]> keySelector = exchangeType == RelDistribution.Type.HASH_DISTRIBUTED
+ ? new FieldSelectionKeySelector(distributionKeys) : null;
+
+ StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
+ currentStageId, exchangeType, keySelector);
+ StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
+ nextStageRoot.getStageId(), exchangeType, keySelector, mailboxSender);
+ mailboxSender.addInput(nextStageRoot);
+
+ return mailboxReceiver;
}
private boolean isExchangeNode(RelNode node) {
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
index 594c6d7e382f..46de8731b76b 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AbstractStageNode.java
@@ -19,10 +19,7 @@
package org.apache.pinot.query.planner.stage;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.serde.ProtoSerializable;
@@ -34,7 +31,6 @@ public abstract class AbstractStageNode implements StageNode, ProtoSerializable
protected final int _stageId;
protected final List<StageNode> _inputs;
protected DataSchema _dataSchema;
- protected Set<Integer> _partitionedKeys;
public AbstractStageNode(int stageId) {
this(stageId, null);
@@ -44,7 +40,6 @@ public AbstractStageNode(int stageId, DataSchema dataSchema) {
_stageId = stageId;
_dataSchema = dataSchema;
_inputs = new ArrayList<>();
- _partitionedKeys = new HashSet<>();
}
@Override
@@ -72,16 +67,6 @@ public void setDataSchema(DataSchema dataSchema) {
_dataSchema = dataSchema;
}
- @Override
- public Set<Integer> getPartitionKeys() {
- return _partitionedKeys;
- }
-
- @Override
- public void setPartitionKeys(Collection<Integer> partitionedKeys) {
- _partitionedKeys.addAll(partitionedKeys);
- }
-
@Override
public void fromObjectField(Plan.ObjectField objectField) {
ProtoSerializationUtils.setObjectFieldToObject(this, objectField);
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
index 70174df26582..ea8dc2c1c10e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/AggregateNode.java
@@ -59,4 +59,9 @@ public List<RexExpression> getGroupSet() {
public String explain() {
return "AGGREGATE";
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitAggregate(this, context);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
index 0d960e951a4c..52ed004da16d 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/FilterNode.java
@@ -45,4 +45,9 @@ public RexExpression getCondition() {
public String explain() {
return "FILTER";
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitFilter(this, context);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
index b34d74d57471..af9b4e03ed1f 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/JoinNode.java
@@ -64,6 +64,11 @@ public String explain() {
return "JOIN";
}
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitJoin(this, context);
+ }
+
public static class JoinKeys {
@ProtoProperties
private KeySelector<Object[], Object[]> _leftJoinKeySelector;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
index e358c2cb13f5..cf90a0005cfd 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java
@@ -33,22 +33,32 @@ public class MailboxReceiveNode extends AbstractStageNode {
@ProtoProperties
private KeySelector<Object[], Object[]> _partitionKeySelector;
+ // this is only available during planning and should not be relied
+ // on in any post-serialization code
+ private transient StageNode _sender;
+
public MailboxReceiveNode(int stageId) {
super(stageId);
}
public MailboxReceiveNode(int stageId, DataSchema dataSchema, int senderStageId,
- RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector) {
+ RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
+ StageNode sender) {
super(stageId, dataSchema);
_senderStageId = senderStageId;
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
+ _sender = sender;
}
public int getSenderStageId() {
return _senderStageId;
}
+ public void setExchangeType(RelDistribution.Type exchangeType) {
+ _exchangeType = exchangeType;
+ }
+
public RelDistribution.Type getExchangeType() {
return _exchangeType;
}
@@ -57,8 +67,17 @@ public KeySelector<Object[], Object[]> getPartitionKeySelector() {
return _partitionKeySelector;
}
+ public StageNode getSender() {
+ return _sender;
+ }
+
@Override
public String explain() {
- return "MAIL_RECEIVE";
+ return "MAIL_RECEIVE(" + _exchangeType + ")";
+ }
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitMailboxReceive(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
index 05459b563449..4219590100ad 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxSendNode.java
@@ -49,6 +49,10 @@ public int getReceiverStageId() {
return _receiverStageId;
}
+ public void setExchangeType(RelDistribution.Type exchangeType) {
+ _exchangeType = exchangeType;
+ }
+
public RelDistribution.Type getExchangeType() {
return _exchangeType;
}
@@ -59,6 +63,11 @@ public KeySelector<Object[], Object[]> getPartitionKeySelector() {
@Override
public String explain() {
- return "MAIL_SEND";
+ return "MAIL_SEND(" + _exchangeType + ")";
+ }
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitMailboxSend(this, context);
}
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
index 250a38885f6b..8371dda609f8 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ProjectNode.java
@@ -46,4 +46,9 @@ public List<RexExpression> getProjects() {
public String explain() {
return "PROJECT";
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitProject(this, context);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
index 4df23e7325b3..38b2da6c5617 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/SortNode.java
@@ -72,4 +72,9 @@ public int getOffset() {
public String explain() {
return "SORT" + (_fetch > 0 ? " (LIMIT " + _fetch + ")" : "");
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitSort(this, context);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
index 8f170a065c7d..7e3278cfe872 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNode.java
@@ -19,9 +19,7 @@
package org.apache.pinot.query.planner.stage;
import java.io.Serializable;
-import java.util.Collection;
import java.util.List;
-import java.util.Set;
import org.apache.pinot.common.utils.DataSchema;
@@ -45,9 +43,7 @@ public interface StageNode extends Serializable {
void setDataSchema(DataSchema dataSchema);
- Set<Integer> getPartitionKeys();
-
- void setPartitionKeys(Collection<Integer> partitionKeys);
-
String explain();
+
+ <T, C> T visit(StageNodeVisitor<T, C> visitor, C context);
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
new file mode 100644
index 000000000000..614dbb877add
--- /dev/null
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/StageNodeVisitor.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.stage;
+
+import org.apache.pinot.query.planner.QueryPlan;
+
+
+/**
+ * {@code StageNodeVisitor} is a skeleton class that allows for implementations of {@code StageNode}
+ * tree traversals using the {@link StageNode#visit(StageNodeVisitor, Object)} method. There is no
+ * enforced traversal order; each implementation decides how it traverses the tree.
+ *
+ * It is recommended that implementors use private constructors and static methods to access main
+ * functionality (see {@link org.apache.pinot.query.planner.ExplainPlanStageVisitor#explain(QueryPlan)}
+ * for an example of this pattern).
+ *
+ * @param <T> the return type for all visits
+ * @param <C> a Context that will be passed as the second parameter to {@code StageNode#visit},
+ * implementors can decide how they want to use this context (e.g. whether or not
+ * it can be modified in place or whether it's an immutable context)
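+ *
+ * <p>For illustration only, a hypothetical visitor that counts the nodes reachable through
+ * {@code getInputs()} (note that a {@link MailboxReceiveNode} exposes its sender via
+ * {@code getSender()} rather than as an input) might look like:
+ * <pre>{@code
+ * public final class NodeCountVisitor implements StageNodeVisitor<Integer, Void> {
+ *   public static int countNodes(StageNode root) {
+ *     return root.visit(new NodeCountVisitor(), null);
+ *   }
+ *
+ *   private NodeCountVisitor() {
+ *   }
+ *
+ *   // counts the node itself plus every node reachable through its inputs
+ *   private int countChildren(StageNode node, Void context) {
+ *     int count = 1;
+ *     for (StageNode input : node.getInputs()) {
+ *       count += input.visit(this, context);
+ *     }
+ *     return count;
+ *   }
+ *
+ *   public Integer visitAggregate(AggregateNode node, Void c) { return countChildren(node, c); }
+ *   public Integer visitFilter(FilterNode node, Void c) { return countChildren(node, c); }
+ *   public Integer visitJoin(JoinNode node, Void c) { return countChildren(node, c); }
+ *   public Integer visitMailboxReceive(MailboxReceiveNode node, Void c) { return countChildren(node, c); }
+ *   public Integer visitMailboxSend(MailboxSendNode node, Void c) { return countChildren(node, c); }
+ *   public Integer visitProject(ProjectNode node, Void c) { return countChildren(node, c); }
+ *   public Integer visitSort(SortNode node, Void c) { return countChildren(node, c); }
+ *   public Integer visitTableScan(TableScanNode node, Void c) { return countChildren(node, c); }
+ *   public Integer visitValue(ValueNode node, Void c) { return countChildren(node, c); }
+ * }
+ * }</pre>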
+ */
+public interface StageNodeVisitor<T, C> {
+
+ T visitAggregate(AggregateNode node, C context);
+
+ T visitFilter(FilterNode node, C context);
+
+ T visitJoin(JoinNode node, C context);
+
+ T visitMailboxReceive(MailboxReceiveNode node, C context);
+
+ T visitMailboxSend(MailboxSendNode node, C context);
+
+ T visitProject(ProjectNode node, C context);
+
+ T visitSort(SortNode node, C context);
+
+ T visitTableScan(TableScanNode node, C context);
+
+ T visitValue(ValueNode node, C context);
+}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
index bcd8493cc70b..7711dd623506 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/TableScanNode.java
@@ -51,4 +51,9 @@ public List<String> getTableScanColumns() {
public String explain() {
return "TABLE SCAN (" + _tableName + ")";
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitTableScan(this, context);
+ }
}
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
index 3918338d19f9..b3ad0d40f613 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/ValueNode.java
@@ -56,4 +56,9 @@ public List<List<RexExpression>> getLiteralRows() {
public String explain() {
return "LITERAL";
}
+
+ @Override
+ public <T, C> T visit(StageNodeVisitor<T, C> visitor, C context) {
+ return visitor.visitValue(this, context);
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
index c4d81fd76a4b..bdc36a757173 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcMailboxService.java
@@ -99,4 +99,9 @@ public ReceivingMailbox<Mailbox.MailboxContent> getReceivingMailbox(String mailboxId) {
public ManagedChannel getChannel(String mailboxId) {
return _channelManager.getChannel(Utils.constructChannelId(mailboxId));
}
+
+ @Override
+ public String toString() {
+ return "GrpcMailboxService{" + "_hostname='" + _hostname + '\'' + ", _mailboxPort=" + _mailboxPort + '}';
+ }
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
index 9b457644d0d3..46dd9dc96737 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java
@@ -123,7 +123,7 @@ public BaseDataBlock getDataBlock() {
throw new UnsupportedOperationException("Unable to build from container with type: " + _type);
}
} catch (Exception e) {
- throw new RuntimeException("Unable to create DataBlock");
+ throw new RuntimeException("Unable to create DataBlock", e);
}
}
return _dataBlock;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
new file mode 100644
index 000000000000..b4cc73d5ac50
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.common.proto.Mailbox;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.planner.stage.AggregateNode;
+import org.apache.pinot.query.planner.stage.FilterNode;
+import org.apache.pinot.query.planner.stage.JoinNode;
+import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
+import org.apache.pinot.query.planner.stage.MailboxSendNode;
+import org.apache.pinot.query.planner.stage.ProjectNode;
+import org.apache.pinot.query.planner.stage.SortNode;
+import org.apache.pinot.query.planner.stage.StageNode;
+import org.apache.pinot.query.planner.stage.StageNodeVisitor;
+import org.apache.pinot.query.planner.stage.TableScanNode;
+import org.apache.pinot.query.planner.stage.ValueNode;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.AggregateOperator;
+import org.apache.pinot.query.runtime.operator.FilterOperator;
+import org.apache.pinot.query.runtime.operator.HashJoinOperator;
+import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
+import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.SortOperator;
+import org.apache.pinot.query.runtime.operator.TransformOperator;
+import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
+
+
+/**
+ * This visitor constructs a physical plan of operators from a {@link StageNode} tree. Note that
+ * this works only for the intermediate stage nodes, leaf stage nodes are expected to compile into
+ * v1 operators at this point in time.
+ *
+ * This class should be used statically via {@link #build(MailboxService, String, int, long, Map, StageNode)}
+ *
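+ * <p>For example, the worker-side executor compiles a stage root into an operator chain roughly as
+ * follows (variable names are illustrative):
+ * <pre>{@code
+ * Operator<TransferableBlock> rootOperator = PhysicalPlanVisitor.build(
+ *     mailboxService, hostName, port, requestId, distributedStagePlan.getMetadataMap(),
+ *     distributedStagePlan.getStageRoot());
+ * }</pre>
+ *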
+ * @see org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan, ExecutorService, Map)
+ */
+public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<TransferableBlock>, Void> {
+
+ private final MailboxService<Mailbox.MailboxContent> _mailboxService;
+ private final String _hostName;
+ private final int _port;
+ private final long _requestId;
+ private final Map<Integer, StageMetadata> _metadataMap;
+
+ public static Operator<TransferableBlock> build(MailboxService<Mailbox.MailboxContent> mailboxService,
+ String hostName, int port, long requestId, Map<Integer, StageMetadata> metadataMap, StageNode node) {
+ return node.visit(new PhysicalPlanVisitor(mailboxService, hostName, port, requestId, metadataMap), null);
+ }
+
+ private PhysicalPlanVisitor(MailboxService<Mailbox.MailboxContent> mailboxService, String hostName, int port,
+ long requestId, Map<Integer, StageMetadata> metadataMap) {
+ _mailboxService = mailboxService;
+ _hostName = hostName;
+ _port = port;
+ _requestId = requestId;
+ _metadataMap = metadataMap;
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitMailboxReceive(MailboxReceiveNode node, Void context) {
+ List<ServerInstance> sendingInstances = _metadataMap.get(node.getSenderStageId()).getServerInstances();
+ return new MailboxReceiveOperator(_mailboxService, node.getDataSchema(), sendingInstances,
+ node.getExchangeType(), node.getPartitionKeySelector(), _hostName, _port, _requestId,
+ node.getSenderStageId());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitMailboxSend(MailboxSendNode node, Void context) {
+ Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null);
+ StageMetadata receivingStageMetadata = _metadataMap.get(node.getReceiverStageId());
+ return new MailboxSendOperator(_mailboxService, node.getDataSchema(), nextOperator,
+ receivingStageMetadata.getServerInstances(), node.getExchangeType(), node.getPartitionKeySelector(),
+ _hostName, _port, _requestId, node.getStageId());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitAggregate(AggregateNode node, Void context) {
+ Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null);
+ return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(),
+ node.getGroupSet(), node.getInputs().get(0).getDataSchema());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitFilter(FilterNode node, Void context) {
+ Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null);
+ return new FilterOperator(nextOperator, node.getDataSchema(), node.getCondition());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitJoin(JoinNode node, Void context) {
+ StageNode left = node.getInputs().get(0);
+ StageNode right = node.getInputs().get(1);
+
+ Operator<TransferableBlock> leftOperator = left.visit(this, null);
+ Operator<TransferableBlock> rightOperator = right.visit(this, null);
+
+ return new HashJoinOperator(leftOperator, left.getDataSchema(), rightOperator,
+ right.getDataSchema(), node.getDataSchema(), node.getJoinKeys(),
+ node.getJoinClauses(), node.getJoinRelType());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitProject(ProjectNode node, Void context) {
+ Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null);
+ return new TransformOperator(nextOperator, node.getDataSchema(), node.getProjects(),
+ node.getInputs().get(0).getDataSchema());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitSort(SortNode node, Void context) {
+ Operator<TransferableBlock> nextOperator = node.getInputs().get(0).visit(this, null);
+ return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(),
+ node.getFetch(), node.getOffset(), node.getDataSchema());
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitTableScan(TableScanNode node, Void context) {
+ throw new UnsupportedOperationException("Stage node of type TableScanNode is not supported!");
+ }
+
+ @Override
+ public Operator<TransferableBlock> visitValue(ValueNode node, Void context) {
+ return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows());
+ }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
index 5f0ed67d081d..5fb7d05d69fd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java
@@ -18,36 +18,17 @@
*/
package org.apache.pinot.query.runtime.executor;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.request.context.ThreadTimer;
-import org.apache.pinot.core.operator.BaseOperator;
-import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.util.trace.TraceRunnable;
import org.apache.pinot.query.mailbox.MailboxService;
-import org.apache.pinot.query.planner.StageMetadata;
-import org.apache.pinot.query.planner.stage.AggregateNode;
-import org.apache.pinot.query.planner.stage.FilterNode;
-import org.apache.pinot.query.planner.stage.JoinNode;
-import org.apache.pinot.query.planner.stage.MailboxReceiveNode;
-import org.apache.pinot.query.planner.stage.MailboxSendNode;
-import org.apache.pinot.query.planner.stage.ProjectNode;
-import org.apache.pinot.query.planner.stage.SortNode;
import org.apache.pinot.query.planner.stage.StageNode;
-import org.apache.pinot.query.planner.stage.ValueNode;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
-import org.apache.pinot.query.runtime.operator.AggregateOperator;
-import org.apache.pinot.query.runtime.operator.FilterOperator;
-import org.apache.pinot.query.runtime.operator.HashJoinOperator;
-import org.apache.pinot.query.runtime.operator.LiteralValueOperator;
-import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator;
-import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
-import org.apache.pinot.query.runtime.operator.SortOperator;
-import org.apache.pinot.query.runtime.operator.TransformOperator;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
@@ -77,6 +58,7 @@ public void init(PinotConfiguration config, ServerMetrics serverMetrics,
_port = port;
}
+
public synchronized void start() {
LOGGER.info("Worker query executor started");
}
@@ -85,12 +67,14 @@ public synchronized void shutDown() {
LOGGER.info("Worker query executor shut down");
}
- // TODO: split this execution from PhysicalPlanner
public void processQuery(DistributedStagePlan queryRequest, Map<String, String> requestMetadataMap,
ExecutorService executorService) {
long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
StageNode stageRoot = queryRequest.getStageRoot();
- BaseOperator<TransferableBlock> rootOperator = getOperator(requestId, stageRoot, queryRequest.getMetadataMap());
+
+ Operator<TransferableBlock> rootOperator = PhysicalPlanVisitor.build(
+ _mailboxService, _hostName, _port, requestId, queryRequest.getMetadataMap(), stageRoot);
+
executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
@@ -102,55 +86,4 @@ public void runJob() {
}
});
}
-
- // TODO: split this PhysicalPlanner into a separate module
- // TODO: optimize this into a framework. (physical planner)
- private BaseOperator<TransferableBlock> getOperator(long requestId, StageNode stageNode,
- Map<Integer, StageMetadata> metadataMap) {
- if (stageNode instanceof MailboxReceiveNode) {
- MailboxReceiveNode receiveNode = (MailboxReceiveNode) stageNode;
- List<ServerInstance> sendingInstances = metadataMap.get(receiveNode.getSenderStageId()).getServerInstances();
- return new MailboxReceiveOperator(_mailboxService, receiveNode.getDataSchema(), sendingInstances,
- receiveNode.getExchangeType(), receiveNode.getPartitionKeySelector(), _hostName, _port, requestId,
- receiveNode.getSenderStageId());
- } else if (stageNode instanceof MailboxSendNode) {
- MailboxSendNode sendNode = (MailboxSendNode) stageNode;
- BaseOperator<TransferableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap);
- StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId());
- return new MailboxSendOperator(_mailboxService, sendNode.getDataSchema(), nextOperator,
- receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(), sendNode.getPartitionKeySelector(),
- _hostName, _port, requestId, sendNode.getStageId());
- } else if (stageNode instanceof JoinNode) {
- JoinNode joinNode = (JoinNode) stageNode;
- BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap);
- BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap);
- return new HashJoinOperator(leftOperator, joinNode.getInputs().get(0).getDataSchema(), rightOperator,
- joinNode.getInputs().get(1).getDataSchema(), joinNode.getDataSchema(), joinNode.getJoinKeys(),
- joinNode.getJoinClauses(), joinNode.getJoinRelType());
- } else if (stageNode instanceof AggregateNode) {
- AggregateNode aggregateNode = (AggregateNode) stageNode;
- BaseOperator<TransferableBlock> inputOperator =
- getOperator(requestId, aggregateNode.getInputs().get(0), metadataMap);
- return new AggregateOperator(inputOperator, aggregateNode.getDataSchema(), aggregateNode.getAggCalls(),
- aggregateNode.getGroupSet(), aggregateNode.getInputs().get(0).getDataSchema());
- } else if (stageNode instanceof FilterNode) {
- FilterNode filterNode = (FilterNode) stageNode;
- return new FilterOperator(getOperator(requestId, filterNode.getInputs().get(0), metadataMap),
- filterNode.getDataSchema(), filterNode.getCondition());
- } else if (stageNode instanceof ProjectNode) {
- ProjectNode projectNode = (ProjectNode) stageNode;
- return new TransformOperator(getOperator(requestId, projectNode.getInputs().get(0), metadataMap),
- projectNode.getDataSchema(), projectNode.getProjects(), projectNode.getInputs().get(0).getDataSchema());
- } else if (stageNode instanceof SortNode) {
- SortNode sortNode = (SortNode) stageNode;
- return new SortOperator(getOperator(requestId, sortNode.getInputs().get(0), metadataMap),
- sortNode.getCollationKeys(), sortNode.getCollationDirections(), sortNode.getFetch(), sortNode.getOffset(),
- sortNode.getDataSchema());
- } else if (stageNode instanceof ValueNode) {
- return new LiteralValueOperator(stageNode.getDataSchema(), ((ValueNode) stageNode).getLiteralRows());
- } else {
- throw new UnsupportedOperationException(
- String.format("Stage node type %s is not supported!", stageNode.getClass().getSimpleName()));
- }
- }
}
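For reference, a minimal sketch of how the new visitor is driven from the worker side, assuming the generic parameters restored above and a `mailboxService`/`distributedStagePlan` already in scope (names here are illustrative, mirroring `WorkerQueryExecutor.processQuery`); the full end-of-stream consume loop remains inside `runJob()`:

```java
// Illustrative only: build the physical operator tree for this stage and pull a block from it.
Operator<TransferableBlock> root = PhysicalPlanVisitor.build(
    mailboxService, hostName, port, requestId,
    distributedStagePlan.getMetadataMap(), distributedStagePlan.getStageRoot());
TransferableBlock block = root.nextBlock();  // runJob() keeps pulling until end-of-stream
```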
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index 6747b07ef419..0c261d26e336 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -48,7 +48,7 @@
public class AggregateOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "AGGREGATE_OPERATOR";
- private BaseOperator<TransferableBlock> _inputOperator;
+ private Operator<TransferableBlock> _inputOperator;
private List<RexExpression> _aggCalls;
private List<RexExpression> _groupSet;
@@ -64,7 +64,7 @@ public class AggregateOperator extends BaseOperator<TransferableBlock> {
private boolean _isCumulativeBlockConstructed;
// TODO: refactor Pinot Reducer code to support the intermediate stage agg operator.
- public AggregateOperator(BaseOperator<TransferableBlock> inputOperator, DataSchema dataSchema,
+ public AggregateOperator(Operator<TransferableBlock> inputOperator, DataSchema dataSchema,
List<RexExpression> aggCalls, List<RexExpression> groupSet, DataSchema upstreamDataSchema) {
_inputOperator = inputOperator;
_aggCalls = aggCalls;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
index f1ab55061e58..b3aa17ac56b6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java
@@ -34,12 +34,12 @@
public class FilterOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "FILTER";
- private final BaseOperator<TransferableBlock> _upstreamOperator;
+ private final Operator<TransferableBlock> _upstreamOperator;
private final FilterOperand _filterOperand;
private final DataSchema _dataSchema;
private TransferableBlock _upstreamErrorBlock;
- public FilterOperator(BaseOperator<TransferableBlock> upstreamOperator, DataSchema dataSchema, RexExpression filter) {
+ public FilterOperator(Operator<TransferableBlock> upstreamOperator, DataSchema dataSchema, RexExpression filter) {
_upstreamOperator = upstreamOperator;
_dataSchema = dataSchema;
_filterOperand = FilterOperand.toFilterOperand(filter, dataSchema);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index cb43ff09c09d..bcf6807dd918 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -49,8 +49,8 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "BROADCAST_JOIN";
private final HashMap<Key, List<Object[]>> _broadcastHashTable;
- private final BaseOperator<TransferableBlock> _leftTableOperator;
- private final BaseOperator<TransferableBlock> _rightTableOperator;
+ private final Operator<TransferableBlock> _leftTableOperator;
+ private final Operator<TransferableBlock> _rightTableOperator;
private final JoinRelType _joinType;
private final DataSchema _resultSchema;
private final DataSchema _leftTableSchema;
@@ -62,8 +62,8 @@ public class HashJoinOperator extends BaseOperator<TransferableBlock> {
private KeySelector<Object[], Object[]> _leftKeySelector;
private KeySelector<Object[], Object[]> _rightKeySelector;
- public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator, DataSchema leftSchema,
- BaseOperator<TransferableBlock> rightTableOperator, DataSchema rightSchema, DataSchema outputSchema,
+ public HashJoinOperator(Operator<TransferableBlock> leftTableOperator, DataSchema leftSchema,
+ Operator<TransferableBlock> rightTableOperator, DataSchema rightSchema, DataSchema outputSchema,
JoinNode.JoinKeys joinKeys, List<RexExpression> joinClauses, JoinRelType joinType) {
_leftKeySelector = joinKeys.getLeftJoinKeySelector();
_rightKeySelector = joinKeys.getRightJoinKeySelector();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
index ad05f207ef51..8169fc912186 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java
@@ -79,6 +79,13 @@ public MailboxReceiveOperator(MailboxService mailboxServ
singletonInstance = serverInstance;
}
}
+
+ // FIXME: there's a bug where singletonInstance may be null in the case of a JOIN where
+ // one side is BROADCAST and the other is SINGLETON (this is the case with nested loop
+ // joins for inequality conditions). This causes NPEs in the logs, but actually works
+ // because the side that hits the NPE doesn't expect to get any data anyway (that's the
+ // side that gets the broadcast from one side but nothing from the SINGLETON)
+ // FIXME: https://github.com/apache/pinot/issues/9592
_sendingStageInstances = Collections.singletonList(singletonInstance);
} else {
_sendingStageInstances = sendingStageInstances;
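The FIXME above describes `singletonInstance` remaining `null` when this server is not the designated SINGLETON receiver of a BROADCAST/SINGLETON join. A minimal illustrative guard, purely an assumption and not the fix tracked in apache/pinot#9592, could look like:

```java
// Hypothetical mitigation only: give the non-singleton side an empty sender list instead of
// a singleton list containing null, so it simply receives no data rather than hitting an NPE.
_sendingStageInstances = (singletonInstance == null)
    ? Collections.emptyList()
    : Collections.singletonList(singletonInstance);
```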
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index ae4cab02f2ba..3e358ccc2cd3 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -70,10 +70,10 @@ public class MailboxSendOperator extends BaseOperator<TransferableBlock> {
private final int _stageId;
private final MailboxService<Mailbox.MailboxContent> _mailboxService;
private final DataSchema _dataSchema;
- private BaseOperator<TransferableBlock> _dataTableBlockBaseOperator;
+ private Operator<TransferableBlock> _dataTableBlockBaseOperator;
public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataSchema dataSchema,
- BaseOperator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
+ Operator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances,
RelDistribution.Type exchangeType, KeySelector<Object[], Object[]> keySelector, String hostName, int port,
long jobId, int stageId) {
_dataSchema = dataSchema;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
index b3741cb28f2a..1acb0c9a6912 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java
@@ -37,7 +37,7 @@
public class SortOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "SORT";
- private final BaseOperator<TransferableBlock> _upstreamOperator;
+ private final Operator<TransferableBlock> _upstreamOperator;
private final int _fetch;
private final int _offset;
private final DataSchema _dataSchema;
@@ -47,7 +47,7 @@ public class SortOperator extends BaseOperator<TransferableBlock> {
private boolean _isSortedBlockConstructed;
private TransferableBlock _upstreamErrorBlock;
- public SortOperator(BaseOperator<TransferableBlock> upstreamOperator, List<RexExpression> collationKeys,
+ public SortOperator(Operator<TransferableBlock> upstreamOperator, List<RexExpression> collationKeys,
List<RelFieldCollation.Direction> collationDirections, int fetch, int offset, DataSchema dataSchema) {
_upstreamOperator = upstreamOperator;
_fetch = fetch;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
index 0c947e1ca673..90efc377abc5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java
@@ -37,13 +37,13 @@
*/
public class TransformOperator extends BaseOperator<TransferableBlock> {
private static final String EXPLAIN_NAME = "TRANSFORM";
- private final BaseOperator<TransferableBlock> _upstreamOperator;
+ private final Operator<TransferableBlock> _upstreamOperator;
private final List<TransformOperand> _transformOperandsList;
private final int _resultColumnSize;
private final DataSchema _resultSchema;
private TransferableBlock _upstreamErrorBlock;
- public TransformOperator(BaseOperator<TransferableBlock> upstreamOperator, DataSchema dataSchema,
+ public TransformOperator(Operator<TransferableBlock> upstreamOperator, DataSchema dataSchema,
List<RexExpression> transforms, DataSchema upstreamDataSchema) {
_upstreamOperator = upstreamOperator;
_resultColumnSize = transforms.size();