Skip to content

Commit

Permalink
address feedback 2
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Oct 13, 2022
1 parent 3178f03 commit f390492
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ public List<Pair<Integer, String>> getQueryResultFields() {
return _queryResultFields;
}

/**
 * Produces a textual explanation of this {@code QueryPlan} by delegating to
 * {@link ExplainPlanStageVisitor#explain(QueryPlan)}.
 *
 * @return a human-readable tree explaining the query plan
 * @apiNote the output is <b>NOT</b> the same as what the SQL {@code EXPLAIN PLAN FOR}
 *          functionality produces; it is meant as a debugging aid for developers
 *          during feature development
 */
public String explain() {
  String explanation = ExplainPlanStageVisitor.explain(this);
  return explanation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,30 @@


/**
* {@code ShuffleRewriter} removes unnecessary shuffles from a stage node plan by
* {@code ShuffleRewriteVisitor} removes unnecessary shuffles from a stage node plan by
* inspecting whether all data required by a specific subtree already resides on
* a single host. It gathers the information recursively by checking which partitioned
* data is selected by each node in the tree.
*
* <p>The only method that should be used externally is {@link #removeShuffles(StageNode)},
* <p>The only method that should be used externally is {@link #optimizeShuffles(StageNode)},
* other public methods are used only by {@link StageNode#visit(StageNodeVisitor, Object)}.
*/
public class ShuffleRewriter implements StageNodeVisitor<Set<Integer>, Void> {
public class ShuffleRewriteVisitor implements StageNodeVisitor<Set<Integer>, Void> {

/**
* This method rewrites {@code root} <b>in place</b>, removing any unnecessary shuffles
* by replacing the distribution type with {@link RelDistribution.Type#SINGLETON}.
*
* @param root the root node of the tree to rewrite
*/
public static void removeShuffles(StageNode root) {
root.visit(new ShuffleRewriter(), null);
public static void optimizeShuffles(StageNode root) {
root.visit(new ShuffleRewriteVisitor(), null);
}

/**
* Access to this class should only be used via {@link #removeShuffles(StageNode)}
* Access to this class should only be used via {@link #optimizeShuffles(StageNode)}
*/
private ShuffleRewriter() {
private ShuffleRewriteVisitor() {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,21 @@


/**
* {@code AttachStageMetadata} computes the {@link StageMetadata} for a {@link StageNode}
* {@code StageMetadataVisitor} computes the {@link StageMetadata} for a {@link StageNode}
* tree and attaches it in the form of a {@link QueryPlan}.
*/
public class AttachStageMetadata implements StageNodeVisitor<Void, QueryPlan> {
public class StageMetadataVisitor implements StageNodeVisitor<Void, QueryPlan> {

public static QueryPlan attachMetadata(List<Pair<Integer, String>> fields, StageNode root) {
QueryPlan queryPlan = new QueryPlan(fields, new HashMap<>(), new HashMap<>());
root.visit(new AttachStageMetadata(), queryPlan);
root.visit(new StageMetadataVisitor(), queryPlan);
return queryPlan;
}

/**
* Usage of this class should only come through {@link #attachMetadata(List, StageNode)}.
*/
private AttachStageMetadata() {
private StageMetadataVisitor() {
}

private void visit(StageNode node, QueryPlan queryPlan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* This class is not thread-safe. Do not reuse the stage planner for multiple query plans.
*/
public class StagePlanner {
private PlannerContext _plannerContext;
private final PlannerContext _plannerContext;
private final WorkerManager _workerManager;
private int _stageIdCounter;

Expand All @@ -63,7 +63,7 @@ public QueryPlan makePlan(RelRoot relRoot) {

// walk the plan and create stages.
StageNode globalStageRoot = walkRelPlan(relRootNode, getNewStageId());
ShuffleRewriter.removeShuffles(globalStageRoot);
ShuffleRewriteVisitor.optimizeShuffles(globalStageRoot);

// The global root needs to send results back to the ROOT, a.k.a. the client response node.
// The last stage has only one receiver, so the exchange type does not matter; it is set to SINGLETON by default.
Expand All @@ -75,7 +75,7 @@ public QueryPlan makePlan(RelRoot relRoot) {
new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
RelDistribution.Type.RANDOM_DISTRIBUTED, null, globalSenderNode);

QueryPlan queryPlan = AttachStageMetadata.attachMetadata(relRoot.fields, globalReceiverNode);
QueryPlan queryPlan = StageMetadataVisitor.attachMetadata(relRoot.fields, globalReceiverNode);

// assign workers to each stage.
for (Map.Entry<Integer, StageMetadata> e : queryPlan.getStageMetadataMap().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public class MailboxReceiveNode extends AbstractStageNode {
@ProtoProperties
private KeySelector<Object[], Object[]> _partitionKeySelector;

private StageNode _sender;
// this is only available during planning and should not be relied
// on in any post-serialization code
private transient StageNode _sender;

public MailboxReceiveNode(int stageId) {
super(stageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
_serverExecutor = new ServerQueryExecutorV1Impl();
_serverExecutor.init(config, instanceDataManager, serverMetrics);
_workerExecutor = new WorkerQueryExecutor();
_workerExecutor.init(_mailboxService, _hostname, _port);
_workerExecutor.init(config, serverMetrics, _mailboxService, _hostname, _port);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@
import org.apache.pinot.query.runtime.operator.TransformOperator;


public class PhysicalPlanBuilder implements StageNodeVisitor<Operator<TransferableBlock>, Void> {
public class PhysicalPlanVisitor implements StageNodeVisitor<Operator<TransferableBlock>, Void> {

private final MailboxService<Mailbox.MailboxContent> _mailboxService;
private final String _hostName;
private final int _port;
private final long _requestId;
private final Map<Integer, StageMetadata> _metadataMap;

public PhysicalPlanBuilder(MailboxService<Mailbox.MailboxContent> mailboxService, String hostName, int port,
public PhysicalPlanVisitor(MailboxService<Mailbox.MailboxContent> mailboxService, String hostName, int port,
long requestId, Map<Integer, StageMetadata> metadataMap) {
_mailboxService = mailboxService;
_hostName = hostName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.Mailbox;
import org.apache.pinot.common.request.context.ThreadTimer;
import org.apache.pinot.core.common.Operator;
Expand All @@ -29,6 +30,7 @@
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,16 +43,22 @@
*/
public class WorkerQueryExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerQueryExecutor.class);
private PinotConfiguration _config;
private ServerMetrics _serverMetrics;
private MailboxService<Mailbox.MailboxContent> _mailboxService;
private String _hostName;
private int _port;

public void init(MailboxService<Mailbox.MailboxContent> mailboxService, String hostName, int port) {
public void init(PinotConfiguration config, ServerMetrics serverMetrics,
MailboxService<Mailbox.MailboxContent> mailboxService, String hostName, int port) {
_config = config;
_serverMetrics = serverMetrics;
_mailboxService = mailboxService;
_hostName = hostName;
_port = port;
}


/**
 * Logs that the worker query executor has started. The method is {@code synchronized},
 * so the log call runs while holding this instance's monitor.
 */
public synchronized void start() {
  final String startedMessage = "Worker query executor started";
  LOGGER.info(startedMessage);
}
Expand All @@ -64,7 +72,7 @@ public void processQuery(DistributedStagePlan queryRequest, Map<String, String>
long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID"));
StageNode stageRoot = queryRequest.getStageRoot();

Operator<TransferableBlock> rootOperator = new PhysicalPlanBuilder(
Operator<TransferableBlock> rootOperator = new PhysicalPlanVisitor(
_mailboxService, _hostName, _port, requestId, queryRequest.getMetadataMap()).build(stageRoot);

executorService.submit(new TraceRunnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public MailboxReceiveOperator(MailboxService<Mailbox.MailboxContent> mailboxServ
// joins for inequality conditions). This causes NPEs in the logs, but actually works
// because the side that hits the NPE doesn't expect to get any data anyway (that's the
// side that gets the broadcast from one side but nothing from the SINGLETON)
// FIXME: https://github.com/apache/pinot/issues/9592
_sendingStageInstances = Collections.singletonList(singletonInstance);
} else {
_sendingStageInstances = sendingStageInstances;
Expand Down

0 comments on commit f390492

Please sign in to comment.