diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java index 166278a30e2f..651d259979fd 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/QueryPlan.java @@ -72,6 +72,15 @@ public List> getQueryResultFields() { return _queryResultFields; } + /** + * Explains the {@code QueryPlan} + * + * @return a human-readable tree explaining the query plan + * @see ExplainPlanStageVisitor#explain(QueryPlan) + * @apiNote this is NOT identical to the SQL {@code EXPLAIN PLAN FOR} functionality + * and is instead intended to be used by developers debugging during feature + * development + */ public String explain() { return ExplainPlanStageVisitor.explain(this); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java similarity index 94% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriter.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java index 0cb99c388994..bc7df7157650 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/ShuffleRewriteVisitor.java @@ -37,15 +37,15 @@ /** - * {@code ShuffleRewriter} removes unnecessary shuffles from a stage node plan by + * {@code ShuffleRewriteVisitor} removes unnecessary shuffles from a stage node plan by * inspecting whether all data required by a specific subtree already resides on * a single host. It gathers the information recursively by checking which partitioned * data is selected by each node in the tree. * - *

The only method that should be used externally is {@link #removeShuffles(StageNode)}, + *

The only method that should be used externally is {@link #optimizeShuffles(StageNode)}, * other public methods are used only by {@link StageNode#visit(StageNodeVisitor, Object)}. */ -public class ShuffleRewriter implements StageNodeVisitor, Void> { +public class ShuffleRewriteVisitor implements StageNodeVisitor, Void> { /** * This method rewrites {@code root} in place, removing any unnecessary shuffles @@ -53,14 +53,14 @@ public class ShuffleRewriter implements StageNodeVisitor, Void> { * * @param root the root node of the tree to rewrite */ - public static void removeShuffles(StageNode root) { - root.visit(new ShuffleRewriter(), null); + public static void optimizeShuffles(StageNode root) { + root.visit(new ShuffleRewriteVisitor(), null); } /** - * Access to this class should only be used via {@link #removeShuffles(StageNode)} + * Access to this class should only be used via {@link #optimizeShuffles(StageNode)} */ - private ShuffleRewriter() { + private ShuffleRewriteVisitor() { } @Override diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/AttachStageMetadata.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java similarity index 93% rename from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/AttachStageMetadata.java rename to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java index 98304619baee..cf76134c8a88 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/AttachStageMetadata.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StageMetadataVisitor.java @@ -37,21 +37,21 @@ /** - * {@code AttachStageMetadata} computes the {@link StageMetadata} for a {@link StageNode} + * {@code StageMetadataVisitor} computes the {@link StageMetadata} for a {@link StageNode} * tree and attaches it in the form of a {@link QueryPlan}. */ -public class AttachStageMetadata implements StageNodeVisitor { +public class StageMetadataVisitor implements StageNodeVisitor { public static QueryPlan attachMetadata(List> fields, StageNode root) { QueryPlan queryPlan = new QueryPlan(fields, new HashMap<>(), new HashMap<>()); - root.visit(new AttachStageMetadata(), queryPlan); + root.visit(new StageMetadataVisitor(), queryPlan); return queryPlan; } /** * Usage of this class should only come through {@link #attachMetadata(List, StageNode)}. */ - private AttachStageMetadata() { + private StageMetadataVisitor() { } private void visit(StageNode node, QueryPlan queryPlan) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java index 5c99aeb7ec89..323e7f506b0e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/StagePlanner.java @@ -41,7 +41,7 @@ * This class is non-threadsafe. Do not reuse the stage planner for multiple query plans. */ public class StagePlanner { - private PlannerContext _plannerContext; + private final PlannerContext _plannerContext; private final WorkerManager _workerManager; private int _stageIdCounter; @@ -63,7 +63,7 @@ public QueryPlan makePlan(RelRoot relRoot) { // walk the plan and create stages. StageNode globalStageRoot = walkRelPlan(relRootNode, getNewStageId()); - ShuffleRewriter.removeShuffles(globalStageRoot); + ShuffleRewriteVisitor.optimizeShuffles(globalStageRoot); // global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one // receiver so doesn't matter what the exchange type is. setting it to SINGLETON by default. @@ -75,7 +75,7 @@ public QueryPlan makePlan(RelRoot relRoot) { new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(), RelDistribution.Type.RANDOM_DISTRIBUTED, null, globalSenderNode); - QueryPlan queryPlan = AttachStageMetadata.attachMetadata(relRoot.fields, globalReceiverNode); + QueryPlan queryPlan = StageMetadataVisitor.attachMetadata(relRoot.fields, globalReceiverNode); // assign workers to each stage. for (Map.Entry e : queryPlan.getStageMetadataMap().entrySet()) { diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java index 0d2428ba74f4..cf90a0005cfd 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/stage/MailboxReceiveNode.java @@ -33,7 +33,9 @@ public class MailboxReceiveNode extends AbstractStageNode { @ProtoProperties private KeySelector _partitionKeySelector; - private StageNode _sender; + // this is only available during planning and should not be relied + // on in any post-serialization code + private transient StageNode _sender; public MailboxReceiveNode(int stageId) { super(stageId); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 012204ef7350..c26ea3bffeab 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -87,7 +87,7 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana _serverExecutor = new ServerQueryExecutorV1Impl(); _serverExecutor.init(config, instanceDataManager, serverMetrics); _workerExecutor = new WorkerQueryExecutor(); - _workerExecutor.init(_mailboxService, _hostname, _port); + _workerExecutor.init(config, serverMetrics, _mailboxService, _hostname, _port); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java similarity index 97% rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanBuilder.java rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java index 1471320e6dc0..c68fda433c6b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanBuilder.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/PhysicalPlanVisitor.java @@ -47,7 +47,7 @@ import org.apache.pinot.query.runtime.operator.TransformOperator; -public class PhysicalPlanBuilder implements StageNodeVisitor, Void> { +public class PhysicalPlanVisitor implements StageNodeVisitor, Void> { private final MailboxService _mailboxService; private final String _hostName; @@ -55,7 +55,7 @@ public class PhysicalPlanBuilder implements StageNodeVisitor _metadataMap; - public PhysicalPlanBuilder(MailboxService mailboxService, String hostName, int port, + public PhysicalPlanVisitor(MailboxService mailboxService, String hostName, int port, long requestId, Map metadataMap) { _mailboxService = mailboxService; _hostName = hostName; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java index 4e400281352d..7a2153838389 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java @@ -20,6 +20,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; +import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.proto.Mailbox; import org.apache.pinot.common.request.context.ThreadTimer; import org.apache.pinot.core.common.Operator; @@ -29,6 +30,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.spi.env.PinotConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,16 +43,22 @@ */ public class WorkerQueryExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(WorkerQueryExecutor.class); + private PinotConfiguration _config; + private ServerMetrics _serverMetrics; private MailboxService _mailboxService; private String _hostName; private int _port; - public void init(MailboxService mailboxService, String hostName, int port) { + public void init(PinotConfiguration config, ServerMetrics serverMetrics, + MailboxService mailboxService, String hostName, int port) { + _config = config; + _serverMetrics = serverMetrics; _mailboxService = mailboxService; _hostName = hostName; _port = port; } + public synchronized void start() { LOGGER.info("Worker query executor started"); } @@ -64,7 +72,7 @@ public void processQuery(DistributedStagePlan queryRequest, Map long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID")); StageNode stageRoot = queryRequest.getStageRoot(); - Operator rootOperator = new PhysicalPlanBuilder( + Operator rootOperator = new PhysicalPlanVisitor( _mailboxService, _hostName, _port, requestId, queryRequest.getMetadataMap()).build(stageRoot); executorService.submit(new TraceRunnable() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index 9a1c8ab19529..8169fc912186 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -85,6 +85,7 @@ public MailboxReceiveOperator(MailboxService mailboxServ // joins for inequality conditions). This causes NPEs in the logs, but actually works // because the side that hits the NPE doesn't expect to get any data anyway (that's the // side that gets the broadcast from one side but nothing from the SINGLETON) + // FIXME: https://github.com/apache/pinot/issues/9592 _sendingStageInstances = Collections.singletonList(singletonInstance); } else { _sendingStageInstances = sendingStageInstances;