diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index 5c7ce3299fe2..e1ccd67b2f9b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -33,10 +33,14 @@ import io.swagger.annotations.SwaggerDefinition; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executor; import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.WebApplicationException; @@ -46,6 +50,7 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.pinot.broker.api.HttpRequesterIdentity; import org.apache.pinot.broker.requesthandler.BrokerRequestHandler; import org.apache.pinot.common.exception.QueryException; @@ -84,6 +89,12 @@ public class PinotClientRequest { @Inject private BrokerMetrics _brokerMetrics; + @Inject + private Executor _executor; + + @Inject + private HttpConnectionManager _httpConnMgr; + @GET @ManagedAsync @Produces(MediaType.APPLICATION_JSON) @@ -141,6 +152,58 @@ public void processSqlQueryPost(String query, @Suspended AsyncResponse asyncResp } } + @DELETE + @Path("query/{queryId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the " + + "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as " + + "it's done in a non-blocking manner. 
The cancel method can be called multiple times.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 404, message = "Query not found on the requested broker") + }) + public String cancelQuery( + @ApiParam(value = "QueryId as assigned by the broker", required = true) @PathParam("queryId") long queryId, + @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") + @DefaultValue("3000") int timeoutMs, + @ApiParam(value = "Return server responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") + boolean verbose) { + try { + Map serverResponses = verbose ? new HashMap<>() : null; + if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor, _httpConnMgr, serverResponses)) { + String resp = "Cancelled query: " + queryId; + if (verbose) { + resp += " with responses from servers: " + serverResponses; + } + return resp; + } + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(String.format("Failed to cancel query: %s on the broker due to error: %s", queryId, e.getMessage())) + .build()); + } + throw new WebApplicationException( + Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the broker", queryId)) + .build()); + } + + @GET + @Path("queries") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get queryIds of the running queries submitted via the requested broker", notes = "The id is " + + "assigned by the requested broker and only unique at the scope of this broker") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error") + }) + public Map getRunningQueries() { + try { + return _requestHandler.getRunningQueries(); + } catch (Exception e) { + throw new 
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity("Failed to get running queries on the broker due to error: " + e.getMessage()).build()); + } + } + private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterIdentity httpRequesterIdentity, boolean onlyDql) throws Exception { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java index fb8c4d6f0a99..3978b65891a8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java @@ -18,11 +18,17 @@ */ package org.apache.pinot.broker.broker; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.swagger.jaxrs.config.BeanConfig; import java.io.IOException; import java.net.URL; import java.net.URLClassLoader; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.commons.httpclient.HttpConnectionManager; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; import org.apache.pinot.broker.requesthandler.BrokerRequestHandler; import org.apache.pinot.broker.routing.BrokerRoutingManager; import org.apache.pinot.common.metrics.BrokerMetrics; @@ -57,9 +63,17 @@ public BrokerAdminApiApplication(BrokerRoutingManager routingManager, BrokerRequ if (brokerConf.getProperty(CommonConstants.Broker.BROKER_SERVICE_AUTO_DISCOVERY, false)) { register(ServiceAutoDiscoveryFeature.class); } + ExecutorService executor = + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build()); + MultiThreadedHttpConnectionManager connMgr = new MultiThreadedHttpConnectionManager(); + connMgr.getParams().setConnectionTimeout((int) brokerConf + 
.getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS, + CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS)); register(new AbstractBinder() { @Override protected void configure() { + bind(connMgr).to(HttpConnectionManager.class); + bind(executor).to(Executor.class); bind(sqlQueryExecutor).to(SqlQueryExecutor.class); bind(routingManager).to(BrokerRoutingManager.class); bind(brokerRequestHandler).to(BrokerRequestHandler.class); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 6afa20c95dac..f5dba937baf5 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -26,18 +26,26 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.httpclient.HttpConnectionManager; +import org.apache.commons.httpclient.URI; +import org.apache.commons.httpclient.methods.DeleteMethod; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.broker.api.HttpRequesterIdentity; import org.apache.pinot.broker.api.RequesterIdentity; @@ -47,6 +55,7 @@ import 
org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.http.MultiHttpRequest; import org.apache.pinot.common.metrics.BrokerGauge; import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; @@ -126,6 +135,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { private final int _defaultHllLog2m; private final boolean _enableQueryLimitOverride; private final boolean _enableDistinctCountBitmapOverride; + private final Map _queriesById = new ConcurrentHashMap<>(); + private final boolean _enableQueryCancellation; public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory, QueryQuotaManager queryQuotaManager, TableCache tableCache, @@ -154,9 +165,13 @@ public BaseBrokerRequestHandler(PinotConfiguration config, BrokerRoutingManager Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND)); _numDroppedLog = new AtomicInteger(0); _numDroppedLogRateLimiter = RateLimiter.create(1.0); + _enableQueryCancellation = + Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION)); LOGGER.info( - "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps", - _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate()); + "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps, " + + "enabling query cancellation: {}", + _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate(), + _enableQueryCancellation); } private String getDefaultBrokerId() { @@ -168,6 +183,74 @@ private String getDefaultBrokerId() { } } + @Override + public Map getRunningQueries() { + 
Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker"); + return _queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()._query)); + } + + @VisibleForTesting + Set getRunningServers(long requestId) { + Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker"); + QueryServers queryServers = _queriesById.get(requestId); + return (queryServers == null) ? Collections.emptySet() : queryServers._servers; + } + + @Override + public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr, + Map serverResponses) + throws Exception { + Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on broker"); + QueryServers queryServers = _queriesById.get(queryId); + if (queryServers == null) { + return false; + } + String globalId = getGlobalQueryId(queryId); + List serverUrls = new ArrayList<>(); + for (ServerInstance server : queryServers._servers) { + serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), globalId)); + } + if (serverUrls.isEmpty()) { + LOGGER.debug("No servers running the query: {} right now", globalId); + return true; + } + LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId, serverUrls); + CompletionService completionService = + new MultiHttpRequest(executor, connMgr).execute(serverUrls, null, timeoutMs, "DELETE", DeleteMethod::new); + List errMsgs = new ArrayList<>(serverUrls.size()); + for (int i = 0; i < serverUrls.size(); i++) { + DeleteMethod deleteMethod = null; + try { + // Wait for all requests to respond before returning to be sure that the servers have handled the cancel + // requests. The completion order is different from serverUrls, thus use uri in the response. 
+ deleteMethod = completionService.take().get(); + URI uri = deleteMethod.getURI(); + int status = deleteMethod.getStatusCode(); + // Unexpected server responses are collected and returned as exception. + if (status != 200 && status != 404) { + throw new Exception(String.format("Unexpected status=%d and response='%s' from uri='%s'", status, + deleteMethod.getResponseBodyAsString(), uri)); + } + if (serverResponses != null) { + serverResponses.put(uri.getHost() + ":" + uri.getPort(), status); + } + } catch (Exception e) { + LOGGER.error("Failed to cancel query: {}", globalId, e); + // Can't just throw exception from here as there is a need to release the other connections. + // So just collect the error msg to throw them together after the for-loop. + errMsgs.add(e.getMessage()); + } finally { + if (deleteMethod != null) { + deleteMethod.releaseConnection(); + } + } + } + if (errMsgs.size() > 0) { + throw new Exception("Unexpected responses from servers: " + StringUtils.join(errMsgs, ",")); + } + return true; + } + @Override public BrokerResponseNative handleRequest(JsonNode request, @Nullable SqlNodeAndOptions sqlNodeAndOptions, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) @@ -191,9 +274,16 @@ public BrokerResponseNative handleRequest(JsonNode request, @Nullable SqlNodeAnd if (sql == null) { throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request); } - String query = sql.asText(); - requestContext.setQuery(query); - return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext); + try { + String query = sql.asText(); + requestContext.setQuery(query); + return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext); + } finally { + if (_enableQueryCancellation) { + _queriesById.remove(requestId); + LOGGER.debug("Remove track of running query: {}", requestId); + } + } } private BrokerResponseNative handleRequest(long requestId, 
String query, @@ -576,6 +666,19 @@ private BrokerResponseNative handleRequest(long requestId, String query, realtimeRoutingTable = null; } } + if (_enableQueryCancellation) { + // Start to track the running query for cancellation just before sending it out to servers to avoid any potential + // failures that could happen before sending it out, like failures to calculate the routing table etc. + // TODO: Even tracking the query as late as here, a potential race condition between calling cancel API and + // query being sent out to servers can still happen. If cancel request arrives earlier than query being + // sent out to servers, the servers miss the cancel request and continue to run the queries. The users + // can always list the running queries and cancel query again until it ends. Just that such race + // condition makes cancel API less reliable. This should be rare as it assumes sending queries out to + // servers takes time, but will address later if needed. + QueryServers queryServers = _queriesById.computeIfAbsent(requestId, k -> new QueryServers(query)); + LOGGER.debug("Keep track of running query: {}", requestId); + queryServers.addServers(offlineRoutingTable, realtimeRoutingTable); + } // TODO: Modify processBrokerRequest() to directly take PinotQuery BrokerResponseNative brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable, @@ -1650,6 +1753,10 @@ protected static void augmentStatistics(RequestContext statistics, BrokerRespons statistics.setNumRowsResultSet(response.getNumRowsResultSet()); } + private String getGlobalQueryId(long requestId) { + return _brokerId + "_" + requestId; + } + /** * Helper class to pass the per server statistics. */ @@ -1664,4 +1771,26 @@ public void setServerStats(String serverStats) { _serverStats = serverStats; } } + + /** + * Helper class to track the query plaintext and the requested servers. 
+ */ + private static class QueryServers { + private final String _query; + private final Set _servers = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + public QueryServers(String query) { + _query = query; + } + + public void addServers(Map> offlineRoutingTable, + Map> realtimeRoutingTable) { + if (offlineRoutingTable != null) { + _servers.addAll(offlineRoutingTable.keySet()); + } + if (realtimeRoutingTable != null) { + _servers.addAll(realtimeRoutingTable.keySet()); + } + } + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java index 799bfe829513..de42134a6a51 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java @@ -19,8 +19,11 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.JsonNode; +import java.util.Map; +import java.util.concurrent.Executor; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.pinot.broker.api.RequesterIdentity; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.spi.trace.RequestContext; @@ -43,4 +46,20 @@ default BrokerResponseNative handleRequest(JsonNode request, @Nullable Requester throws Exception { return handleRequest(request, null, requesterIdentity, requestContext); } + + Map getRunningQueries(); + + /** + * Cancel a query as identified by the queryId. This method is non-blocking so the query may still run for a while + * after calling this method. This cancel method can be called multiple times. 
+ * @param queryId the unique Id assigned to the query by the broker + * @param timeoutMs timeout to wait for servers to respond the cancel requests + * @param executor to send cancel requests to servers in parallel + * @param connMgr to provide the http connections + * @param serverResponses to collect cancel responses from all servers if a map is provided + * @return true if there is a running query for the given queryId. + */ + boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr, + Map serverResponses) + throws Exception; } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java index ba22f3f481fa..7a2a085273cb 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java @@ -20,7 +20,9 @@ import com.fasterxml.jackson.databind.JsonNode; import java.util.Map; +import java.util.concurrent.Executor; import javax.annotation.Nullable; +import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.pinot.broker.api.RequesterIdentity; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.BrokerMeter; @@ -116,4 +118,21 @@ private boolean useMultiStageEngine(JsonNode request, SqlNodeAndOptions sqlNodeA } return false; } + + @Override + public Map getRunningQueries() { + // TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its + // running queries with those from singleStaged engine. Both engines share the same request Id generator, so + // the query will have unique ids across the two engines. 
+ return _singleStageBrokerRequestHandler.getRunningQueries(); + } + + @Override + public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr, + Map serverResponses) + throws Exception { + // TODO: add support for multiStaged engine, basically try to cancel the query on multiStaged engine firstly; if + // not found, try on the singleStaged engine. + return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, executor, connMgr, serverResponses); + } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java index f4e09259ff0d..cff4eff1933f 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java @@ -18,20 +18,46 @@ */ package org.apache.pinot.broker.requesthandler; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import javax.annotation.Nullable; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.broker.broker.AllowAllAccessControlFactory; +import org.apache.pinot.broker.queryquota.QueryQuotaManager; +import org.apache.pinot.broker.routing.BrokerRoutingManager; import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.Expression; import org.apache.pinot.common.request.PinotQuery; +import org.apache.pinot.common.response.broker.BrokerResponseNative; +import 
org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TenantConfig; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.exception.BadQueryRequestException; +import org.apache.pinot.spi.metrics.PinotMetricUtils; +import org.apache.pinot.spi.trace.RequestContext; +import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.apache.pinot.util.TestUtils; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -156,4 +182,74 @@ public void testGetActualTableNameAllowingDots() { Assert.assertEquals(BaseBrokerRequestHandler.getActualTableName("db.namespace.mytable", tableCache), "db.namespace.mytable"); } + + @Test + public void testCancelQuery() + throws Exception { + String tableName = "myTable_OFFLINE"; + // Mock pretty much everything until the query can be submitted. 
+ TableCache tableCache = mock(TableCache.class); + TableConfig tableCfg = mock(TableConfig.class); + when(tableCache.getActualTableName(anyString())).thenReturn(tableName); + TenantConfig tenant = new TenantConfig("tier_BROKER", "tier_SERVER", null); + when(tableCfg.getTenantConfig()).thenReturn(tenant); + when(tableCache.getTableConfig(anyString())).thenReturn(tableCfg); + BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class); + when(routingManager.routingExists(anyString())).thenReturn(true); + RoutingTable rt = mock(RoutingTable.class); + when(rt.getServerInstanceToSegmentsMap()).thenReturn(Collections + .singletonMap(new ServerInstance(new InstanceConfig("server01_9000")), Collections.singletonList("segment01"))); + when(routingManager.getRoutingTable(any())).thenReturn(rt); + QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class); + when(queryQuotaManager.acquire(anyString())).thenReturn(true); + CountDownLatch latch = new CountDownLatch(1); + PinotConfiguration config = + new PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation", "true")); + BaseBrokerRequestHandler requestHandler = + new BaseBrokerRequestHandler(config, routingManager, new AllowAllAccessControlFactory(), + queryQuotaManager, tableCache, + new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(), true, Collections.emptySet())) { + @Override + public void start() { + } + + @Override + public void shutDown() { + } + + @Override + protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, + BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, + @Nullable Map> offlineRoutingTable, + @Nullable BrokerRequest realtimeBrokerRequest, + @Nullable Map> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, + RequestContext requestContext) + throws Exception { + latch.await(); + return null; + } + }; + CompletableFuture.runAsync(() -> { + try { + JsonNode 
request = JsonUtils.stringToJsonNode( + String.format("{\"sql\":\"select * from %s limit 10\",\"queryOptions\":\"timeoutMs=10000\"}", tableName)); + RequestContext requestStats = Tracing.getTracer().createRequestScope(); + requestHandler.handleRequest(request, null, requestStats); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + TestUtils.waitForCondition((aVoid) -> requestHandler.getRunningServers(1).size() == 1, 500, 5000, + "Failed to submit query"); + Map.Entry entry = requestHandler.getRunningQueries().entrySet().iterator().next(); + Assert.assertEquals(entry.getKey().longValue(), 1); + Assert.assertTrue(entry.getValue().contains("select * from myTable_OFFLINE limit 10")); + Set servers = requestHandler.getRunningServers(1); + Assert.assertEquals(servers.size(), 1); + Assert.assertEquals(servers.iterator().next().getHostname(), "server01"); + Assert.assertEquals(servers.iterator().next().getPort(), 9000); + Assert.assertEquals(servers.iterator().next().getInstanceId(), "server01_9000"); + Assert.assertEquals(servers.iterator().next().getAdminEndpoint(), "http://server01:8097"); + latch.countDown(); + } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java similarity index 60% rename from pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java rename to pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java index 9f6e3f3158ee..f66c51d2e1de 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java @@ -23,9 +23,11 @@ import java.util.concurrent.CompletionService; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; +import java.util.function.Function; import javax.annotation.Nullable; import 
org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpConnectionManager; +import org.apache.commons.httpclient.HttpMethodBase; import org.apache.commons.httpclient.methods.GetMethod; import org.apache.commons.httpclient.params.HttpClientParams; import org.slf4j.Logger; @@ -33,23 +35,21 @@ /** - * Class to support multiple http GET operations in parallel by using - * the executor that is passed in. + * Class to support multiple http operations in parallel by using the executor that is passed in. This is a wrapper + * around Apache common HTTP client. * - * This is a wrapper around Apache common HTTP client. + * The execute method is re-usable but there is no real benefit to it. All the connection management is handled by + * the input HttpConnectionManager. Note that we cannot use SimpleHttpConnectionManager as it is not thread safe. Use + * MultiThreadedHttpConnectionManager as shown in the example below. As GET is commonly used, there is a dedicated + * execute method for it. Other http methods like DELETE can use the generic version of execute method. * - * The execute method is re-usable but there is no real benefit to it. All - * the connection management is handled by the input HttpConnectionManager. - * Note that we cannot use SimpleHttpConnectionManager as it is not thread - * safe. Use MultiThreadedHttpConnectionManager as shown in the example - * below * Usage: *
  * {@code
  *    List<String> urls = Arrays.asList("http://www.linkedin.com", "http://www.google.com");
- *    MultiGetRequest mget = new MultiGetRequest(Executors.newCachedThreadPool(),
+ *    MultiHttpRequest mhr = new MultiHttpRequest(Executors.newCachedThreadPool(),
  *           new MultiThreadedHttpConnectionManager());
- *    CompletionService<GetMethod> completionService = mget.execute(urls);
+ *    CompletionService<GetMethod> completionService = mhr.execute(urls, headers, timeoutMs);
  *    for (int i = 0; i < urls.size(); i++) {
  *      GetMethod getMethod = null;
  *      try {
@@ -72,8 +72,8 @@
  * }
  * 
*/ -public class MultiGetRequest { - private static final Logger LOGGER = LoggerFactory.getLogger(MultiGetRequest.class); +public class MultiHttpRequest { + private static final Logger LOGGER = LoggerFactory.getLogger(MultiHttpRequest.class); private final Executor _executor; // TODO: Verify that _connectionManager is an instaceOf MultithreadedHttpConnectionManager. @@ -84,7 +84,7 @@ public class MultiGetRequest { * @param executor executor service to use for making parallel requests * @param connectionManager http connection manager to use. */ - public MultiGetRequest(Executor executor, HttpConnectionManager connectionManager) { + public MultiHttpRequest(Executor executor, HttpConnectionManager connectionManager) { _executor = executor; _connectionManager = connectionManager; } @@ -99,24 +99,42 @@ public MultiGetRequest(Executor executor, HttpConnectionManager connectionManage */ public CompletionService execute(List urls, @Nullable Map requestHeaders, int timeoutMs) { + return execute(urls, requestHeaders, timeoutMs, "GET", GetMethod::new); + } + + /** + * Execute certain http method on the urls in parallel using the executor service. + * @param urls absolute URLs to execute the http method + * @param requestHeaders headers to set when making the request + * @param timeoutMs timeout in milliseconds for each http request + * @param httpMethodName the name of the http method like GET, DELETE etc. + * @param httpMethodSupplier a function to create a new http method object. + * @return instance of CompletionService. Completion service will provide + * results as they arrive. 
The order is NOT same as the order of URLs + */ + public CompletionService execute(List urls, + @Nullable Map requestHeaders, int timeoutMs, String httpMethodName, + Function httpMethodSupplier) { HttpClientParams clientParams = new HttpClientParams(); clientParams.setConnectionManagerTimeout(timeoutMs); HttpClient client = new HttpClient(clientParams, _connectionManager); - CompletionService completionService = new ExecutorCompletionService<>(_executor); + CompletionService completionService = new ExecutorCompletionService<>(_executor); for (String url : urls) { completionService.submit(() -> { try { - GetMethod getMethod = new GetMethod(url); + T httpMethod = httpMethodSupplier.apply(url); + // Explicitly cast type downwards to workaround a bug in jdk8: https://bugs.openjdk.org/browse/JDK-8056984 + HttpMethodBase httpMethodBase = httpMethod; if (requestHeaders != null) { - requestHeaders.forEach(getMethod::setRequestHeader); + requestHeaders.forEach((k, v) -> httpMethodBase.setRequestHeader(k, v)); } - getMethod.getParams().setSoTimeout(timeoutMs); - client.executeMethod(getMethod); - return getMethod; + httpMethodBase.getParams().setSoTimeout(timeoutMs); + client.executeMethod(httpMethodBase); + return httpMethod; } catch (Exception e) { // Log only exception type and message instead of the whole stack trace - LOGGER.warn("Caught '{}' while executing GET on URL: {}", e.toString(), url); + LOGGER.warn("Caught '{}' while executing: {} on URL: {}", e, httpMethodName, url); throw e; } }); diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java index 41e88b5cf0c7..c840674390b1 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java @@ -26,6 +26,7 @@ import org.apache.helix.model.InstanceConfig; import 
org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.spi.config.instance.Instance; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; @@ -59,6 +60,31 @@ public static String getHelixInstanceId(Instance instance) { return prefix + instance.getHost() + "_" + instance.getPort(); } + public static String getServerAdminEndpoint(InstanceConfig instanceConfig) { + // Backward-compatible with legacy hostname of format 'Server_' + String hostname = instanceConfig.getHostName(); + if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) { + hostname = hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH); + } + return getServerAdminEndpoint(instanceConfig, hostname, CommonConstants.HTTP_PROTOCOL); + } + + public static String getServerAdminEndpoint(InstanceConfig instanceConfig, String hostname, String defaultProtocol) { + String protocol = defaultProtocol; + int port = CommonConstants.Server.DEFAULT_ADMIN_API_PORT; + int adminPort = instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_PORT_KEY, -1); + int adminHttpsPort = instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_HTTPS_PORT_KEY, -1); + // NOTE: preference for insecure is sub-optimal, but required for incremental upgrade scenarios + if (adminPort > 0) { + protocol = CommonConstants.HTTP_PROTOCOL; + port = adminPort; + } else if (adminHttpsPort > 0) { + protocol = CommonConstants.HTTPS_PROTOCOL; + port = adminHttpsPort; + } + return String.format("%s://%s:%d", protocol, hostname, port); + } + /** * Returns the Helix InstanceConfig for the given instance. 
*/ diff --git a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java similarity index 96% rename from pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java rename to pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java index 8cfd5a4cfe2b..95247eca13d2 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java @@ -42,8 +42,8 @@ import org.testng.annotations.Test; -public class MultiGetRequestTest { - private static final Logger LOGGER = LoggerFactory.getLogger(MultiGetRequest.class); +public class MultiHttpRequestTest { + private static final Logger LOGGER = LoggerFactory.getLogger(MultiHttpRequest.class); private static final String SUCCESS_MSG = "success"; private static final String ERROR_MSG = "error"; private static final String TIMEOUT_MSG = "Timeout"; @@ -106,8 +106,8 @@ public void run() { @Test public void testMultiGet() { - MultiGetRequest mget = - new MultiGetRequest(Executors.newCachedThreadPool(), new MultiThreadedHttpConnectionManager()); + MultiHttpRequest mget = + new MultiHttpRequest(Executors.newCachedThreadPool(), new MultiThreadedHttpConnectionManager()); List urls = Arrays.asList("http://localhost:" + String.valueOf(_portStart) + URI_PATH, "http://localhost:" + String.valueOf(_portStart + 1) + URI_PATH, "http://localhost:" + String.valueOf(_portStart + 2) + URI_PATH, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index b52e86197d66..9f4a35eb3c3e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -207,8 +207,9 @@ public void init(PinotConfiguration pinotConfiguration) // Do not use this before the invocation of {@link PinotHelixResourceManager::start()}, which happens in {@link // ControllerStarter::start()} _helixResourceManager = new PinotHelixResourceManager(_config); + // This executor service is used to do async tasks from multiget util or table rebalancing. _executorService = - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build()); + Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build()); } // Initialize the table config tuner registry. diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 4eba9a8d5bfb..9d061d290a6e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -158,7 +158,6 @@ import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; -import org.apache.pinot.spi.utils.CommonConstants.Server; import org.apache.pinot.spi.utils.IngestionConfigUtils; import org.apache.pinot.spi.utils.InstanceTypeUtils; import org.apache.pinot.spi.utils.TimeUtils; @@ -226,28 +225,7 @@ public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullabl public String load(String instanceId) { InstanceConfig instanceConfig = getHelixInstanceConfig(instanceId); Preconditions.checkNotNull(instanceConfig, "Failed to find instance config for: %s", 
instanceId); - // Backward-compatible with legacy hostname of format 'Server_' - String hostname = instanceConfig.getHostName(); - if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) { - hostname = hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH); - } - - String protocol = CommonConstants.HTTP_PROTOCOL; - int port = Server.DEFAULT_ADMIN_API_PORT; - - int adminPort = instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_PORT_KEY, -1); - int adminHttpsPort = instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_HTTPS_PORT_KEY, -1); - - // NOTE: preference for insecure is sub-optimal, but required for incremental upgrade scenarios - if (adminPort > 0) { - protocol = CommonConstants.HTTP_PROTOCOL; - port = adminPort; - } else if (adminHttpsPort > 0) { - protocol = CommonConstants.HTTPS_PROTOCOL; - port = adminHttpsPort; - } - - return String.format("%s://%s:%d", protocol, hostname, port); + return InstanceUtils.getServerAdminEndpoint(instanceConfig); } }); _tableUpdaterLocks = new Object[DEFAULT_TABLE_UPDATER_LOCKERS_SIZE]; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java index 4a1bbb2623a6..8ec3b282380f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java @@ -29,7 +29,7 @@ import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.commons.httpclient.URI; import org.apache.commons.httpclient.methods.GetMethod; -import org.apache.pinot.common.http.MultiGetRequest; +import org.apache.pinot.common.http.MultiHttpRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,7 +79,7 @@ public CompletionServiceResponse doMultiGetRequest(List serverURLs, Stri // TODO: use some service other than completion service so that we 
know which server encounters the error CompletionService completionService = - new MultiGetRequest(_executor, _httpConnectionManager).execute(serverURLs, requestHeaders, timeoutMs); + new MultiHttpRequest(_executor, _httpConnectionManager).execute(serverURLs, requestHeaders, timeoutMs); for (int i = 0; i < serverURLs.size(); i++) { GetMethod getMethod = null; try { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java index 09bfa4ad2f0d..c272c7ffedbf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java @@ -87,6 +87,14 @@ public ServerQueryRequest(Server.ServerRequest serverRequest, ServerMetrics serv _timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs); } + /** + * As _requestId can be same across brokers, so use _brokerId and _requestId together to uniquely identify a query. + * @return unique query Id within a pinot cluster. 
+ */ + public String getQueryId() { + return _brokerId + "_" + _requestId; + } + public long getRequestId() { return _requestId; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index 069a84337c45..f55cdb35f9bc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -19,12 +19,18 @@ package org.apache.pinot.core.query.scheduler; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.RateLimiter; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAccumulator; @@ -62,6 +68,7 @@ public abstract class QueryScheduler { private static final String INVALID_NUM_RESIZES = "-1"; private static final String INVALID_RESIZE_TIME_MS = "-1"; private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond"; + private static final String ENABLE_QUERY_CANCELLATION_KEY = "enable.query.cancellation"; private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d; protected final ServerMetrics _serverMetrics; protected final QueryExecutor _queryExecutor; @@ -70,8 +77,9 @@ public abstract class QueryScheduler { private final RateLimiter _queryLogRateLimiter; private final RateLimiter _numDroppedLogRateLimiter; private 
final AtomicInteger _numDroppedLogCounter; + private final boolean _enableQueryCancellation; protected volatile boolean _isRunning = false; - + private final Map> _queryFuturesById = new ConcurrentHashMap<>(); /** * Constructor to initialize QueryScheduler * @param queryExecutor QueryExecutor engine to use @@ -93,8 +101,12 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re _queryLogRateLimiter = RateLimiter.create(config.getProperty(QUERY_LOG_MAX_RATE_KEY, DEFAULT_QUERY_LOG_MAX_RATE)); _numDroppedLogRateLimiter = RateLimiter.create(1.0d); _numDroppedLogCounter = new AtomicInteger(0); - LOGGER.info("Query log max rate: {}", _queryLogRateLimiter.getRate()); + + _enableQueryCancellation = Boolean.parseBoolean(config.getProperty(ENABLE_QUERY_CANCELLATION_KEY)); + if (_enableQueryCancellation) { + LOGGER.info("Enable query cancellation"); + } } /** @@ -105,6 +117,76 @@ public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, Re */ public abstract ListenableFuture submit(ServerQueryRequest queryRequest); + /** + * Submit a query for execution and track runtime context about the query for things like cancellation. + * @param queryRequest query to schedule for execution + * @return Listenable future for query result representing serialized response. Custom callbacks can be added on + * the future to clean up the runtime context tracked during query execution. + */ + public ListenableFuture submitQuery(ServerQueryRequest queryRequest) { + ListenableFuture future = submit(queryRequest); + if (_enableQueryCancellation) { + String queryId = queryRequest.getQueryId(); + // Track the running query for cancellation. + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Keep track of running query: {}", queryId); + } + _queryFuturesById.put(queryId, future); + // And remove the track when the query ends. 
+ Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(@Nullable byte[] ignored) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Remove track of running query: {} on success", queryId); + } + _queryFuturesById.remove(queryId); + } + + @Override + public void onFailure(Throwable ignored) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Remove track of running query: {} on failure", queryId); + } + _queryFuturesById.remove(queryId); + } + }, MoreExecutors.directExecutor()); + } + return future; + } + + /** + * Cancel a query as identified by the queryId. This method is non-blocking and the query may still run for a while + * after calling this method. This method can be called multiple times. + * TODO: refine the errmsg when query is cancelled, instead of bubbling up the executor's CancellationException. + * + * @param queryId a unique Id to find the query + * @return true if a running query exists for the given queryId. + */ + public boolean cancelQuery(String queryId) { + Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on server"); + // Keep the future as it'll be cleaned up by the thread executing the query. + Future future = _queryFuturesById.get(queryId); + if (future == null) { + return false; + } + boolean done = future.isDone(); + if (!done) { + future.cancel(true); + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Cancelled query: {} that's done: {}", queryId, done); + } + return true; + } + + /** + * @return list of ids of the queries currently running on the server. 
+ */ + public Set getRunningQueryIds() { + Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation is not enabled on server"); + return new HashSet<>(_queryFuturesById.keySet()); + } + /** * Query scheduler name for logging */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java index af841eea5b3e..31d0c8d536e3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java @@ -120,7 +120,7 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { tableNameWithType = queryRequest.getTableNameWithType(); // Submit query for execution and register callback for execution results. - Futures.addCallback(_queryScheduler.submit(queryRequest), + Futures.addCallback(_queryScheduler.submitQuery(queryRequest), createCallback(ctx, tableNameWithType, queryArrivalTimeMs, instanceRequest, queryRequest), MoreExecutors.directExecutor()); } catch (Exception e) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java index 149c1b055909..8809e4a015d2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java @@ -22,7 +22,9 @@ import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.utils.config.InstanceUtils; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix; @@ -43,6 +45,7 @@ public enum RoutingType { private final int _queryServicePort; private final int 
_queryMailboxPort; + private final String _adminEndpoint; /** * By default (auto joined instances), server instance name is of format: {@code Server__}, e.g. @@ -75,6 +78,7 @@ public ServerInstance(InstanceConfig instanceConfig) { INVALID_PORT); _queryMailboxPort = instanceConfig.getRecord().getIntField(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY, INVALID_PORT); + _adminEndpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig, _hostname, CommonConstants.HTTP_PROTOCOL); } @VisibleForTesting @@ -86,6 +90,7 @@ public ServerInstance(InstanceConfig instanceConfig) { _nettyTlsPort = INVALID_PORT; _queryServicePort = INVALID_PORT; _queryMailboxPort = INVALID_PORT; + _adminEndpoint = null; } public String getInstanceId() { @@ -100,6 +105,10 @@ public int getPort() { return _port; } + public String getAdminEndpoint() { + return _adminEndpoint; + } + public int getGrpcPort() { return _grpcPort; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerTest.java new file mode 100644 index 000000000000..b2bf14a1f2e8 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerTest.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.query.scheduler; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListenableFutureTask; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.LongAccumulator; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.core.query.executor.QueryExecutor; +import org.apache.pinot.core.query.request.ServerQueryRequest; +import org.apache.pinot.core.query.scheduler.resources.ResourceManager; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.testng.Assert; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class QuerySchedulerTest { + @Test + public void testCancelQuery() { + PinotConfiguration config = new PinotConfiguration(); + config.setProperty("enable.query.cancellation", "true"); + QueryScheduler qs = createQueryScheduler(config); + Set queryIds = new HashSet<>(); + queryIds.add("foo"); + queryIds.add("bar"); + queryIds.add("baz"); + for (String id : queryIds) { + ServerQueryRequest query = mock(ServerQueryRequest.class); + when(query.getQueryId()).thenReturn(id); + qs.submitQuery(query); + } + Assert.assertEquals(qs.getRunningQueryIds(), queryIds); + for (String id : queryIds) { + qs.cancelQuery(id); + } + Assert.assertTrue(qs.getRunningQueryIds().isEmpty()); + Assert.assertFalse(qs.cancelQuery("unknown")); + } + + private QueryScheduler createQueryScheduler(PinotConfiguration config) { + return new QueryScheduler(config, mock(QueryExecutor.class), mock(ResourceManager.class), mock(ServerMetrics.class), + new LongAccumulator(Long::max, 0)) { + @Override + public ListenableFuture submit(ServerQueryRequest queryRequest) { + // Create a FutureTask does nothing but waits to be cancelled and trigger callbacks. 
+ return ListenableFutureTask.create(() -> null); + } + + @Override + public String name() { + return "noop"; + } + }; + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index 01191ce79d89..618223cf394e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -70,7 +70,7 @@ private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes) { private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[] responseBytes) { QueryScheduler queryScheduler = mock(QueryScheduler.class); - when(queryScheduler.submit(any())).thenAnswer(invocation -> { + when(queryScheduler.submitQuery(any())).thenAnswer(invocation -> { Thread.sleep(responseDelayMs); return Futures.immediateFuture(responseBytes); }); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java new file mode 100644 index 000000000000..b6c83ca94b00 --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.util.Set; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pinot.server.starter.ServerInstance; + +import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; + + +/** + * API to cancel query running on the server, given a queryId. 
+ */ +@Api(tags = "Query", authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)}) +@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name = + HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY))) +@Path("/") +public class QueryResource { + @Inject + private ServerInstance _serverInstance; + + @DELETE + @Path("/query/{queryId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Cancel a query running on the server as identified by the queryId", notes = "No effect if " + + "no query exists for the given queryId. Query may continue to run for a short while after calling cancel as " + + "it's done in a non-blocking manner. The cancel API can be called multiple times.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 404, message = "Query not found running on the server") + }) + public String cancelQuery( + @ApiParam(value = "QueryId as in the format of _", required = true) @PathParam("queryId") + String queryId) { + try { + if (_serverInstance.getQueryScheduler().cancelQuery(queryId)) { + return "Cancelled query: " + queryId; + } + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(String.format("Failed to cancel query: %s on the server due to error: %s", queryId, e.getMessage())) + .build()); + } + throw new WebApplicationException( + Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not found on the server", queryId)) + .build()); + } + + @GET + @Path("/queries/id") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get queryIds of running queries on the server", notes = "QueryIds are in the format of " + + "_") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 
500, message = "Internal server error") + }) + public Set getRunningQueryIds() { + try { + return _serverInstance.getQueryScheduler().getRunningQueryIds(); + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity("Failed to get queryIds of running queries on the server due to error: " + e.getMessage()).build()); + } + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java index 7ed0a57989f8..eff4a1cc0501 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java @@ -258,4 +258,8 @@ public InstanceDataManager getInstanceDataManager() { public long getLatestQueryTime() { return _latestQueryTime.get(); } + + public QueryScheduler getQueryScheduler() { + return _queryScheduler; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index c24702b0dceb..ec2719b47ebb 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -205,6 +205,7 @@ public static class Broker { public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = Integer.MAX_VALUE; public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = "pinot.broker.query.log.maxRatePerSecond"; + public static final String CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION = "pinot.broker.enable.query.cancellation"; public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = 10_000d; public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs"; public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;