From 7d9dcdfd836a779c5791af1a154111fe5fb7c09a Mon Sep 17 00:00:00 2001 From: Xiaobing <61892277+klsince@users.noreply.github.com> Date: Wed, 24 Aug 2022 14:53:19 -0700 Subject: [PATCH] add query cancel APIs on controller to easily show running queries across the cluster and cancel them (#9276) --- .../api/resources/PinotClientRequest.java | 4 +- .../controller/api/resources/Constants.java | 1 + .../resources/PinotRunningQueryResource.java | 228 ++++++++++++++++++ 3 files changed, 231 insertions(+), 2 deletions(-) create mode 100644 pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java index 739dfe24ec58..64d890e0ab82 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java @@ -190,8 +190,8 @@ public String cancelQuery( @GET @Path("queries") @Produces(MediaType.APPLICATION_JSON) - @ApiOperation(value = "Get queryIds of the running queries submitted via the requested broker", notes = "The id is " - + "assigned by the requested broker and only unique at the scope of this broker") + @ApiOperation(value = "Get running queries submitted via the requested broker", notes = "The id is assigned by the " + + "requested broker and only unique at the scope of this broker") @ApiResponses(value = { @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error") }) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java index 513b12f9d395..56d1de5dfd31 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java @@ -39,6 +39,7 @@ private Constants() { public static final String VERSION_TAG = "Version"; public static final String HEALTH_TAG = "Health"; public static final String INSTANCE_TAG = "Instance"; + public static final String QUERY_TAG = "Query"; public static final String SCHEMA_TAG = "Schema"; public static final String TENANT_TAG = "Tenant"; public static final String BROKER_TAG = "Broker"; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java new file mode 100644 index 000000000000..4b977c08f79b --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRunningQueryResource.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiKeyAuthDefinition; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import io.swagger.annotations.Authorization; +import io.swagger.annotations.SecurityDefinition; +import io.swagger.annotations.SwaggerDefinition; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionService; +import java.util.concurrent.Executor; +import javax.inject.Inject; +import javax.ws.rs.DELETE; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.commons.httpclient.HttpClient; +import org.apache.commons.httpclient.HttpConnectionManager; +import org.apache.commons.httpclient.URI; +import org.apache.commons.httpclient.methods.DeleteMethod; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.params.HttpClientParams; +import org.apache.commons.lang3.StringUtils; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.http.MultiHttpRequest; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY; + + +@Api(tags = Constants.QUERY_TAG, authorizations = {@Authorization(value = SWAGGER_AUTHORIZATION_KEY)}) +@SwaggerDefinition(securityDefinition = @SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name = + HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY))) +@Path("/") +public class PinotRunningQueryResource { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotRunningQueryResource.class); + + @Inject + PinotHelixResourceManager _pinotHelixResourceManager; + + @Inject + ControllerConf _controllerConf; + + @Inject + private Executor _executor; + + @Inject + private HttpConnectionManager _httpConnMgr; + + @DELETE + @Path("query/{brokerId}/{queryId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Cancel a query as identified by the queryId", notes = "No effect if no query exists for the " + + "given queryId on the requested broker. Query may continue to run for a short while after calling cancel as " + + "it's done in a non-blocking manner. The cancel method can be called multiple times.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error"), + @ApiResponse(code = 404, message = "Query not found on the requested broker") + }) + public String cancelQuery( + @ApiParam(value = "Broker that's running the query", required = true) @PathParam("brokerId") String brokerId, + @ApiParam(value = "QueryId as assigned by the broker", required = true) @PathParam("queryId") long queryId, + @ApiParam(value = "Timeout for servers to respond the cancel request") @QueryParam("timeoutMs") + @DefaultValue("3000") int timeoutMs, + @ApiParam(value = "Return verbose responses for troubleshooting") @QueryParam("verbose") @DefaultValue("false") + boolean verbose, @Context HttpHeaders httpHeaders) { + InstanceConfig broker = _pinotHelixResourceManager.getHelixInstanceConfig(brokerId); + if (broker == null) { + throw new WebApplicationException( + Response.status(Response.Status.BAD_REQUEST).entity("Unknown broker: " + brokerId).build()); + } + try { + HttpClientParams clientParams = new HttpClientParams(); + clientParams.setConnectionManagerTimeout(timeoutMs); + HttpClient client = new HttpClient(clientParams, _httpConnMgr); + String protocol = _controllerConf.getControllerBrokerProtocol(); + int portOverride = _controllerConf.getControllerBrokerPortOverride(); + int port = portOverride > 0 ? portOverride : Integer.parseInt(broker.getPort()); + DeleteMethod deleteMethod = new DeleteMethod( + String.format("%s://%s:%d/query/%d?verbose=%b", protocol, broker.getHostName(), port, queryId, verbose)); + try { + Map requestHeaders = createRequestHeaders(httpHeaders); + requestHeaders.forEach(deleteMethod::setRequestHeader); + client.executeMethod(deleteMethod); + int status = deleteMethod.getStatusCode(); + if (status == 200) { + return deleteMethod.getResponseBodyAsString(); + } + if (status == 404) { + throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND) + .entity(String.format("Query: %s not found on the broker: %s", queryId, brokerId)).build()); + } + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(String + .format("Failed to cancel query: %s on the broker: %s with unexpected status=%d and resp='%s'", queryId, + brokerId, status, deleteMethod.getResponseBodyAsString())).build()); + } finally { + deleteMethod.releaseConnection(); + } + } catch (WebApplicationException e) { + throw e; + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(String + .format("Failed to cancel query: %s on the broker: %s due to error: %s", queryId, brokerId, e.getMessage())) + .build()); + } + } + + @GET + @Path("/queries") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get running queries from all brokers", notes = "The queries are returned with brokers " + + "running them") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500, message = "Internal server error") + }) + public Map> getRunningQueries( + @ApiParam(value = "Timeout for brokers to return running queries") @QueryParam("timeoutMs") @DefaultValue("3000") + int timeoutMs, @Context HttpHeaders httpHeaders) { + try { + Map> tableBrokers = _pinotHelixResourceManager.getTableToLiveBrokersMapping(); + Map brokers = new HashMap<>(); + tableBrokers.values().forEach(list -> list.forEach(info -> brokers.putIfAbsent(getInstanceKey(info), info))); + return getRunningQueries(brokers, timeoutMs, createRequestHeaders(httpHeaders)); + } catch (Exception e) { + throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity("Failed to get running queries due to error: " + e.getMessage()).build()); + } + } + + private Map> getRunningQueries(Map brokers, int timeoutMs, + Map requestHeaders) + throws Exception { + String protocol = _controllerConf.getControllerBrokerProtocol(); + int portOverride = _controllerConf.getControllerBrokerPortOverride(); + List brokerUrls = new ArrayList<>(); + for (InstanceInfo broker : brokers.values()) { + int port = portOverride > 0 ? portOverride : broker.getPort(); + brokerUrls.add(String.format("%s://%s:%d/queries", protocol, broker.getHost(), port)); + } + LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls); + CompletionService completionService = + new MultiHttpRequest(_executor, _httpConnMgr).execute(brokerUrls, requestHeaders, timeoutMs); + Map> queriesByBroker = new HashMap<>(); + List errMsgs = new ArrayList<>(brokerUrls.size()); + for (int i = 0; i < brokerUrls.size(); i++) { + GetMethod getMethod = null; + try { + // The completion order is different from brokerUrls, thus use uri in the response. + getMethod = completionService.take().get(); + URI uri = getMethod.getURI(); + int status = getMethod.getStatusCode(); + // Unexpected server responses are collected and returned as exception. + if (status != 200) { + throw new Exception(String.format("Unexpected status=%d and response='%s' from uri='%s'", status, + getMethod.getResponseBodyAsString(), uri)); + } + queriesByBroker.put(brokers.get(getInstanceKey(uri)).getInstanceName(), + JsonUtils.stringToObject(getMethod.getResponseBodyAsString(), Map.class)); + } catch (Exception e) { + LOGGER.error("Failed to get queries", e); + // Can't just throw exception from here as there is a need to release the other connections. + // So just collect the error msg to throw them together after the for-loop. + errMsgs.add(e.getMessage()); + } finally { + if (getMethod != null) { + getMethod.releaseConnection(); + } + } + } + if (errMsgs.size() > 0) { + throw new Exception("Unexpected responses from brokers: " + StringUtils.join(errMsgs, ",")); + } + return queriesByBroker; + } + + private static String getInstanceKey(InstanceInfo info) { + return info.getHost() + ":" + info.getPort(); + } + + private static String getInstanceKey(URI uri) + throws Exception { + return uri.getHost() + ":" + uri.getPort(); + } + + private static Map createRequestHeaders(HttpHeaders httpHeaders) { + Map requestHeaders = new HashMap<>(); + httpHeaders.getRequestHeaders().keySet().forEach(header -> { + requestHeaders.put(header, httpHeaders.getHeaderString(header)); + }); + return requestHeaders; + } +}