From 4ce8a110d46cca4bdfae4b35ac0ed815be0affd3 Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Fri, 26 Aug 2022 14:17:57 -0700 Subject: [PATCH 1/6] Improve query cancellation and timeout checking --- .../core/data/manager/BaseTableDataManager.java | 2 +- .../operator/combine/BaseCombineOperator.java | 5 ++++- .../combine/GroupByOrderByCombineOperator.java | 16 ++++++++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index bcb21234c4b8..cb2f491c2275 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -142,7 +142,7 @@ public void init(TableDataManagerConfig tableDataManagerConfig, String instanceI _isStreamSegmentDownloadUntar = tableDataManagerParams.isStreamSegmentDownloadUntar(); if (_isStreamSegmentDownloadUntar) { LOGGER.info("Using streamed download-untar for segment download! " - + "The rate limit interval for streamed download-untar is {} ms", + + "The rate limit interval for streamed download-untar is {} bytes", _streamSegmentDownloadUntarRateLimitBytesPerSec); } int maxParallelSegmentDownloads = tableDataManagerParams.getMaxParallelSegmentDownloads(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 9adaec05f053..444fb3a998d2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -198,7 +198,10 @@ protected IntermediateResultsBlock mergeResults() while (numBlocksMerged < _numOperators) { IntermediateResultsBlock blockToMerge = _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - if (blockToMerge == null) { + // Timeout has reached, shouldn't continue to process. `_blockingQueue.poll` will continue to return blocks even + // if negative timeout is provided; therefore an extra check is needed + boolean timeoutReached = endTimeMs < System.currentTimeMillis(); + if (blockToMerge == null || timeoutReached) { // Query times out, skip merging the remaining results blocks LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, _queryContext); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index f12e1c9b5e72..738969a930df 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -45,9 +45,12 @@ import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.util.GroupByUtils; +import org.apache.pinot.spi.exception.EarlyTerminationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.core.plan.DocIdSetPlanNode.MAX_DOC_PER_CALL; + /** * Combine operator for aggregation group-by queries with SQL semantic. @@ -165,6 +168,8 @@ protected void processSegments(int taskIndex) { // Merge aggregation group-by result. // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable Collection intermediateRecords = resultsBlock.getIntermediateRecords(); + // Count the number of merged keys + long mergedKeys = 0; // For now, only GroupBy OrderBy query has pre-constructed intermediate records if (intermediateRecords == null) { // Merge aggregation group-by result. @@ -181,12 +186,16 @@ protected void processSegments(int taskIndex) { values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId); } _indexedTable.upsert(new Key(keys), new Record(values)); + mergedKeys++; + checkMergePhaseInterruption(mergedKeys); } } } else { for (IntermediateRecord intermediateResult : intermediateRecords) { //TODO: change upsert api so that it accepts intermediateRecord directly _indexedTable.upsert(intermediateResult._key, intermediateResult._record); + mergedKeys++; + checkMergePhaseInterruption(mergedKeys); } } } finally { @@ -197,6 +206,13 @@ protected void processSegments(int taskIndex) { } } + // Check for thread interruption, every time after merging 10_000 keys + private void checkMergePhaseInterruption(long mergedKeys) { + if (mergedKeys % MAX_DOC_PER_CALL == 0 && Thread.interrupted()) { + throw new EarlyTerminationException(); + } + } + @Override protected void onException(Exception e) { _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e)); From fcfb27a562e51aad1f99a968e49ff59aad21b745 Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Fri, 26 Aug 2022 14:29:59 -0700 Subject: [PATCH 2/6] Improve query cancellation and timeout checking --- .../core/operator/combine/GroupByOrderByCombineOperator.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index 738969a930df..61628c9acbdb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -48,8 +48,7 @@ import org.apache.pinot.spi.exception.EarlyTerminationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import static org.apache.pinot.core.plan.DocIdSetPlanNode.MAX_DOC_PER_CALL; +import org.apache.pinot.core.plan.DocIdSetPlanNode; /** @@ -208,7 +207,7 @@ protected void processSegments(int taskIndex) { // Check for thread interruption, every time after merging 10_000 keys private void checkMergePhaseInterruption(long mergedKeys) { - if (mergedKeys % MAX_DOC_PER_CALL == 0 && Thread.interrupted()) { + if (mergedKeys % DocIdSetPlanNode.MAX_DOC_PER_CALL == 0 && Thread.interrupted()) { throw new EarlyTerminationException(); } } From 161dbb0a7f622b60bebcada8ff38e9c359f4c94c Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Fri, 26 Aug 2022 14:41:05 -0700 Subject: [PATCH 3/6] Improve query cancellation and timeout checking --- .../core/operator/combine/GroupByOrderByCombineOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index 61628c9acbdb..94e45b7b0a67 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -40,6 +40,7 @@ import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable; import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; @@ -48,7 +49,6 @@ import org.apache.pinot.spi.exception.EarlyTerminationException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.pinot.core.plan.DocIdSetPlanNode; /** From 53834b73eb1b1a4784a9f7d3bc98077b1dcb2d23 Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Sat, 27 Aug 2022 23:35:55 -0700 Subject: [PATCH 4/6] Address comments --- .../data/manager/BaseTableDataManager.java | 2 +- .../operator/combine/BaseCombineOperator.java | 24 ++++++++++++------- .../GroupByOrderByCombineOperator.java | 9 +++---- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index cb2f491c2275..a483c51cdc5c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -142,7 +142,7 @@ public void init(TableDataManagerConfig tableDataManagerConfig, String instanceI _isStreamSegmentDownloadUntar = tableDataManagerParams.isStreamSegmentDownloadUntar(); if (_isStreamSegmentDownloadUntar) { LOGGER.info("Using streamed download-untar for segment download! " - + "The rate limit interval for streamed download-untar is {} bytes", + + "The rate limit interval for streamed download-untar is {} bytes/s", _streamSegmentDownloadUntarRateLimitBytesPerSec); } int maxParallelSegmentDownloads = tableDataManagerParams.getMaxParallelSegmentDownloads(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 444fb3a998d2..4ed1960e3c89 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -196,17 +196,15 @@ protected IntermediateResultsBlock mergeResults() int numBlocksMerged = 0; long endTimeMs = _queryContext.getEndTimeMs(); while (numBlocksMerged < _numOperators) { - IntermediateResultsBlock blockToMerge = - _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); // Timeout has reached, shouldn't continue to process. `_blockingQueue.poll` will continue to return blocks even // if negative timeout is provided; therefore an extra check is needed - boolean timeoutReached = endTimeMs < System.currentTimeMillis(); - if (blockToMerge == null || timeoutReached) { - // Query times out, skip merging the remaining results blocks - LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, - _queryContext); - return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, - new TimeoutException("Timed out while polling results block"))); + if (endTimeMs - System.currentTimeMillis() < 0){ + return generateTimeOutResult(numBlocksMerged); + } + IntermediateResultsBlock blockToMerge = + _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS); + if (blockToMerge == null) { + return generateTimeOutResult(numBlocksMerged); } if (blockToMerge.getProcessingExceptions() != null) { // Caught exception while processing segment, skip merging the remaining results blocks and directly return the @@ -227,6 +225,14 @@ protected IntermediateResultsBlock mergeResults() return mergedBlock; } + private IntermediateResultsBlock generateTimeOutResult(int numBlocksMerged) { + // Query times out, skip merging the remaining results blocks + LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged, + _queryContext); + return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR, + new TimeoutException("Timed out while polling results block"))); + } + /** * Can be overridden for early termination. */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index 94e45b7b0a67..1480d6cc4d0b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -40,7 +40,6 @@ import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable; import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; -import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; @@ -59,6 +58,8 @@ @SuppressWarnings("rawtypes") public class GroupByOrderByCombineOperator extends BaseCombineOperator { public static final int MAX_TRIM_THRESHOLD = 1_000_000_000; + public static final int MAX_GROUP_BY_KEYS_PER_MERGE_CALL = 10_000; + private static final Logger LOGGER = LoggerFactory.getLogger(GroupByOrderByCombineOperator.class); private static final String EXPLAIN_NAME = "COMBINE_GROUPBY_ORDERBY"; @@ -168,7 +169,7 @@ protected void processSegments(int taskIndex) { // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable Collection intermediateRecords = resultsBlock.getIntermediateRecords(); // Count the number of merged keys - long mergedKeys = 0; + int mergedKeys = 0; // For now, only GroupBy OrderBy query has pre-constructed intermediate records if (intermediateRecords == null) { // Merge aggregation group-by result. @@ -206,8 +207,8 @@ protected void processSegments(int taskIndex) { } // Check for thread interruption, every time after merging 10_000 keys - private void checkMergePhaseInterruption(long mergedKeys) { - if (mergedKeys % DocIdSetPlanNode.MAX_DOC_PER_CALL == 0 && Thread.interrupted()) { + private void checkMergePhaseInterruption(int mergedKeys) { + if (mergedKeys % MAX_GROUP_BY_KEYS_PER_MERGE_CALL == 0 && Thread.interrupted()) { throw new EarlyTerminationException(); } } From f7173c3e1d5621d99faffec74e507569128bfc90 Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Sat, 27 Aug 2022 23:37:54 -0700 Subject: [PATCH 5/6] Address comments --- .../apache/pinot/core/operator/combine/BaseCombineOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 4ed1960e3c89..9238604796eb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -198,7 +198,7 @@ protected IntermediateResultsBlock mergeResults() while (numBlocksMerged < _numOperators) { // Timeout has reached, shouldn't continue to process. `_blockingQueue.poll` will continue to return blocks even // if negative timeout is provided; therefore an extra check is needed - if (endTimeMs - System.currentTimeMillis() < 0){ + if (endTimeMs - System.currentTimeMillis() < 0) { return generateTimeOutResult(numBlocksMerged); } IntermediateResultsBlock blockToMerge = From 1a04903dd58fb2a79b59e72022ffa06a992a0c9e Mon Sep 17 00:00:00 2001 From: Jia Guo Date: Sat, 27 Aug 2022 23:43:30 -0700 Subject: [PATCH 6/6] Address comments --- .../core/operator/combine/GroupByOrderByCombineOperator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index 1480d6cc4d0b..afea001c1fb1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -58,7 +58,7 @@ @SuppressWarnings("rawtypes") public class GroupByOrderByCombineOperator extends BaseCombineOperator { public static final int MAX_TRIM_THRESHOLD = 1_000_000_000; - public static final int MAX_GROUP_BY_KEYS_PER_MERGE_CALL = 10_000; + public static final int MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK = 10_000; private static final Logger LOGGER = LoggerFactory.getLogger(GroupByOrderByCombineOperator.class); @@ -208,7 +208,7 @@ protected void processSegments(int taskIndex) { // Check for thread interruption, every time after merging 10_000 keys private void checkMergePhaseInterruption(int mergedKeys) { - if (mergedKeys % MAX_GROUP_BY_KEYS_PER_MERGE_CALL == 0 && Thread.interrupted()) { + if (mergedKeys % MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK == 0 && Thread.interrupted()) { throw new EarlyTerminationException(); } }