Skip to content

Commit

Permalink
Fix skip segment logic in MinMaxValueBasedSelectionOrderByCombineOper…
Browse files Browse the repository at this point in the history
…ator (#9434)
  • Loading branch information
Jackie-Jiang authored Sep 20, 2022
1 parent 985d0b5 commit b043dd8
Showing 1 changed file with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,21 +60,22 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator extends BaseCombine

private static final String EXPLAIN_NAME = "COMBINE_SELECT_ORDERBY_MINMAX";

// For min/max value based combine, when a thread detects that no more segments need to be processed, it inserts this
// special IntermediateResultsBlock into the BlockingQueue to awake the main thread
private static final BaseResultsBlock LAST_RESULTS_BLOCK =
// For min/max value based combine, when a thread detects that no more segment needs to be processed, it inserts this
// special results block, which can be skipped during the merge phase
private static final BaseResultsBlock EMPTY_RESULTS_BLOCK =
new SelectionResultsBlock(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]),
Collections.emptyList());

// Use an AtomicInteger to track the number of operators skipped (no result inserted into the BlockingQueue)
private final AtomicInteger _numOperatorsSkipped = new AtomicInteger();
private final AtomicReference<Comparable> _globalBoundaryValue = new AtomicReference<>();
// Use an AtomicInteger to track the end operator id, beyond which no operator needs to be processed
private final AtomicInteger _endOperatorId;
private final int _numRowsToKeep;
private final List<MinMaxValueContext> _minMaxValueContexts;
private final AtomicReference<Comparable> _globalBoundaryValue = new AtomicReference<>();

MinMaxValueBasedSelectionOrderByCombineOperator(List<Operator> operators, QueryContext queryContext,
ExecutorService executorService) {
super(operators, queryContext, executorService);
_endOperatorId = new AtomicInteger(_numOperators);
_numRowsToKeep = queryContext.getLimit() + queryContext.getOffset();

List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
Expand Down Expand Up @@ -146,6 +147,11 @@ protected void processSegments() {

int operatorId;
while ((operatorId = _nextOperatorId.getAndIncrement()) < _numOperators) {
if (operatorId >= _endOperatorId.get()) {
_blockingQueue.offer(EMPTY_RESULTS_BLOCK);
continue;
}

// Calculate the boundary value from global boundary and thread boundary
Comparable boundaryValue = _globalBoundaryValue.get();
if (boundaryValue == null) {
Expand Down Expand Up @@ -173,9 +179,9 @@ protected void processSegments() {
if (minMaxValueContext._minValue != null) {
int result = minMaxValueContext._minValue.compareTo(boundaryValue);
if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
_numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
_endOperatorId.set(operatorId);
_blockingQueue.offer(EMPTY_RESULTS_BLOCK);
continue;
}
}
} else {
Expand All @@ -184,9 +190,9 @@ protected void processSegments() {
if (minMaxValueContext._maxValue != null) {
int result = minMaxValueContext._maxValue.compareTo(boundaryValue);
if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
_numOperatorsSkipped.getAndAdd((_numOperators - operatorId - 1) / _numTasks);
_blockingQueue.offer(LAST_RESULTS_BLOCK);
return;
_endOperatorId.set(operatorId);
_blockingQueue.offer(EMPTY_RESULTS_BLOCK);
continue;
}
}
}
Expand Down Expand Up @@ -248,7 +254,7 @@ protected BaseResultsBlock mergeResults()
SelectionResultsBlock mergedBlock = null;
int numBlocksMerged = 0;
long endTimeMs = _queryContext.getEndTimeMs();
while (numBlocksMerged + _numOperatorsSkipped.get() < _numOperators) {
while (numBlocksMerged < _numOperators) {
BaseResultsBlock blockToMerge =
_blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (blockToMerge == null) {
Expand All @@ -258,6 +264,10 @@ protected BaseResultsBlock mergeResults()
return new ExceptionResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
new TimeoutException("Timed out while polling results block")));
}
if (blockToMerge == EMPTY_RESULTS_BLOCK) {
numBlocksMerged++;
continue;
}
if (blockToMerge.getProcessingExceptions() != null) {
// Caught exception while processing segment, skip merging the remaining results blocks and directly return
// the exception
Expand All @@ -266,9 +276,7 @@ protected BaseResultsBlock mergeResults()
if (mergedBlock == null) {
mergedBlock = (SelectionResultsBlock) blockToMerge;
} else {
if (blockToMerge != LAST_RESULTS_BLOCK) {
mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge);
}
mergeResultsBlocks(mergedBlock, (SelectionResultsBlock) blockToMerge);
}
numBlocksMerged++;

Expand Down

0 comments on commit b043dd8

Please sign in to comment.