Improve server query cancellation and timeout checking during execution (#9286)

* Improve query cancellation and timeout checking

* Improve query cancellation and timeout checking

* Improve query cancellation and timeout checking

* Address comments

* Address comments

* Address comments
jasperjiaguo authored Aug 29, 2022
1 parent 53c117f commit 15b16e8
Showing 3 changed files with 31 additions and 6 deletions.
@@ -142,7 +142,7 @@ public void init(TableDataManagerConfig tableDataManagerConfig, String instanceI
     _isStreamSegmentDownloadUntar = tableDataManagerParams.isStreamSegmentDownloadUntar();
     if (_isStreamSegmentDownloadUntar) {
       LOGGER.info("Using streamed download-untar for segment download! "
-          + "The rate limit interval for streamed download-untar is {} ms",
+          + "The rate limit for streamed download-untar is {} bytes/s",
           _streamSegmentDownloadUntarRateLimitBytesPerSec);
     }
     int maxParallelSegmentDownloads = tableDataManagerParams.getMaxParallelSegmentDownloads();
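The corrected unit matters because the limiter is driven per byte, not per time slice. As a rough illustration of a bytes-per-second limiter, here is a minimal sketch using Guava's RateLimiter; the class name, loop, and 10 MB/s figure are hypothetical, not Pinot's actual download-untar code:

```java
import com.google.common.util.concurrent.RateLimiter;

public class DownloadRateLimitDemo {
  public static void main(String[] args) {
    // Rate limit expressed in bytes per second: one permit per byte.
    double rateLimitBytesPerSec = 10_000_000; // hypothetical 10 MB/s
    RateLimiter limiter = RateLimiter.create(rateLimitBytesPerSec);
    byte[] chunk = new byte[8192];
    for (int i = 0; i < 5; i++) {
      limiter.acquire(chunk.length); // blocks until 8192 byte-permits are available
      // ... stream the chunk into the untar pipeline ...
    }
  }
}
```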
@@ -196,14 +196,15 @@ protected IntermediateResultsBlock mergeResults()
     int numBlocksMerged = 0;
     long endTimeMs = _queryContext.getEndTimeMs();
     while (numBlocksMerged < _numOperators) {
+      // The timeout has been reached, so processing shouldn't continue. `_blockingQueue.poll` will keep returning
+      // blocks even when a negative timeout is provided, so an extra check is needed.
+      if (endTimeMs - System.currentTimeMillis() < 0) {
+        return generateTimeOutResult(numBlocksMerged);
+      }
       IntermediateResultsBlock blockToMerge =
           _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
       if (blockToMerge == null) {
-        // The query timed out; skip merging the remaining results blocks
-        LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
-            _queryContext);
-        return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
-            new TimeoutException("Timed out while polling results block")));
+        return generateTimeOutResult(numBlocksMerged);
       }
       if (blockToMerge.getProcessingExceptions() != null) {
         // Caught exception while processing segment, skip merging the remaining results blocks and directly return the
@@ -224,6 +225,14 @@ protected IntermediateResultsBlock mergeResults()
     return mergedBlock;
   }
 
+  private IntermediateResultsBlock generateTimeOutResult(int numBlocksMerged) {
+    // The query timed out; skip merging the remaining results blocks
+    LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
+        _queryContext);
+    return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+        new TimeoutException("Timed out while polling results block")));
+  }
+
   /**
    * Can be overridden for early termination.
    */
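The up-front deadline check added above exists because `BlockingQueue.poll(timeout, unit)` treats a non-positive timeout as "don't wait", not as "fail": if a block is already queued, it is returned anyway, so without the guard a long-expired query could keep merging as long as operators keep producing blocks. A self-contained demonstration of that behavior (assuming a `LinkedBlockingQueue`; the concrete type behind `_blockingQueue` may differ):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class NegativePollDemo {
  public static void main(String[] args) throws InterruptedException {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    queue.put("block-1");
    long timeoutMs = -100; // deadline already passed
    // poll() still drains the available element instead of returning null.
    System.out.println(queue.poll(timeoutMs, TimeUnit.MILLISECONDS)); // block-1
    // Only an empty queue makes a negative-timeout poll return null immediately.
    System.out.println(queue.poll(timeoutMs, TimeUnit.MILLISECONDS)); // null
  }
}
```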
@@ -45,6 +45,7 @@
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +58,8 @@
 @SuppressWarnings("rawtypes")
 public class GroupByOrderByCombineOperator extends BaseCombineOperator {
   public static final int MAX_TRIM_THRESHOLD = 1_000_000_000;
+  public static final int MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK = 10_000;
+
   private static final Logger LOGGER = LoggerFactory.getLogger(GroupByOrderByCombineOperator.class);
 
   private static final String EXPLAIN_NAME = "COMBINE_GROUPBY_ORDERBY";
@@ -165,6 +168,8 @@ protected void processSegments(int taskIndex) {
       // Merge aggregation group-by result.
       // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
       Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords();
+      // Count the number of merged keys
+      int mergedKeys = 0;
       // For now, only GroupBy OrderBy query has pre-constructed intermediate records
       if (intermediateRecords == null) {
         // Merge aggregation group-by result.
@@ -181,12 +186,16 @@
             values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId);
           }
           _indexedTable.upsert(new Key(keys), new Record(values));
+          mergedKeys++;
+          checkMergePhaseInterruption(mergedKeys);
         }
       }
     } else {
       for (IntermediateRecord intermediateResult : intermediateRecords) {
         //TODO: change upsert api so that it accepts intermediateRecord directly
         _indexedTable.upsert(intermediateResult._key, intermediateResult._record);
+        mergedKeys++;
+        checkMergePhaseInterruption(mergedKeys);
       }
     }
   } finally {
@@ -197,6 +206,13 @@ protected void processSegments(int taskIndex) {
     }
   }
 
+  // Check for thread interruption after every 10_000 keys merged
+  private void checkMergePhaseInterruption(int mergedKeys) {
+    if (mergedKeys % MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK == 0 && Thread.interrupted()) {
+      throw new EarlyTerminationException();
+    }
+  }
+
   @Override
   protected void onException(Exception e) {
     _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
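Together, the counter and the modulo check let a cancelled query abort the merge phase promptly without paying for a `Thread.interrupted()` call on every key. A standalone sketch of the same pattern (class and method names here are illustrative, not Pinot's API; the `return` stands in for throwing `EarlyTerminationException`):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class MergeInterruptionDemo {
  // Mirrors MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK in the diff.
  static final int KEYS_PER_CHECK = 10_000;

  static void mergeKeys(long totalKeys) {
    long mergedKeys = 0;
    for (long i = 0; i < totalKeys; i++) {
      // ... upsert one group-by key into the indexed table ...
      mergedKeys++;
      // Thread.interrupted() also clears the flag, so cancellation is observed once.
      if (mergedKeys % KEYS_PER_CHECK == 0 && Thread.interrupted()) {
        System.out.println("Early termination after " + mergedKeys + " keys");
        return;
      }
    }
    System.out.println("Merged all " + totalKeys + " keys");
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    Future<?> merge = pool.submit(() -> mergeKeys(2_000_000_000L));
    Thread.sleep(100);
    merge.cancel(true); // interrupts the worker; the next modulo check stops the merge
    pool.shutdown();
  }
}
```

Checking only every 10,000 keys keeps the interruption test off the hot path while still bounding how much work a cancelled query can do after the interrupt lands.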
