Skip to content

Commit

Permalink
cr
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince committed Aug 24, 2022
1 parent baf2921 commit d4e2c4b
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
private final int _defaultHllLog2m;
private final boolean _enableQueryLimitOverride;
private final boolean _enableDistinctCountBitmapOverride;
private final Map<Long, QueryServers> _queriesById = new ConcurrentHashMap<>();
private final boolean _enableQueryCancellation;
private final Map<Long, QueryServers> _queriesById;

public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
Expand Down Expand Up @@ -164,13 +163,13 @@ public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager
Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
_numDroppedLog = new AtomicInteger(0);
_numDroppedLogRateLimiter = RateLimiter.create(1.0);
_enableQueryCancellation =
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
LOGGER.info(
"Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps, "
+ "enabling query cancellation: {}",
_brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate(),
_enableQueryCancellation);
+ "enabling query cancellation: {}", _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength,
_queryLogRateLimiter.getRate(), enableQueryCancellation);
}

private String getDefaultBrokerId() {
Expand All @@ -184,13 +183,13 @@ private String getDefaultBrokerId() {

@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
}

@VisibleForTesting
Set<ServerInstance> getRunningServers(long requestId) {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
return (queryServers == null) ? Collections.emptySet() : queryServers._servers;
}
Expand All @@ -199,7 +198,7 @@ Set<ServerInstance> getRunningServers(long requestId) {
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(queryId);
if (queryServers == null) {
return false;
Expand Down Expand Up @@ -278,7 +277,7 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
requestContext.setQuery(query);
return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext);
} finally {
if (_enableQueryCancellation) {
if (_queriesById != null) {
_queriesById.remove(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
Expand Down Expand Up @@ -665,7 +664,7 @@ private BrokerResponseNative handleRequest(long requestId, String query,
realtimeRoutingTable = null;
}
}
if (_enableQueryCancellation) {
if (_queriesById != null) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any potential
// failures that could happen before sending it out, like failures to calculate the routing table etc.
// TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ public void runJob() {
IntermediateResultsBlock mergedBlock;
try {
mergedBlock = mergeResults();
} catch (InterruptedException ie) {
throw new QueryCancelledException("Cancelled while merging results blocks", ie);
} catch (InterruptedException e) {
throw new QueryCancelledException("Cancelled while merging results blocks", e);
} catch (Exception e) {
LOGGER.error("Caught exception while merging results blocks (query: {})", _queryContext, e);
mergedBlock = new IntermediateResultsBlock(QueryException.getException(QueryException.INTERNAL_ERROR, e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ protected IntermediateResultsBlock getNextBlock() {
return new MinMaxValueBasedSelectionOrderByCombineOperator(_operators, _queryContext, _executorService)
.getNextBlock();
} catch (QueryCancelledException e) {
throw new QueryCancelledException("Cancelled while running MinMaxValueBasedSelectionOrderByCombineOperator", e);
throw e;
} catch (Exception e) {
LOGGER.warn("Caught exception while using min/max value based combine, using the default combine", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,11 @@ private DataTable processQueryInternal(ServerQueryRequest queryRequest, Executor
LOGGER.info("Caught BadQueryRequestException while processing requestId: {}, {}", requestId, e.getMessage());
dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
} else if (e instanceof QueryCancelledException) {
LOGGER.info("Cancelled while processing requestId: {}, {}", requestId, e.getMessage());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Cancelled while processing requestId: {}", requestId, e);
} else {
LOGGER.info("Cancelled while processing requestId: {}, {}", requestId, e.getMessage());
}
// NOTE most likely the onFailure() callback registered on query future in InstanceRequestHandler would
// return the error table to broker sooner than here. But in case of race condition, we construct the error
// table here too.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ public class InstanceRequestHandler extends SimpleChannelInboundHandler<ByteBuf>
private final QueryScheduler _queryScheduler;
private final ServerMetrics _serverMetrics;
private final AccessControl _accessControl;
private final boolean _enableQueryCancellation;
private final Map<String, Future<byte[]>> _queryFuturesById = new ConcurrentHashMap<>();
private final Map<String, Future<byte[]>> _queryFuturesById;

public InstanceRequestHandler(String instanceName, PinotConfiguration config, QueryScheduler queryScheduler,
ServerMetrics serverMetrics, AccessControl accessControl) {
Expand All @@ -91,10 +90,11 @@ public InstanceRequestHandler(String instanceName, PinotConfiguration config, Qu
} catch (TTransportException e) {
throw new RuntimeException("Failed to initialize Thrift Deserializer", e);
}
_enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION));
if (_enableQueryCancellation) {
if (Boolean.parseBoolean(config.getProperty(CommonConstants.Server.CONFIG_OF_ENABLE_QUERY_CANCELLATION))) {
_queryFuturesById = new ConcurrentHashMap<>();
LOGGER.info("Enable query cancellation");
} else {
_queryFuturesById = null;
}
}

Expand Down Expand Up @@ -163,7 +163,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
void submitQuery(ServerQueryRequest queryRequest, ChannelHandlerContext ctx, String tableNameWithType,
long queryArrivalTimeMs, InstanceRequest instanceRequest) {
ListenableFuture<byte[]> future = _queryScheduler.submit(queryRequest);
if (_enableQueryCancellation) {
if (_queryFuturesById != null) {
String queryId = queryRequest.getQueryId();
// Track the running query for cancellation.
if (LOGGER.isDebugEnabled()) {
Expand All @@ -181,7 +181,7 @@ private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx, String
return new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] responseBytes) {
if (_enableQueryCancellation) {
if (_queryFuturesById != null) {
String queryId = queryRequest.getQueryId();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Remove track of running query: {} on success", queryId);
Expand All @@ -200,23 +200,28 @@ public void onSuccess(@Nullable byte[] responseBytes) {

@Override
public void onFailure(Throwable t) {
if (_enableQueryCancellation) {
if (_queryFuturesById != null) {
String queryId = queryRequest.getQueryId();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Remove track of running query: {} on failure", queryId);
}
_queryFuturesById.remove(queryId);
}
// Send exception response.
Exception ex = new Exception(t);
if (t instanceof CancellationException) {
LOGGER.info("Query: {} got cancelled", queryRequest.getQueryId());
ex = (Exception) t;
Exception e;
if (t instanceof Exception) {
e = (Exception) t;
if (e instanceof CancellationException) {
LOGGER.info("Query: {} got cancelled", queryRequest.getQueryId());
} else {
LOGGER.error("Exception while processing instance request", e);
}
} else {
LOGGER.error("Exception while processing instance request", t);
LOGGER.error("Error while processing instance request", t);
e = new Exception(t);
}
sendErrorResponse(ctx, instanceRequest.getRequestId(), tableNameWithType, queryArrivalTimeMs,
DataTableFactory.getEmptyDataTable(), ex);
DataTableFactory.getEmptyDataTable(), e);
}
};
}
Expand All @@ -239,7 +244,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
* @return true if a running query exists for the given queryId.
*/
public boolean cancelQuery(String queryId) {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on server");
Preconditions.checkState(_queryFuturesById != null, "Query cancellation is not enabled on server");
// Keep the future as it'll be cleaned up by the thread executing the query.
Future<byte[]> future = _queryFuturesById.get(queryId);
if (future == null) {
Expand All @@ -259,7 +264,7 @@ public boolean cancelQuery(String queryId) {
* @return list of ids of the queries currently running on the server.
*/
public Set<String> getRunningQueryIds() {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on server");
Preconditions.checkState(_queryFuturesById != null, "Query cancellation is not enabled on server");
return new HashSet<>(_queryFuturesById.keySet());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ public void testCancelMinMaxValueBasedSelectionOrderByCombineOperator() {
queryContext.setEndTimeMs(System.currentTimeMillis() + 10000);
SelectionOrderByCombineOperator combineOperator =
new SelectionOrderByCombineOperator(operators, queryContext, _executorService);
testCancelCombineOperator(combineOperator, ready,
"Cancelled while running MinMaxValueBasedSelectionOrderByCombineOperator");
testCancelCombineOperator(combineOperator, ready, "Cancelled while merging results blocks");
}

@Test
Expand Down

0 comments on commit d4e2c4b

Please sign in to comment.