diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java index eace5b0629c0..5776f2c82bbe 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java @@ -44,7 +44,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; -import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -54,7 +53,6 @@ public class SSBQueryIntegrationTest extends BaseClusterIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(SSBQueryIntegrationTest.class); - private static final int MIN_AVAILABLE_CORE_REQUIREMENT = 4; private static final Map SSB_QUICKSTART_TABLE_RESOURCES = ImmutableMap.of( "customer", "examples/batch/ssb/customer", "dates", "examples/batch/ssb/dates", @@ -66,10 +64,6 @@ public class SSBQueryIntegrationTest extends BaseClusterIntegrationTest { @BeforeClass public void setUp() throws Exception { - if (Runtime.getRuntime().availableProcessors() < MIN_AVAILABLE_CORE_REQUIREMENT) { - throw new SkipException("Skip SSB query testing. 
Insufficient core count: " - + Runtime.getRuntime().availableProcessors()); - } TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); // Start the Pinot cluster diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index 8c0a959c7dca..cd689be88014 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -50,11 +50,15 @@ import org.apache.pinot.query.mailbox.MultiplexingMailboxService; import org.apache.pinot.query.planner.StageMetadata; import org.apache.pinot.query.planner.stage.MailboxSendNode; +import org.apache.pinot.query.planner.stage.StageNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; -import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor; +import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; +import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; +import org.apache.pinot.query.runtime.plan.PlanRequestContext; import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor; import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext; import org.apache.pinot.query.service.QueryConfig; @@ -76,7 +80,6 @@ public class QueryRunner { private static final Logger LOGGER = LoggerFactory.getLogger(QueryRunner.class); // This is a temporary before merging the 2 type of executor. 
private ServerQueryExecutorV1Impl _serverExecutor; - private WorkerQueryExecutor _workerExecutor; private HelixManager _helixManager; private ZkHelixPropertyStore _helixPropertyStore; private MailboxService _mailboxService; @@ -98,8 +101,6 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana _mailboxService = MultiplexingMailboxService.newInstance(_hostname, _port, config); _serverExecutor = new ServerQueryExecutorV1Impl(); _serverExecutor.init(config, instanceDataManager, serverMetrics); - _workerExecutor = new WorkerQueryExecutor(); - _workerExecutor.init(config, serverMetrics, _mailboxService, _hostname, _port); } catch (Exception e) { throw new RuntimeException(e); } @@ -109,16 +110,14 @@ public void start() { _helixPropertyStore = _helixManager.getHelixPropertyStore(); _mailboxService.start(); _serverExecutor.start(); - _workerExecutor.start(); } public void shutDown() { - _workerExecutor.shutDown(); _serverExecutor.shutDown(); _mailboxService.shutdown(); } - public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorService executorService, + public void processQuery(DistributedStagePlan distributedStagePlan, OpChainSchedulerService scheduler, Map requestMetadataMap) { if (isLeafStage(distributedStagePlan)) { // TODO: make server query request return via mailbox, this is a hack to gather the non-streaming data table @@ -132,7 +131,7 @@ public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorServ for (ServerPlanRequestContext requestContext : serverQueryRequests) { ServerQueryRequest request = new ServerQueryRequest(requestContext.getInstanceRequest(), new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis()); - serverQueryResults.add(processServerQuery(request, executorService)); + serverQueryResults.add(processServerQuery(request, scheduler.getWorkerPool())); } MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot(); @@ -148,7 +147,11 
@@ public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorServ LOGGER.debug("Acquired transferable block: {}", blockCounter++); } } else { - _workerExecutor.processQuery(distributedStagePlan, requestMetadataMap, executorService); + long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID")); + StageNode stageRoot = distributedStagePlan.getStageRoot(); + OpChain rootOperator = PhysicalPlanVisitor.build(stageRoot, new PlanRequestContext( + _mailboxService, requestId, stageRoot.getStageId(), _hostname, _port, distributedStagePlan.getMetadataMap())); + scheduler.register(rootOperator); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java new file mode 100644 index 000000000000..4a34e493460b --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainScheduler.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.executor; + +import org.apache.pinot.query.mailbox.MailboxIdentifier; +import org.apache.pinot.query.runtime.operator.OpChain; + + +/** + * An interface that defines different scheduling strategies to work with the + * {@link OpChainSchedulerService}. Implementations need not be thread safe: the service + * guarantees that these methods are never called concurrently, so implementations may + * use data structures that are not concurrent. + */ +public interface OpChainScheduler { + + /** + * @param operatorChain the operator chain to register + */ + void register(OpChain operatorChain); + + /** + * This method is called whenever {@code mailbox} has new data available to consume; + * this can be useful for advanced scheduling algorithms. + * + * @param mailbox the mailbox ID + */ + void onDataAvailable(MailboxIdentifier mailbox); + + /** + * @return whether or not there is any work for the scheduler to do + */ + boolean hasNext(); + + /** + * @return the next operator chain to process + * @throws java.util.NoSuchElementException if {@link #hasNext()} returns false + * prior to this call + */ + OpChain next(); +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java new file mode 100644 index 000000000000..87c44774d3bf --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.executor; + +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import com.google.common.util.concurrent.Monitor; +import java.util.concurrent.ExecutorService; +import org.apache.pinot.common.request.context.ThreadTimer; +import org.apache.pinot.core.util.trace.TraceRunnable; +import org.apache.pinot.query.mailbox.MailboxIdentifier; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.operator.OpChain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class provides the implementation for scheduling multistage queries on a single node based + * on the {@link OpChainScheduler} logic that is passed in. Multistage queries support partial execution + * and will return a NOOP metadata block as a "yield" signal, indicating that the next operator + * chain ({@link OpChainScheduler#next()}) will be requested. + * + * <p>Note that a yielded operator chain will be re-registered with the underlying scheduler. + */ +@SuppressWarnings("UnstableApiUsage") +public class OpChainSchedulerService extends AbstractExecutionThreadService { + + private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class); + + private final OpChainScheduler _scheduler; + private final ExecutorService _workerPool; + + // anything that is guarded by this monitor should be non-blocking + private final Monitor _monitor = new Monitor(); + private final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) { + @Override + public boolean isSatisfied() { + return _scheduler.hasNext() || !isRunning(); + } + }; + + public OpChainSchedulerService(OpChainScheduler scheduler, ExecutorService workerPool) { + _scheduler = scheduler; + _workerPool = workerPool; + } + + @Override + protected void triggerShutdown() { + // this will just notify all waiters that the scheduler is shutting down + _monitor.enter(); + _monitor.leave(); + } + + @Override + protected void run() + throws Exception { + while (isRunning()) { + _monitor.enterWhen(_hasNextOrClosing); + try { + if (!isRunning()) { + return; + } + + OpChain operatorChain = _scheduler.next(); + _workerPool.submit(new TraceRunnable() { + @Override + public void runJob() { + try { + ThreadTimer timer = operatorChain.getAndStartTimer(); + + // so long as there's work to be done, keep getting the next block + // when the operator chain returns a NOOP block, then yield the execution + // of this to another worker + TransferableBlock result = operatorChain.getRoot().nextBlock(); + while (!result.isNoOpBlock() && !result.isEndOfStreamBlock()) { + LOGGER.debug("Got block with {} rows.", result.getNumRows()); + result = operatorChain.getRoot().nextBlock(); + } + + if (!result.isEndOfStreamBlock()) { + // not complete, needs to re-register for scheduling + register(operatorChain); + } else { + LOGGER.info("Execution time: {}", timer.getThreadTimeNs()); 
+ } + } catch (Exception e) { + LOGGER.error("Failed to execute query!", e); + } + } + }); + } finally { + _monitor.leave(); + } + } + } + + /** + * Register a new operator chain with the scheduler. + * + * @param operatorChain the chain to register + */ + public final void register(OpChain operatorChain) { + _monitor.enter(); + try { + _scheduler.register(operatorChain); + } finally { + _monitor.leave(); + } + } + + /** + * This method should be called whenever data is available in a given mailbox. + * Implementations of this method should be idempotent, it may be called in the + * scenario that no mail is available. + * + * @param mailbox the identifier of the mailbox that now has data + */ + public final void onDataAvailable(MailboxIdentifier mailbox) { + _monitor.enter(); + try { + _scheduler.onDataAvailable(mailbox); + } finally { + _monitor.leave(); + } + } + + // TODO: remove this method after we pipe down the proper executor pool to the v1 engine + public ExecutorService getWorkerPool() { + return _workerPool; + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java new file mode 100644 index 000000000000..ff9c7625369f --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/RoundRobinScheduler.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.executor; + +import java.util.LinkedList; +import java.util.Queue; +import org.apache.pinot.query.mailbox.MailboxIdentifier; +import org.apache.pinot.query.runtime.operator.OpChain; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class RoundRobinScheduler implements OpChainScheduler { + private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinScheduler.class); + + private final Queue<OpChain> _opChainQueue = new LinkedList<>(); + + @Override + public void register(OpChain operatorChain) { + _opChainQueue.add(operatorChain); + } + + @Override + public void onDataAvailable(MailboxIdentifier mailbox) { + // do nothing - this doesn't change order of execution + } + + @Override + public boolean hasNext() { + // there is work to do whenever at least one registered operator chain + // is still waiting in the queue + return !_opChainQueue.isEmpty(); + } + + @Override + public OpChain next() { + return _opChainQueue.remove(); // throws NoSuchElementException if empty, as OpChainScheduler#next() requires + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java deleted file mode 100644 index 18bb2defa540..000000000000 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.query.runtime.executor; - -import java.util.Map; -import java.util.concurrent.ExecutorService; -import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.common.request.context.ThreadTimer; -import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.util.trace.TraceRunnable; -import org.apache.pinot.query.mailbox.MailboxService; -import org.apache.pinot.query.planner.stage.StageNode; -import org.apache.pinot.query.runtime.blocks.TransferableBlock; -import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; -import org.apache.pinot.query.runtime.plan.DistributedStagePlan; -import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; -import org.apache.pinot.query.runtime.plan.PlanRequestContext; -import org.apache.pinot.spi.env.PinotConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * WorkerQueryExecutor is the v2 of the {@link org.apache.pinot.core.query.executor.QueryExecutor} API. - * - * It provides not only execution interface for {@link org.apache.pinot.core.query.request.ServerQueryRequest} but - * also a more general {@link DistributedStagePlan}. 
- */ -public class WorkerQueryExecutor { - private static final Logger LOGGER = LoggerFactory.getLogger(WorkerQueryExecutor.class); - private PinotConfiguration _config; - private ServerMetrics _serverMetrics; - private MailboxService _mailboxService; - private String _hostName; - private int _port; - - public void init(PinotConfiguration config, ServerMetrics serverMetrics, - MailboxService mailboxService, String hostName, int port) { - _config = config; - _serverMetrics = serverMetrics; - _mailboxService = mailboxService; - _hostName = hostName; - _port = port; - } - - - public synchronized void start() { - LOGGER.info("Worker query executor started"); - } - - public synchronized void shutDown() { - LOGGER.info("Worker query executor shut down"); - } - - public void processQuery(DistributedStagePlan queryRequest, Map requestMetadataMap, - ExecutorService executorService) { - long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID")); - StageNode stageRoot = queryRequest.getStageRoot(); - - Operator rootOperator = PhysicalPlanVisitor.build(stageRoot, new PlanRequestContext( - _mailboxService, requestId, stageRoot.getStageId(), _hostName, _port, queryRequest.getMetadataMap())); - - executorService.submit(new TraceRunnable() { - @Override - public void runJob() { - try { - ThreadTimer executionThreadTimer = new ThreadTimer(); - while (!TransferableBlockUtils.isEndOfStream(rootOperator.nextBlock())) { - LOGGER.debug("Result Block acquired"); - } - LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs()); - } catch (Exception e) { - LOGGER.error("Failed to execute query!", e); - } - } - }); - } -} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index 18fb47a08a23..35fd5ba3434d 100644 --- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -19,8 +19,10 @@ package org.apache.pinot.query.runtime.operator; import com.google.common.base.Preconditions; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.exception.QueryException; @@ -52,6 +54,7 @@ public class MailboxReceiveOperator extends BaseOperator { private final RelDistribution.Type _exchangeType; private final KeySelector _keySelector; private final List _sendingStageInstances; + private final List _sendingMailboxes; private final DataSchema _dataSchema; private final String _hostName; private final int _port; @@ -96,6 +99,8 @@ public MailboxReceiveOperator(MailboxService mailboxService, _upstreamErrorBlock = null; _keySelector = keySelector; _serverIdx = 0; + + _sendingMailboxes = _sendingStageInstances.stream().map(this::toMailboxId).collect(Collectors.toList()); } @Override @@ -170,4 +175,8 @@ private MailboxIdentifier toMailboxId(ServerInstance serverInstance) { return new StringMailboxIdentifier(String.format("%s_%s", _jobId, _stageId), serverInstance.getHostname(), serverInstance.getQueryMailboxPort(), _hostName, _port); } + + public Collection getSendingMailboxes() { + return _sendingMailboxes; + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java new file mode 100644 index 000000000000..1fa9277b96d9 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.operator; + +import com.google.common.base.Suppliers; +import java.util.function.Supplier; +import org.apache.pinot.common.request.context.ThreadTimer; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; + + +/** + * An {@code OpChain} represents a chain of operators that are separated + * by send/receive stages. 
+ */ +public class OpChain { + + private final Operator<TransferableBlock> _root; + // TODO: build timers that are partial-execution aware + private final Supplier<ThreadTimer> _timer; + + public OpChain(Operator<TransferableBlock> root) { + _root = root; + + // memoize the supplier so the ThreadTimer is only created on the first call to + // getAndStartTimer(); every later call returns that same timer instance + _timer = Suppliers.memoize(ThreadTimer::new)::get; + } + + public Operator<TransferableBlock> getRoot() { + return _root; + } + + public ThreadTimer getAndStartTimer() { + return _timer.get(); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java index 32e2d7ddf8a2..f3536af6f65d 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.planner.StageMetadata; @@ -36,12 +35,14 @@ import org.apache.pinot.query.planner.stage.TableScanNode; import org.apache.pinot.query.planner.stage.ValueNode; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.operator.AggregateOperator; import org.apache.pinot.query.runtime.operator.FilterOperator; import org.apache.pinot.query.runtime.operator.HashJoinOperator; import org.apache.pinot.query.runtime.operator.LiteralValueOperator; import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; +import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.operator.SortOperator; 
import org.apache.pinot.query.runtime.operator.TransformOperator; @@ -53,13 +54,15 @@ * *

This class should be used statically via {@link #build(StageNode, PlanRequestContext)} * - * @see org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan, ExecutorService, Map) + * @see org.apache.pinot.query.runtime.QueryRunner#processQuery(DistributedStagePlan, OpChainSchedulerService, Map) */ public class PhysicalPlanVisitor implements StageNodeVisitor, PlanRequestContext> { + private static final PhysicalPlanVisitor INSTANCE = new PhysicalPlanVisitor(); - public static Operator build(StageNode node, PlanRequestContext context) { - return node.visit(INSTANCE, context); + public static OpChain build(StageNode node, PlanRequestContext context) { + Operator root = node.visit(INSTANCE, context); + return new OpChain(root); } @Override diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java index 0a74ee820b92..197338190c38 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java @@ -25,7 +25,6 @@ import io.grpc.stub.StreamObserver; import java.io.IOException; import java.util.Map; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; @@ -33,6 +32,8 @@ import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.core.transport.grpc.GrpcQueryServer; import org.apache.pinot.query.runtime.QueryRunner; +import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; +import org.apache.pinot.query.runtime.executor.RoundRobinScheduler; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.slf4j.Logger; @@ -42,18 +43,22 @@ /** * {@link 
QueryServer} is the GRPC server that accepts query plan requests sent from {@link QueryDispatcher}. */ +@SuppressWarnings("UnstableApiUsage") public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { private static final Logger LOGGER = LoggerFactory.getLogger(GrpcQueryServer.class); private final Server _server; private final QueryRunner _queryRunner; - private final ExecutorService _executorService; + private final OpChainSchedulerService _scheduler; public QueryServer(int port, QueryRunner queryRunner) { _server = ServerBuilder.forPort(port).addService(this).build(); - _executorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_WORKER_THREADS, - new NamedThreadFactory("query_worker_on_" + port + "_port")); + _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(), + Executors.newFixedThreadPool( + ResourceManager.DEFAULT_QUERY_WORKER_THREADS, + new NamedThreadFactory("query_worker_on_" + port + "_port"))); _queryRunner = queryRunner; + LOGGER.info("Initialized QueryWorker on port: {} with numWorkerThreads: {}", port, ResourceManager.DEFAULT_QUERY_WORKER_THREADS); } @@ -61,6 +66,7 @@ public QueryServer(int port, QueryRunner queryRunner) { public void start() { LOGGER.info("Starting QueryWorker"); try { + _scheduler.startAsync().awaitRunning(); _queryRunner.start(); _server.start(); } catch (IOException e) { @@ -70,9 +76,13 @@ public void start() { public void shutdown() { LOGGER.info("Shutting down QueryWorker"); + _queryRunner.shutDown(); + _scheduler.stopAsync(); + _server.shutdown(); + try { - _queryRunner.shutDown(); - _server.shutdown().awaitTermination(); + _scheduler.awaitTerminated(); + _server.awaitTermination(); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -105,7 +115,7 @@ public void submit(Worker.QueryRequest request, StreamObserver _runnerConfig = new HashMap<>(); private final InstanceDataManager _instanceDataManager; @@ -80,8 +81,10 @@ public 
QueryServerEnclosure(MockInstanceDataManagerFactory factory) { _runnerConfig.put(QueryConfig.KEY_OF_QUERY_RUNNER_HOSTNAME, String.format("Server_%s", QueryConfig.DEFAULT_QUERY_RUNNER_HOSTNAME)); _queryRunner = new QueryRunner(); - _testExecutor = Executors.newFixedThreadPool(DEFAULT_EXECUTOR_THREAD_NUM, - new NamedThreadFactory("test_query_server_enclosure_on_" + _queryRunnerPort + "_port")); + _scheduler = new OpChainSchedulerService(new RoundRobinScheduler(), + Executors.newFixedThreadPool( + DEFAULT_EXECUTOR_THREAD_NUM, + new NamedThreadFactory("test_query_server_enclosure_on_" + _queryRunnerPort + "_port"))); } catch (Exception e) { throw new RuntimeException("Test Failed!", e); } @@ -120,13 +123,15 @@ public void start() _queryRunner = new QueryRunner(); _queryRunner.init(configuration, _instanceDataManager, _helixManager, mockServiceMetrics()); _queryRunner.start(); + _scheduler.startAsync().awaitRunning(); } public void shutDown() { _queryRunner.shutDown(); + _scheduler.stopAsync().awaitTerminated(); } public void processQuery(DistributedStagePlan distributedStagePlan, Map requestMetadataMap) { - _queryRunner.processQuery(distributedStagePlan, _testExecutor, requestMetadataMap); + _queryRunner.processQuery(distributedStagePlan, _scheduler, requestMetadataMap); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java new file mode 100644 index 000000000000..7cd1f48e1044 --- /dev/null +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils;
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link OpChainSchedulerService}, driven through a mocked {@link OpChainScheduler} and mocked operators.
+ *
+ * <p>Every test creates its own worker pool with {@link #initExecutor(int)}. The mocks are created once per class,
+ * so {@link #afterMethod()} shuts the pool down and resets them to keep stubbings and recorded invocations from
+ * leaking from one test into the next.
+ */
+public class OpChainSchedulerServiceTest {
+
+  private ExecutorService _executor;
+  private AutoCloseable _mocks;
+
+  @Mock
+  private Operator<TransferableBlock> _operatorA;
+  @Mock
+  private Operator<TransferableBlock> _operatorB;
+  @Mock
+  private OpChainScheduler _scheduler;
+
+  @BeforeClass
+  public void beforeClass() {
+    _mocks = MockitoAnnotations.openMocks(this);
+  }
+
+  @AfterClass
+  public void afterClass()
+      throws Exception {
+    _mocks.close();
+  }
+
+  /**
+   * Tears down the per-test worker pool and restores the shared mocks to a pristine state.
+   *
+   * @throws InterruptedException if interrupted while waiting for the worker pool to terminate
+   */
+  @AfterMethod
+  public void afterMethod()
+      throws InterruptedException {
+    _executor.shutdownNow();
+    // Give in-flight worker tasks a chance to finish so that none of them touches a mock after it has been reset.
+    _executor.awaitTermination(10, TimeUnit.SECONDS);
+    // The mocks are created once in beforeClass(); without a reset, invocations recorded by earlier tests would
+    // count towards the verify(times(2)) / verify(never()) checks below and make them order-dependent.
+    Mockito.reset(_operatorA, _operatorB, _scheduler);
+  }
+
+  /** Creates the worker pool handed to the service under test; it is shut down in {@link #afterMethod()}. */
+  private void initExecutor(int numThreads) {
+    _executor = Executors.newFixedThreadPool(numThreads);
+  }
+
+  /** Wraps the given operator in a new {@link OpChain}. */
+  private OpChain getChain(Operator<TransferableBlock> operator) {
+    return new OpChain(operator);
+  }
+
+  /** A chain registered after the service is running must have its operator's {@code nextBlock()} invoked. */
+  @Test
+  public void shouldScheduleSingleOpChainRegisteredAfterStart()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    Mockito.when(_scheduler.hasNext()).thenReturn(true);
+    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
+      latch.countDown();
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+    scheduler.register(getChain(_operatorA));
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
+    scheduler.stopAsync().awaitTerminated();
+  }
+
+  /** A chain registered before the service is started must still have its operator's {@code nextBlock()} invoked. */
+  @Test
+  public void shouldScheduleSingleOpChainRegisteredBeforeStart()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    Mockito.when(_scheduler.hasNext()).thenReturn(true);
+    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
+      latch.countDown();
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    // When:
+    scheduler.register(getChain(_operatorA));
+    scheduler.startAsync().awaitRunning();
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
+    scheduler.stopAsync().awaitTerminated();
+  }
+
+  /**
+   * The operator first returns a no-op block and then end-of-stream; the underlying scheduler is expected to see two
+   * {@code register} calls in total (the explicit one made by the test plus one more).
+   */
+  @Test
+  public void shouldReRegisterOpChainOnNoOpBlock()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    Mockito.when(_scheduler.hasNext()).thenReturn(true);
+    Mockito.when(_scheduler.next()).thenReturn(getChain(_operatorA));
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock())
+        .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock())
+        .thenAnswer(inv -> {
+          latch.countDown();
+          return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+        });
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+    scheduler.register(getChain(_operatorA));
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
+    Mockito.verify(_scheduler, Mockito.times(2)).register(Mockito.any(OpChain.class));
+    scheduler.stopAsync().awaitTerminated();
+  }
+
+  /**
+   * Operator A keeps returning no-op blocks until operator B has run; the test passes once A finally returns
+   * end-of-stream, having returned a no-op at least once beforehand.
+   */
+  @Test
+  public void shouldYieldOpChainsWhenNoWorkCanBeDone()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    Mockito.when(_scheduler.hasNext()).thenReturn(true);
+    Mockito.when(_scheduler.next())
+        .thenReturn(getChain(_operatorA))
+        .thenReturn(getChain(_operatorB))
+        .thenReturn(getChain(_operatorA));
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    AtomicBoolean opAReturnedNoOp = new AtomicBoolean(false);
+    AtomicBoolean hasOpBRan = new AtomicBoolean(false);
+
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_operatorA.nextBlock()).thenAnswer(inv -> {
+      if (hasOpBRan.get()) {
+        latch.countDown();
+        return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+      } else {
+        opAReturnedNoOp.set(true);
+        return TransferableBlockUtils.getNoOpTransferableBlock();
+      }
+    });
+
+    Mockito.when(_operatorB.nextBlock()).thenAnswer(inv -> {
+      hasOpBRan.set(true);
+      return TransferableBlockUtils.getEndOfStreamTransferableBlock();
+    });
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+    scheduler.register(getChain(_operatorA));
+    scheduler.register(getChain(_operatorB));
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected await to be called in less than 10 seconds");
+    Assert.assertTrue(opAReturnedNoOp.get(), "expected opA to be scheduled first");
+    scheduler.stopAsync().awaitTerminated();
+  }
+
+  /** While {@code hasNext()} reports false, {@code next()} must never be called on the underlying scheduler. */
+  @Test
+  public void shouldNotCallSchedulerNextWhenHasNextReturnsFalse()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    CountDownLatch latch = new CountDownLatch(1);
+    Mockito.when(_scheduler.hasNext()).thenAnswer(inv -> {
+      latch.countDown();
+      return false;
+    });
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+
+    // Then:
+    Assert.assertTrue(latch.await(10, TimeUnit.SECONDS), "expected hasNext to be called");
+    scheduler.stopAsync().awaitTerminated();
+    Mockito.verify(_scheduler, Mockito.never()).next();
+  }
+
+  /** After {@code onDataAvailable} is called, {@code hasNext()} must be consulted a second time. */
+  @Test
+  public void shouldReevaluateHasNextWhenOnDataAvailableIsCalled()
+      throws InterruptedException {
+    // Given:
+    initExecutor(1);
+    CountDownLatch firstHasNext = new CountDownLatch(1);
+    CountDownLatch secondHasNext = new CountDownLatch(1);
+    Mockito.when(_scheduler.hasNext()).thenAnswer(inv -> {
+      firstHasNext.countDown();
+      return false;
+    }).then(inv -> {
+      secondHasNext.countDown();
+      return false;
+    });
+
+    OpChainSchedulerService scheduler = new OpChainSchedulerService(_scheduler, _executor);
+
+    // When:
+    scheduler.startAsync().awaitRunning();
+    Assert.assertTrue(firstHasNext.await(10, TimeUnit.SECONDS), "expected hasNext to be called");
+    scheduler.onDataAvailable(null);
+
+    // Then:
+    Assert.assertTrue(secondHasNext.await(10, TimeUnit.SECONDS), "expected hasNext to be called again");
+    scheduler.stopAsync().awaitTerminated();
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
new file mode 100644
index
000000000000..0c92403656f1
--- /dev/null
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/RoundRobinSchedulerTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.executor;
+
+import org.apache.pinot.query.runtime.operator.OpChain;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Unit test for {@link RoundRobinScheduler}.
+ */
+public class RoundRobinSchedulerTest {
+
+  /**
+   * Registers a single chain and checks that it is offered exactly once: {@code hasNext()} is true, {@code next()}
+   * returns the registered chain, and {@code hasNext()} is false afterwards.
+   */
+  @Test
+  public void shouldPollOperators() {
+    // Given:
+    RoundRobinScheduler roundRobin = new RoundRobinScheduler();
+    OpChain registeredChain = Mockito.mock(OpChain.class);
+
+    // When:
+    roundRobin.register(registeredChain);
+
+    // Then:
+    boolean hasNextBeforePolling = roundRobin.hasNext();
+    Assert.assertTrue(hasNextBeforePolling, "expected next");
+    Assert.assertEquals(roundRobin.next(), registeredChain);
+    boolean hasNextAfterPolling = roundRobin.hasNext();
+    Assert.assertFalse(hasNextAfterPolling, "should no longer have next after polling");
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
index 0ca5cc656a11..b1d8edececb0 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.concurrent.ExecutorService; import org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.core.transport.ServerInstance; @@ -38,6 +37,7 @@ import org.apache.pinot.query.planner.stage.StageNode; import org.apache.pinot.query.routing.WorkerInstance; import org.apache.pinot.query.runtime.QueryRunner; +import org.apache.pinot.query.runtime.executor.OpChainSchedulerService; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.apache.pinot.query.testutils.QueryTestUtils; import org.apache.pinot.util.TestUtils; @@ -119,7 +119,7 @@ public void testWorkerAcceptsWorkerRequestCorrect(String sql) StageNode stageNode = queryPlan.getQueryStageMap().get(stageId); return isStageNodesEqual(stageNode, distributedStagePlan.getStageRoot()) && isMetadataMapsEqual(stageMetadata, distributedStagePlan.getMetadataMap().get(stageId)); - }), any(ExecutorService.class), Mockito.argThat(requestMetadataMap -> + }), any(OpChainSchedulerService.class), Mockito.argThat(requestMetadataMap -> requestIdStr.equals(requestMetadataMap.get("REQUEST_ID")))); return true; } catch (Throwable t) {