diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java index f157ce61581e..e818a6dde723 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java @@ -60,21 +60,22 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine private static final String EXPLAIN_NAME = "COMBINE_SELECT_ORDERBY_MINMAX"; - // For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this - // special IntermediateResultsBlock into the BlockingQueue to awake the main thread - private static final BaseResultsBlock LAST_RESULTS_BLOCK = + // For min/max value based combine, when a thread detects that no more segment needs to be processed, it inserts this + // special results block, which can be skipped during the merge phase + private static final BaseResultsBlock EMPTY_RESULTS_BLOCK = new SelectionResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), Collections.emptyList()); - // Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue) - private final AtomicInteger _numOperatorsSkipped = new AtomicInteger(); - private final AtomicReference _globalBoundaryValue = new AtomicReference<>(); + // Use an AtomicInteger to track the end operator id, beyond which no operator needs to be processed + private final AtomicInteger _endOperatorId; private final int _numRowsToKeep; private final List _minMaxValueContexts; + private final AtomicReference _globalBoundaryValue = new AtomicReference<>(); MinMaxValueBasedSelectionOrderByCombineOperator(List operators, QueryContext queryContext, ExecutorService executorService) { super(operators, queryContext, executorService); + _endOperatorId = new AtomicInteger(_numOperators); _numRowsToKeep = queryContext.getLimit() + queryContext.getOffset(); List orderByExpressions = _queryContext.getOrderByExpressions(); @@ -146,6 +147,11 @@ protected void processSegments() { int operatorId; while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) { + if (operatorId >= _endOperatorId.get()) { + _blockingQueue.offer(EMPTY_RESULTS_BLOCK); + continue; + } + // Calculate the boundary value from global boundary and thread boundary Comparable boundaryValue = _globalBoundaryValue.get(); if (boundaryValue == null) { @@ -173,9 +179,9 @@ protected void processSegments() { if (minMaxValueContext._minValue != null) { int result = minMaxValueContext._minValue.compareTo(boundaryValue); if (result > 0 || (result == 0 && numOrderByExpressions == 1)) { - _numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks); - _blockingQueue.offer(LAST_RESULTS_BLOCK); - return; + _endOperatorId.set(operatorId); + _blockingQueue.offer(EMPTY_RESULTS_BLOCK); + continue; } } } else { @@ -184,9 +190,9 @@ protected void processSegments() { if (minMaxValueContext._maxValue != null) { int result = minMaxValueContext._maxValue.compareTo(boundaryValue); if (result < 0 || (result == 0 && numOrderByExpressions == 1)) { - _numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks); - _blockingQueue.offer(LAST_RESULTS_BLOCK); - return; + _endOperatorId.set(operatorId); + _blockingQueue.offer(EMPTY_RESULTS_BLOCK); + continue; } } } @@ -248,7 +254,7 @@ protected BaseResultsBlock mergeResults() SelectionResultsBlock mergedBlock = null; int numBlocksMerged = 0; long endTimeMs = _queryContext.getEndTimeMs(); - while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) { + while (numBlocksMerged < _numOperators) { BaseResultsBlock blockToMerge = _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (blockToMerge == null) { @@ -258,6 +264,10 @@ protected BaseResultsBlock mergeResults() return new ExceptionResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, new TimeoutException("Timed out while polling results block"))); } + if (blockToMerge == EMPTY_RESULTS_BLOCK) { + numBlocksMerged++; + continue; + } if (blockToMerge.getProcessingExceptions() != null) { // Caught exception while processing segment, skip merging the remaining results blocks and directly return // the exception @@ -266,9 +276,7 @@ protected BaseResultsBlock mergeResults() if (mergedBlock == null) { mergedBlock = (SelectionResultsBlock) blockToMerge; } else { - if (blockToMerge != LAST_RESULTS_BLOCK) { - mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge); - } + mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge); } numBlocksMerged++;