Skip to content

Commit

Permalink
support to show running queries and cancel query by id (#9171)
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince authored Aug 17, 2022
1 parent d41ec0b commit 3a655d2
Show file tree
Hide file tree
Showing 21 changed files with 702 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
Expand All @@ -46,6 +50,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.pinot.broker.api.HttpRequesterIdentity;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.common.exception.QueryException;
Expand Down Expand Up @@ -84,6 +89,12 @@ public class PinotClientRequest {
@Inject
private BrokerMetrics _brokerMetrics;

@Inject
private Executor _executor;

@Inject
private HttpConnectionManager _httpConnMgr;

@GET
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -141,6 +152,58 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp
}
}

@DELETE
@Path("query/{queryId}")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the "
+ "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as "
+ "it's done in a non-blocking manner. The cancel method can be called multiple times.")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 404, message = "Query not found on the requested broker")
})
public String cancelQuery(
@ApiParam(value = "QueryId as assigned by the broker", required = true) @PathParam("queryId") long queryId,
@ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs")
@DefaultValue("3000") int timeoutMs,
@ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false")
boolean verbose) {
try {
Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) {
String resp = "Cancelled query: " + queryId;
if (verbose) {
resp += " with responses from servers: " + serverResponses;
}
return resp;
}
} catch (Exception e) {
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(String.format("Failed to cancel query: %s on the broker due to error: %s", queryId, e.getMessage()))
.build());
}
throw new WebApplicationException(
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", queryId))
.build());
}

@GET
@Path("queries")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get queryIds of the running queries submitted via the requested broker", notes = "The id is "
+ "assigned by the requested broker and only unique at the scope of this broker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error")
})
public Map<Long, String> getRunningQueries() {
try {
return _requestHandler.getRunningQueries();
} catch (Exception e) {
throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity("Failed to get running queries on the broker due to error: " + e.getMessage()).build());
}
}

private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@
*/
package org.apache.pinot.broker.broker;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.swagger.jaxrs.config.BeanConfig;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.metrics.BrokerMetrics;
Expand Down Expand Up @@ -57,9 +63,17 @@ public BrokerAdminApiApplication(BrokerRoutingManager routingManager, BrokerRequ
if (brokerConf.getProperty(CommonConstants.Broker.BROKER_SERVICE_AUTO_DISCOVERY, false)) {
register(ServiceAutoDiscoveryFeature.class);
}
ExecutorService executor =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
MultiThreadedHttpConnectionManager connMgr = new MultiThreadedHttpConnectionManager();
connMgr.getParams().setConnectionTimeout((int) brokerConf
.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
register(new AbstractBinder() {
@Override
protected void configure() {
bind(connMgr).to(HttpConnectionManager.class);
bind(executor).to(Executor.class);
bind(sqlQueryExecutor).to(SqlQueryExecutor.class);
bind(routingManager).to(BrokerRoutingManager.class);
bind(brokerRequestHandler).to(BrokerRequestHandler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,26 @@
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.URI;
import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.broker.api.HttpRequesterIdentity;
import org.apache.pinot.broker.api.RequesterIdentity;
Expand All @@ -47,6 +55,7 @@
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.http.MultiHttpRequest;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
Expand Down Expand Up @@ -126,6 +135,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
private final int _defaultHllLog2m;
private final boolean _enableQueryLimitOverride;
private final boolean _enableDistinctCountBitmapOverride;
private final Map<Long, QueryServers> _queriesById = new ConcurrentHashMap<>();
private final boolean _enableQueryCancellation;

public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache,
Expand Down Expand Up @@ -154,9 +165,13 @@ public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager
Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
_numDroppedLog = new AtomicInteger(0);
_numDroppedLogRateLimiter = RateLimiter.create(1.0);
_enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
LOGGER.info(
"Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
_brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
"Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps, "
+ "enabling query cancellation: {}",
_brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate(),
_enableQueryCancellation);
}

private String getDefaultBrokerId() {
Expand All @@ -168,6 +183,74 @@ private String getDefaultBrokerId() {
}
}

@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query));
}

@VisibleForTesting
Set<ServerInstance> getRunningServers(long requestId) {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
return (queryServers == null) ? Collections.emptySet() : queryServers._servers;
}

@Override
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(queryId);
if (queryServers == null) {
return false;
}
String globalId = getGlobalQueryId(queryId);
List<String> serverUrls = new ArrayList<>();
for (ServerInstance server : queryServers._servers) {
serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), globalId));
}
if (serverUrls.isEmpty()) {
LOGGER.debug("No servers running the query: {} right now", globalId);
return true;
}
LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId, serverUrls);
CompletionService<DeleteMethod> completionService =
new MultiHttpRequest(executor, connMgr).execute(serverUrls, null, timeoutMs, "DELETE", DeleteMethod::new);
List<String> errMsgs = new ArrayList<>(serverUrls.size());
for (int i = 0; i < serverUrls.size(); i++) {
DeleteMethod deleteMethod = null;
try {
// Wait for all requests to respond before returning to be sure that the servers have handled the cancel
// requests. The completion order is different from serverUrls, thus use uri in the response.
deleteMethod = completionService.take().get();
URI uri = deleteMethod.getURI();
int status = deleteMethod.getStatusCode();
// Unexpected server responses are collected and returned as exception.
if (status != 200 && status != 404) {
throw new Exception(String.format("Unexpected status=%d and response='%s' from uri='%s'", status,
deleteMethod.getResponseBodyAsString(), uri));
}
if (serverResponses != null) {
serverResponses.put(uri.getHost() + ":" + uri.getPort(), status);
}
} catch (Exception e) {
LOGGER.error("Failed to cancel query: {}", globalId, e);
// Can't just throw exception from here as there is a need to release the other connections.
// So just collect the error msg to throw them together after the for-loop.
errMsgs.add(e.getMessage());
} finally {
if (deleteMethod != null) {
deleteMethod.releaseConnection();
}
}
}
if (errMsgs.size() > 0) {
throw new Exception("Unexpected responses from servers: " + StringUtils.join(errMsgs, ","));
}
return true;
}

