
[multistage] implement naive round robin operator chain scheduling #9753

Merged
4 commits merged into apache:master on Nov 15, 2022

Conversation

@agavra (Contributor) commented Nov 7, 2022

fixes #9615

This is a follow-up to #9711 and follows the design outlined in this design doc.

This PR implements a naive round-robin operator chain scheduling algorithm and sets up the interface for future PRs that will implement more advanced scheduling. As of this PR, all queries are guaranteed to make progress (see the change in pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SSBQueryIntegrationTest.java; it can now be run with only 2 cores available), but the algorithm is still very hungry for CPU (queries with nothing in their mailbox will still be scheduled).

Review Guide:

  • look at OpChainSchedulerService and RoundRobinScheduler, which contain the logic for yielding threads when there's no more work to be done for a given operator chain (a simplified sketch of this idea follows this list)
  • look at WorkerQueryExecutor to see where this new scheduler is now wired in as opposed to running the work directly on the old worker pool
  • I added some logic to PhysicalPlanVisitor to collect information on mailboxes so that later we can hook up the mailboxes with the scheduling logic. Probably should have been done in a follow-up PR but 🤷 I was already at it. Let me know if you want me to split it up
  • Look at the corresponding tests
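For readers skimming the review, here is a minimal sketch (not the actual Pinot classes) of the naive round-robin idea: chains sit in a FIFO queue, a worker pulls the next chain, runs it until it has no immediate work (the real code signals this with a NOOP block), and re-queues it so every chain keeps making progress. The names Chain, doSomeWork, and NaiveRoundRobin are made up for illustration.

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an operator chain; not the Pinot OpChain API.
interface Chain {
  boolean doSomeWork(); // returns true once the chain is fully done
}

public final class NaiveRoundRobin {
  private final ConcurrentLinkedQueue<Chain> _queue = new ConcurrentLinkedQueue<>();
  private final ExecutorService _workers;

  public NaiveRoundRobin(int numThreads) {
    _workers = Executors.newFixedThreadPool(numThreads);
    for (int i = 0; i < numThreads; i++) {
      _workers.submit(this::workerLoop);
    }
  }

  public void register(Chain chain) {
    _queue.offer(chain);
  }

  private void workerLoop() {
    while (!Thread.currentThread().isInterrupted()) {
      Chain chain = _queue.poll();
      if (chain == null) {
        Thread.onSpinWait(); // naive and CPU-hungry, as the PR description notes
        continue;
      }
      if (!chain.doSomeWork()) {
        _queue.offer(chain); // not finished: yield this thread and re-register the chain
      }
    }
  }
}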

cc @walterddr @61yao

}
}

// TODO: remove this method after we pipe down the proper executor pool to the v1 engine
Contributor Author:

this is a bit unfortunate, but it's how the existing code works and refactoring it would be out of scope for this PR. While it's not particularly efficient, it also isn't dangerous - V1 queries are non-blocking, so using the same worker pool for executing V1 queries (that are issued as part of V2) and V2 intermediate queries does not threaten liveness.

Contributor:

lol should've read this first. good call out.

another way is to simply decouple the executor service used by v1 from v2. I am not sure which is better.

Contributor Author:

yeah, I think we should split the v1/v2 executor pools - that's probably the safest option. Alternatively we may also want three pools: a v2-intermediate pool, a v1-via-v2 pool, and a v1-vanilla pool. That would let us make sure that clusters running an existing v1 vanilla workload are not exposed to the v2 engine at all.
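As an illustration of that idea only (none of this is in the PR; the pool names and sizes are made up), splitting into three independent pools might look roughly like:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wiring: three independent pools so a v1-only workload never
// shares threads with the v2 engine. Names and sizes are illustrative.
final class QueryExecutorPools {
  private static ThreadFactory named(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return r -> new Thread(r, prefix + "-" + counter.getAndIncrement());
  }

  final ExecutorService v2Intermediate = Executors.newFixedThreadPool(4, named("v2-intermediate"));
  final ExecutorService v1ViaV2 = Executors.newFixedThreadPool(4, named("v1-via-v2"));
  final ExecutorService v1Vanilla = Executors.newFixedThreadPool(4, named("v1-vanilla"));
}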

Contributor:

+1, but for another day

// not complete, needs to re-register for scheduling
register(operatorChain);
} else {
LOGGER.info("Execution time: " + timer.getThreadTimeNs());
Contributor Author:

for more complex scheduling algorithms, we will add a callback here to complete or unregister an operator chain. That requires a unique way to identify operator chains, which adds a bit more code, so I avoided it in this PR.
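A minimal sketch of what such an identifier plus completion callback could look like (purely hypothetical names, not code from this PR):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical: identify chains by (requestId, stageId) so a completion
// callback can unregister exactly the chain that finished.
record OpChainKey(long requestId, int stageId) { }

final class CompletionTracker {
  private final Map<OpChainKey, Runnable> _onComplete = new ConcurrentHashMap<>();

  void register(OpChainKey key, Runnable callback) {
    _onComplete.put(key, callback);
  }

  void complete(OpChainKey key) {
    Runnable callback = _onComplete.remove(key);
    if (callback != null) {
      callback.run(); // e.g. deregister the chain from the scheduler
    }
  }
}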

Contributor:

+1. good splitting point

@codecov-commenter commented Nov 8, 2022

Codecov Report

Merging #9753 (77c080a) into master (2f640ff) will decrease coverage by 0.02%.
The diff coverage is 94.80%.

@@             Coverage Diff              @@
##             master    #9753      +/-   ##
============================================
- Coverage     70.08%   70.05%   -0.03%     
- Complexity     4980     5396     +416     
============================================
  Files          1951     1957       +6     
  Lines        104561   104878     +317     
  Branches      15836    15874      +38     
============================================
+ Hits          73279    73477     +198     
- Misses        26155    26245      +90     
- Partials       5127     5156      +29     
Flag Coverage Δ
integration1 25.36% <0.00%> (-0.08%) ⬇️
integration2 24.56% <0.00%> (-0.08%) ⬇️
unittests1 67.58% <94.80%> (+0.03%) ⬆️
unittests2 15.68% <94.80%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...ot/query/runtime/executor/RoundRobinScheduler.java 87.50% <87.50%> (ø)
...g/apache/pinot/query/runtime/operator/OpChain.java 87.50% <87.50%> (ø)
...uery/runtime/executor/OpChainSchedulerService.java 95.12% <95.12%> (ø)
...va/org/apache/pinot/query/runtime/QueryRunner.java 81.55% <100.00%> (ø)
...ot/query/runtime/executor/WorkerQueryExecutor.java 100.00% <100.00%> (+7.69%) ⬆️
...query/runtime/operator/MailboxReceiveOperator.java 79.66% <100.00%> (+0.71%) ⬆️
.../pinot/query/runtime/plan/PhysicalPlanVisitor.java 97.22% <100.00%> (+0.44%) ⬆️
...va/org/apache/pinot/query/service/QueryServer.java 73.80% <100.00%> (+1.30%) ⬆️
...data/manager/realtime/DefaultSegmentCommitter.java 0.00% <0.00%> (-80.00%) ⬇️
...er/api/resources/LLCSegmentCompletionHandlers.java 43.56% <0.00%> (-18.82%) ⬇️
... and 57 more


Comment on lines -69 to -72
if (Runtime.getRuntime().availableProcessors() < MIN_AVAILABLE_CORE_REQUIREMENT) {
throw new SkipException("Skip SSB query testing. Insufficient core count: "
+ Runtime.getRuntime().availableProcessors());
}
Contributor:

freaking awesome!!!

Comment on lines 67 to 68
public void processQuery(DistributedStagePlan queryRequest, Map<String, String> requestMetadataMap,
ExecutorService executorService) {
OpChainSchedulerService scheduler) {
Contributor:

not necessarily needed in this PR: let's change this API to directly take operator chain as input

Suggested change
public void processQuery(DistributedStagePlan queryRequest, Map<String, String> requestMetadataMap,
ExecutorService executorService) {
OpChainSchedulerService scheduler) {
public void processQuery(OpChain opChain, OpChainSchedulerService scheduler) {

and construct the OpChain outside, so that we can return early if there's any error during OpChain construction

Contributor Author (@agavra, Nov 14, 2022):

I just deleted WorkerQueryExecutor altogether, it really doesn't make sense to have that and the scheduler.

Constructing the OpChain on the request thread should be a different PR.

@@ -128,7 +129,7 @@ public void processQuery(DistributedStagePlan distributedStagePlan, ExecutorServ
for (ServerPlanRequestContext requestContext : serverQueryRequests) {
ServerQueryRequest request = new ServerQueryRequest(requestContext.getInstanceRequest(),
new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()), System.currentTimeMillis());
serverQueryResults.add(processServerQuery(request, executorService));
serverQueryResults.add(processServerQuery(request, scheduler.getWorkerPool()));
Contributor:

this means leaf stages are scheduled directly on top of the worker pool?

Contributor Author:

yes, IIRC this is the same as existing behavior if you follow where executorService is created and passed down

*
* @param mailbox the mailbox ID
*/
void onDataAvailable(MailboxIdentifier mailbox);
Contributor:

is this the only trigger? (other than register)

Contributor Author:

there are three possible triggers:

  • register
  • onDataAvailable
  • next().getRoot().nextBlock() completes

Triggers are defined in the implementation rather than the interface (a rough sketch of that split follows).
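To make that split concrete, here is a rough sketch of the shape (simplified, hypothetical method names; the real interface is OpChainScheduler, with RoundRobinScheduler as the implementation):

// The interface only exposes the external triggers; what each trigger does is
// left to the implementation. C is the operator chain type, M the mailbox ID type.
interface Scheduler<C, M> {
  void register(C opChain);          // trigger 1: a new chain arrives
  void onDataAvailable(M mailboxId); // trigger 2: a mailbox received data
  C next();                          // the scheduler service asks for the next chain to run
  // trigger 3 -- nextBlock() returning without finishing the chain -- is handled by
  // the caller re-invoking register(opChain) on the same scheduler instance.
}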

// so long as there's work to be done, keep getting the next block
// when the operator chain returns a NOOP block, then yield the execution
// of this to another worker
TransferableBlock result = operatorChain.getRoot().nextBlock();
Contributor:

IIUC, for the current mechanism, this will return a NO-OP every time it reaches all the way down to the mailbox receive and the buffer is empty. Yes?

Contributor Author:

👍 yup that's correct
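A stripped-down version of that loop, with placeholder types (Block, OperatorChain, and SchedulerCallbacks are not the real TransferableBlock/OpChain classes, and isNoOp/isEndOfStream are assumed helpers):

interface Block {
  boolean isNoOp();
  boolean isEndOfStream();
}

interface ChainRoot {
  Block nextBlock();
}

interface OperatorChain {
  ChainRoot getRoot();
}

interface SchedulerCallbacks {
  void reRegister(OperatorChain chain);
  void deregister(OperatorChain chain);
}

final class YieldOnNoOp {
  static void runOnce(OperatorChain chain, SchedulerCallbacks scheduler) {
    while (true) {
      Block block = chain.getRoot().nextBlock();
      if (block.isNoOp()) {
        scheduler.reRegister(chain); // mailbox empty right now: yield this worker thread
        return;
      }
      if (block.isEndOfStream()) {
        scheduler.deregister(chain); // chain finished: do not schedule it again
        return;
      }
      // otherwise keep pulling blocks while there is immediate work to do
    }
  }
}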

LOGGER.info("Initialized QueryWorker on port: {} with numWorkerThreads: {}", port,
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
}

public void start() {
LOGGER.info("Starting QueryWorker");
try {
_scheduler.startAsync().awaitRunning();
Contributor:

shutdown?

Contributor Author:

oops! good catch

@@ -218,6 +221,7 @@ public void start()
_queryRunner = new QueryRunner();
_queryRunner.init(configuration, _instanceDataManager, _helixManager, mockServiceMetrics());
_queryRunner.start();
_scheduler.startAsync().awaitRunning();
Contributor:

shutdown?

Contributor (@walterddr) left a comment:

LGTM overall. Minor comments, please take a look.


public static Operator<TransferableBlock> build(StageNode node, PlanRequestContext context) {
return node.visit(INSTANCE, context);
private List<MailboxIdentifier> _inputMailboxIds = new ArrayList<>();
Contributor:

can we move this into the PlanRequestContext? otherwise, we need to adjust the comment for the static usage of this visitor class

Comment on lines 54 to 56
public List<MailboxIdentifier> getInputMailboxes() {
return _inputMailboxes;
}
Contributor:

it doesn't seem like this is being used. From what I understand of the triggering mechanism:

  • register
  • onDataAvailable
  • next().getRoot().nextBlock() completes

all of these should be able to identify which opChain to call based on the jobID alone. Do we need the input mailbox ID list?

Contributor Author:

I think I see what you're saying. Since this isn't used in this PR I'll just clean it up for now and pipe it back in when I add the PR which triggers the scheduler via mailbox data available.

Contributor:

thank you. Or adding a unit test to explain it is also good. I am assuming it will be related to #9753 (comment) but I am not sure exactly, which is why I raised the question.

Comment on lines 50 to 51
private final Monitor _monitor = new Monitor();
protected final Monitor.Guard _hasNextOrClosing = new Monitor.Guard(_monitor) {
Contributor:

this is more of a question: why is one of these private and the other protected?

Contributor Author:

that is a very good question... I think it was an autocomplete typo

_timer = Suppliers.memoize(ThreadTimer::new)::get;
}

public Operator<TransferableBlock> getRoot() {
Contributor:

question for follow-up PRs:
assuming the opChain is going to be invoked via this root operator API, I was wondering how we can inform the scheduled opChain, from the scheduler.onDataAvailable(mailboxId) API, which mailboxId has new data, so we can do better than round-robin checking of each mailbox on the list

Contributor Author:

that's exactly the plan for the next PR - in fact it'll be even better than that: it won't schedule anything unless there's data available at all (and it'll sleep until it's notified of available data)
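A minimal sketch of that data-driven idea, assuming a hypothetical mapping from input mailbox IDs to chains (this is not the follow-up implementation, just the general shape):

import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical event-driven variant: a chain is only (re)queued when one of its
// input mailboxes reports data, so idle chains never burn CPU.
final class DataAvailableScheduler<C, M> {
  private final Map<M, C> _chainByMailbox = new ConcurrentHashMap<>();
  private final Set<C> _queued = ConcurrentHashMap.newKeySet();
  private final BlockingQueue<C> _ready = new LinkedBlockingQueue<>();

  void register(C chain, Iterable<M> inputMailboxes) {
    for (M mailbox : inputMailboxes) {
      _chainByMailbox.put(mailbox, chain);
    }
    enqueue(chain); // run at least once so chains with no pending input can still finish
  }

  void onDataAvailable(M mailbox) {
    C chain = _chainByMailbox.get(mailbox);
    if (chain != null) {
      enqueue(chain);
    }
  }

  C next() throws InterruptedException {
    C chain = _ready.take(); // workers sleep here until some mailbox has data
    _queued.remove(chain);
    return chain;
  }

  private void enqueue(C chain) {
    if (_queued.add(chain)) { // avoid queueing the same chain twice
      _ready.offer(chain);
    }
  }
}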

@walterddr added the multi-stage label (Related to the multi-stage query engine) on Nov 15, 2022

private static final Logger LOGGER = LoggerFactory.getLogger(OpChainSchedulerService.class);

private final OpChainScheduler _scheduler;
Contributor:

Add a comment saying this is guarded by monitor below?

// not complete, needs to re-register for scheduling
register(operatorChain);
} else {
LOGGER.info("Execution time: " + timer.getThreadTimeNs());
Contributor:

Is this logging expensive? I feel there would be a lot of logs if we log the pause every time. Can we have some class such as OpChainStats to hold the data, and decide later where to report it?
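For illustration, such a stats holder could be as small as the following (hypothetical; nothing like it exists in this PR):

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical OpChainStats-style holder: accumulate timings per chain and let
// the caller decide later whether and where to report them.
final class OpChainStats {
  private final AtomicLong _totalThreadTimeNs = new AtomicLong();
  private final AtomicLong _timesScheduled = new AtomicLong();

  void recordRun(long threadTimeNs) {
    _totalThreadTimeNs.addAndGet(threadTimeNs);
    _timesScheduled.incrementAndGet();
  }

  @Override
  public String toString() {
    return "threadTimeNs=" + _totalThreadTimeNs.get() + ", runs=" + _timesScheduled.get();
  }
}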

Contributor:

oops good callout. this could be a debug log

* An {@code OpChain} represents a chain of operators that are separated
* by send/receive stages.
*/
public class OpChain {
Contributor (@61yao, Nov 15, 2022):

Ideally this OpChain should capture the timeout info rather than relying on the MailboxSendOperator timeout. That makes it clear when we should time out, instead of saying "if the root operator times out correctly, this will work".

Contributor (@walterddr, Nov 15, 2022):

I am not sure how to interpret this comment. Eventually the timeout needs to be returned via an error block in the current architecture. Maybe we can clarify with a concrete example of what other routes the opChain can use to bubble up the timeout.

Contributor:

This means we don't schedule the chain anymore once we detect that the OpChain has timed out. Say we have a deadline in the OpChain; then, while scheduling, we can do:

if (now > deadline) {
  // discard the scheduling
}

Contributor:

ah! i see.

Contributor Author:

I like that suggestion a lot, definitely a good improvement
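Sketched out, the deadline check could be as simple as this (assuming a hypothetical getDeadlineMs() on the chain; not part of this PR):

// Hypothetical check before (re)scheduling a chain: if the query deadline has
// passed, drop the chain instead of queueing it again.
interface DeadlineAware {
  long getDeadlineMs(); // absolute wall-clock deadline for the whole query
}

final class DeadlineCheck {
  static boolean shouldSchedule(DeadlineAware chain) {
    return System.currentTimeMillis() <= chain.getDeadlineMs();
  }
}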

* An {@code OpChain} represents a chain of operators that are separated
* by send/receive stages.
*/
public class OpChain {
Contributor:

I feel we should also have an OpChain ID somewhere. Maybe in future PRs :)

Contributor:

currently an OpChain is equivalent to a stage. We can add it later once we have the split logic.

Contributor:

I meant even for stage one, we should have an ID for easier debugging.

Contributor Author:

big +1! I actually started adding that but decided that should be done in future PRs (see #9753 (comment))

@walterddr walterddr merged commit 342b6a5 into apache:master Nov 15, 2022
Comment on lines +102 to +104
} catch (Exception e) {
LOGGER.error("Failed to execute query!", e);
}
Contributor:

I don't think this is returnable via data blocks. Echoing back to @61yao's comment on timeout: maybe we need other ways to indicate out-of-norm failures. Let's follow up in another PR (I don't think we handle these correctly right now either).

Labels: multi-stage (Related to the multi-stage query engine)
Projects: None yet
Development: Successfully merging this pull request may close these issues: "Support non-blocking MailboxReceivedOperator"
4 participants