diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 27f7f3220246..d51625adb601 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -190,28 +190,47 @@ public Map getRunningQueries() { Set getRunningServers(long requestId) { Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); QueryServers queryServers = _queriesById.get(requestId); - return (queryServers == null) ? Collections.emptySet() : queryServers._servers; + if (queryServers == null) { + return Collections.emptySet(); + } + Set runningServers = new HashSet<>(); + if (queryServers._offlineRoutingTable != null) { + runningServers.addAll(queryServers._offlineRoutingTable.keySet()); + } + if (queryServers._realtimeRoutingTable != null) { + runningServers.addAll(queryServers._realtimeRoutingTable.keySet()); + } + return runningServers; } @Override - public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr, + public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, HttpConnectionManager connMgr, Map serverResponses) throws Exception { Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker"); - QueryServers queryServers = _queriesById.get(queryId); + QueryServers queryServers = _queriesById.get(requestId); if (queryServers == null) { return false; } - String globalId = getGlobalQueryId(queryId); List serverUrls = new ArrayList<>(); - for (ServerInstance server : queryServers._servers) { - serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), globalId)); + if (queryServers._offlineRoutingTable != null) { + for (ServerInstance server : 
queryServers._offlineRoutingTable.keySet()) { + serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), getGlobalQueryId(requestId))); + } + } + if (queryServers._realtimeRoutingTable != null) { + // NOTE: When the query is sent to both OFFLINE and REALTIME table, the REALTIME one has negative request id to + // differentiate from the OFFLINE one + long realtimeRequestId = queryServers._offlineRoutingTable == null ? requestId : -requestId; + for (ServerInstance server : queryServers._realtimeRoutingTable.keySet()) { + serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), getGlobalQueryId(realtimeRequestId))); + } } if (serverUrls.isEmpty()) { - LOGGER.debug("No servers running the query: {} right now", globalId); + LOGGER.debug("No servers running the query: {} right now", queryServers._query); return true; } - LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId, serverUrls); + LOGGER.debug("Cancelling the query: {} via server urls: {}", queryServers._query, serverUrls); CompletionService completionService = new MultiHttpRequest(executor, connMgr).execute(serverUrls, null, timeoutMs, "DELETE", DeleteMethod::new); List errMsgs = new ArrayList<>(serverUrls.size()); @@ -232,7 +251,7 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC serverResponses.put(uri.getHost() + ":" + uri.getPort(), status); } } catch (Exception e) { - LOGGER.error("Failed to cancel query: {}", globalId, e); + LOGGER.error("Failed to cancel query: {}", queryServers._query, e); // Can't just throw exception from here as there is a need to release the other connections. // So just collect the error msg to throw them together after the for-loop. 
errMsgs.add(e.getMessage()); @@ -271,16 +290,9 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption if (sql == null) { throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request); } - try { - String query = sql.asText(); - requestContext.setQuery(query); - return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext); - } finally { - if (_queriesById != null) { - _queriesById.remove(requestId); - LOGGER.debug("Remove track of running query: {}", requestId); - } - } + String query = sql.asText(); + requestContext.setQuery(query); + return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext); } private BrokerResponseNative handleRequest(long requestId, String query, @@ -660,6 +672,7 @@ private BrokerResponseNative handleRequest(long requestId, String query, realtimeRoutingTable = null; } } + BrokerResponseNative brokerResponse; if (_queriesById != null) { // Start to track the running query for cancellation just before sending it out to servers to avoid any potential // failures that could happen before sending it out, like failures to calculate the routing table etc. @@ -669,14 +682,22 @@ private BrokerResponseNative handleRequest(long requestId, String query, // can always list the running queries and cancel query again until it ends. Just that such race // condition makes cancel API less reliable. This should be rare as it assumes sending queries out to // servers takes time, but will address later if needed. 
- QueryServers queryServers = _queriesById.computeIfAbsent(requestId, k -> new QueryServers(query)); + _queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable)); LOGGER.debug("Keep track of running query: {}", requestId); - queryServers.addServers(offlineRoutingTable, realtimeRoutingTable); + try { + brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, + offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, + requestContext); + } finally { + _queriesById.remove(requestId); + LOGGER.debug("Remove track of running query: {}", requestId); + } + } else { + brokerResponse = + processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable, + realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext); } - // TODO: Modify processBrokerRequest() to directly take PinotQuery - BrokerResponseNative brokerResponse = - processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable, - realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext); + brokerResponse.setExceptions(exceptions); brokerResponse.setNumSegmentsPrunedByBroker(numPrunedSegmentsTotal); long executionEndTimeNs = System.nanoTime(); @@ -777,7 +798,7 @@ private void logBrokerResponse(long requestId, String query, RequestContext requ Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING); String clientIp = CommonConstants.UNKNOWN; if (enableClientIpLogging && requesterIdentity != null) { - clientIp = requesterIdentity.getClientIp(); + clientIp = requesterIdentity.getClientIp(); } // Please keep the format as name=value comma-separated with no spaces @@ -1654,6 +1675,7 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t /** * Processes the optimized broker requests for both OFFLINE and REALTIME table. 
+ * TODO: Directly take PinotQuery */ protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, @@ -1710,20 +1732,14 @@ public void setServerStats(String serverStats) { */ private static class QueryServers { private final String _query; - private final Set _servers = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Map> _offlineRoutingTable; + private final Map> _realtimeRoutingTable; - public QueryServers(String query) { + public QueryServers(String query, @Nullable Map> offlineRoutingTable, + @Nullable Map> realtimeRoutingTable) { _query = query; - } - - public void addServers(Map> offlineRoutingTable, - Map> realtimeRoutingTable) { - if (offlineRoutingTable != null) { - _servers.addAll(offlineRoutingTable.keySet()); - } - if (realtimeRoutingTable != null) { - _servers.addAll(realtimeRoutingTable.keySet()); - } + _offlineRoutingTable = offlineRoutingTable; + _realtimeRoutingTable = realtimeRoutingTable; } } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 9c30bacbe7d1..cbfb0f54b70d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -85,21 +85,25 @@ public synchronized void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, - BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) + BrokerRequest 
serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, + @Nullable Map> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, + @Nullable Map> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, + RequestContext requestContext) throws Exception { // TODO: Support failure detection assert offlineBrokerRequest != null || realtimeBrokerRequest != null; Map> responseMap = new HashMap<>(); if (offlineBrokerRequest != null) { assert offlineRoutingTable != null; - sendRequest(TableType.OFFLINE, offlineBrokerRequest, offlineRoutingTable, responseMap, + sendRequest(requestId, TableType.OFFLINE, offlineBrokerRequest, offlineRoutingTable, responseMap, requestContext.isSampledRequest()); } if (realtimeBrokerRequest != null) { assert realtimeRoutingTable != null; - sendRequest(TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap, + // NOTE: When both OFFLINE and REALTIME request exist, use negative request id for REALTIME to differentiate + // from the OFFLINE one + long realtimeRequestId = offlineBrokerRequest == null ? requestId : -requestId; + sendRequest(realtimeRequestId, TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap, requestContext.isSampledRequest()); } return _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, @@ -109,7 +113,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques /** * Query pinot server for data table. */ - private void sendRequest(TableType tableType, BrokerRequest brokerRequest, + private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest, Map> routingTable, Map> responseMap, boolean trace) { for (Map.Entry> routingEntry : routingTable.entrySet()) { @@ -119,8 +123,8 @@ private void sendRequest(TableType tableType, BrokerRequest brokerRequest, int port = serverInstance.getGrpcPort(); // TODO: enable throttling on per host bases. 
Iterator streamingResponse = _streamingQueryClient.submit(serverHost, port, - new GrpcRequestBuilder().setSegments(segments).setBrokerRequest(brokerRequest).setEnableStreaming(true) - .setEnableTrace(trace)); + new GrpcRequestBuilder().setRequestId(requestId).setBrokerId(_brokerId).setEnableTrace(trace) + .setEnableStreaming(true).setBrokerRequest(brokerRequest).setSegments(segments).build()); responseMap.put(serverInstance.toServerRoutingInstance(tableType, ServerInstance.RoutingType.GRPC), streamingResponse); } @@ -134,9 +138,9 @@ public PinotStreamingQueryClient(GrpcConfig config) { _config = config; } - public Iterator submit(String host, int port, GrpcRequestBuilder requestBuilder) { + public Iterator submit(String host, int port, Server.ServerRequest serverRequest) { GrpcQueryClient client = getOrCreateGrpcQueryClient(host, port); - return client.submit(requestBuilder.build()); + return client.submit(serverRequest); } private GrpcQueryClient getOrCreateGrpcQueryClient(String host, int port) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java index 51280be87526..e782af1d219b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcRequestBuilder.java @@ -33,7 +33,7 @@ public class GrpcRequestBuilder { - private int _requestId; + private long _requestId; private String _brokerId = "unknown"; private boolean _enableTrace; private boolean _enableStreaming; @@ -42,7 +42,7 @@ public class GrpcRequestBuilder { private BrokerRequest _brokerRequest; private List _segments; - public GrpcRequestBuilder setRequestId(int requestId) { + public GrpcRequestBuilder setRequestId(long requestId) { _requestId = requestId; return this; } @@ -84,7 +84,7 @@ public Server.ServerRequest build() { "Query and segmentsToQuery must be set"); 
Map metadata = new HashMap<>(); - metadata.put(Request.MetadataKeys.REQUEST_ID, Integer.toString(_requestId)); + metadata.put(Request.MetadataKeys.REQUEST_ID, Long.toString(_requestId)); metadata.put(Request.MetadataKeys.BROKER_ID, _brokerId); metadata.put(Request.MetadataKeys.ENABLE_TRACE, Boolean.toString(_enableTrace)); metadata.put(Request.MetadataKeys.ENABLE_STREAMING, Boolean.toString(_enableStreaming)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java index bbad1f985a38..620dce3b1a40 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BaseReduceService.java @@ -161,7 +161,7 @@ protected synchronized void aggregate(ServerRoutingInstance routingInstance, Dat Map metadata = dataTable.getMetadata(); // Reduce on trace info. if (_enableTrace) { - _traceInfo.put(routingInstance.getHostname(), metadata.get(MetadataKey.TRACE_INFO.getName())); + _traceInfo.put(routingInstance.getShortName(), metadata.get(MetadataKey.TRACE_INFO.getName())); } // Reduce on exceptions. 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index ce009513d5c9..c0f5211a1e55 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -62,7 +62,6 @@ public abstract class QueryScheduler { private static final String INVALID_NUM_RESIZES = "-1"; private static final String INVALID_RESIZE_TIME_MS = "-1"; private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond"; - private static final String ENABLE_QUERY_CANCELLATION_KEY = "enable.query.cancellation"; private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d; protected final ServerMetrics _serverMetrics; protected final QueryExecutor _queryExecutor; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java index 43e92640074f..0c2d124ccf9e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java @@ -152,9 +152,9 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) { // Send error response String hexString = requestBytes != null ? BytesUtils.toHexString(requestBytes) : ""; - long reqestId = instanceRequest != null ? instanceRequest.getRequestId() : 0; + long requestId = instanceRequest != null ? 
instanceRequest.getRequestId() : 0; LOGGER.error("Exception while processing instance request: {}", hexString, e); - sendErrorResponse(ctx, reqestId, tableNameWithType, queryArrivalTimeMs, DataTableFactory.getEmptyDataTable(), e); + sendErrorResponse(ctx, requestId, tableNameWithType, queryArrivalTimeMs, DataTableFactory.getEmptyDataTable(), e); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index b9d5e9a4a3b1..e5c06f3086c5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -102,10 +102,14 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, } if (realtimeBrokerRequest != null) { assert realtimeRoutingTable != null; + // NOTE: When both OFFLINE and REALTIME request exist, use negative request id for REALTIME to differentiate + // from the OFFLINE one + long realtimeRequestId = offlineBrokerRequest == null ? 
requestId : -requestId; for (Map.Entry> entry : realtimeRoutingTable.entrySet()) { ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls); - InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue()); + InstanceRequest instanceRequest = + getInstanceRequest(realtimeRequestId, realtimeBrokerRequest, entry.getValue()); requestMap.put(serverRoutingInstance, instanceRequest); } } @@ -167,7 +171,8 @@ public void shutDown() { void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable dataTable, int responseSize, int deserializationTimeMs) { - long requestId = Long.parseLong(dataTable.getMetadata().get(MetadataKey.REQUEST_ID.getName())); + // NOTE: For hybrid table, REALTIME request has negative request id + long requestId = Math.abs(Long.parseLong(dataTable.getMetadata().get(MetadataKey.REQUEST_ID.getName()))); AsyncQueryResponse asyncQueryResponse = _asyncQueryResponseMap.get(requestId); // Query future might be null if the query is already done (maybe due to failure) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java index f694ea56a723..795fb4dfdeaf 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java @@ -209,6 +209,18 @@ public void testBrokerDebugRoutingTableSQL() Assert.assertNotNull(getDebugInfo("debug/routingTable/sql?query=" + encodedSQL)); } + @Test + public void testQueryTracing() + throws Exception { + JsonNode jsonNode = postQuery("SET trace = true; SELECT COUNT(*) FROM " + getTableName()); + Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(), 
getCountStarResult()); + Assert.assertTrue(jsonNode.get("exceptions").isEmpty()); + JsonNode traceInfo = jsonNode.get("traceInfo"); + Assert.assertEquals(traceInfo.size(), 2); + Assert.assertTrue(traceInfo.has("localhost_O")); + Assert.assertTrue(traceInfo.has("localhost_R")); + } + @Test @Override public void testHardcodedQueries() diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java index bf07b6f828ec..8990a2a3b767 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java @@ -25,34 +25,36 @@ */ public interface Tracer { - /** - * Registers the requestId on the current thread. This means the request will be traced. - * @param requestId the requestId - */ - void register(long requestId); + /** + * Registers the requestId on the current thread. This means the request will be traced. + * TODO: Consider using string id or random id. Currently different broker might send query with the same request id. + * + * @param requestId the requestId + */ + void register(long requestId); - /** - * Detach a trace from the current thread. - */ - void unregister(); + /** + * Detach a trace from the current thread. + */ + void unregister(); - /** - * - * @param clazz the enclosing context, e.g. Operator, PlanNode, BlockValSet... - * @return a new scope which MUST be closed on the current thread. - */ - InvocationScope createScope(Class clazz); + /** + * + * @param clazz the enclosing context, e.g. Operator, PlanNode, BlockValSet... + * @return a new scope which MUST be closed on the current thread. 
+ */ + InvocationScope createScope(Class clazz); - /** - * Starts - * @return the request record - */ - default RequestScope createRequestScope() { - return new DefaultRequestContext(); - } + /** + * Creates a request scope for the request. + * @return the request record + */ + default RequestScope createRequestScope() { + return new DefaultRequestContext(); + } - /** - * @return the active recording - */ - InvocationRecording activeRecording(); + /** + * @return the active recording + */ + InvocationRecording activeRecording(); }