@Override
public BrokerResponseNative handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
Expand All @@ -191,9 +274,16 @@ public BrokerResponseNative handleRequest(JsonNode request, @Nullable SqlNodeAnd
if (sql == null) {
throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request);
}
String query = sql.asText();
requestContext.setQuery(query);
return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext);
try {
String query = sql.asText();
requestContext.setQuery(query);
return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext);
} finally {
if (_enableQueryCancellation) {
_queriesById.remove(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
}
}

private BrokerResponseNative handleRequest(long requestId, String query,
Expand Down Expand Up @@ -576,6 +666,19 @@ private BrokerResponseNative handleRequest(long requestId, String query,
realtimeRoutingTable = null;
}
}
if (_enableQueryCancellation) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any potential
// failures that could happen before sending it out, like failures to calculate the routing table etc.
// TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and
// query being sent out to servers can still happen. If cancel request arrives earlier than query being
// sent out to servers, the servers miss the cancel request and continue to run the queries. The users
// can always list the running queries and cancel query again until it ends. Just that such race
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
// servers takes time, but will address later if needed.
QueryServers queryServers = _queriesById.computeIfAbsent(requestId, k -> new QueryServers(query));
LOGGER.debug("Keep track of running query: {}", requestId);
queryServers.addServers(offlineRoutingTable, realtimeRoutingTable);
}
// TODO: Modify processBrokerRequest() to directly take PinotQuery
BrokerResponseNative brokerResponse =
processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable,
Expand Down Expand Up @@ -1650,6 +1753,10 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
}

private String getGlobalQueryId(long requestId) {
return _brokerId + "_" + requestId;
}

/**
* Helper class to pass the per server statistics.
*/
Expand All @@ -1664,4 +1771,26 @@ public void setServerStats(String serverStats) {
_serverStats = serverStats;
}
}

/**
* Helper class to track the query plaintext and the requested servers.
*/
private static class QueryServers {
private final String _query;
private final Set<ServerInstance> _servers = Collections.newSetFromMap(new ConcurrentHashMap<>());

public QueryServers(String query) {
_query = query;
}

public void addServers(Map<ServerInstance, List<String>> offlineRoutingTable,
Map<ServerInstance, List<String>> realtimeRoutingTable) {
if (offlineRoutingTable != null) {
_servers.addAll(offlineRoutingTable.keySet());
}
if (realtimeRoutingTable != null) {
_servers.addAll(realtimeRoutingTable.keySet());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.spi.trace.RequestContext;
Expand All @@ -43,4 +46,20 @@ default BrokerResponseNative handleRequest(JsonNode request, @Nullable Requester
throws Exception {
return handleRequest(request, null, requesterIdentity, requestContext);
}

Map<Long, String> getRunningQueries();

/**
* Cancel a query as identified by the queryId. This method is non-blocking so the query may still run for a while
* after calling this method. This cancel method can be called multiple times.
* @param queryId the unique Id assigned to the query by the broker
* @param timeoutMs timeout to wait for servers to respond the cancel requests
* @param executor to send cancel requests to servers in parallel
* @param connMgr to provide the http connections
* @param serverResponses to collect cancel responses from all servers if a map is provided
* @return true if there is a running query for the given queryId.
*/
boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception;
}
Loading

0 comments on commit 3a655d2

Please sign in to comment.