Skip to content

Commit

Permalink
Fix the bug of hybrid table request using the same request id (#9443)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie-Jiang authored Sep 21, 2022
1 parent 0d14363 commit b6f3331
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,28 +190,47 @@ public Map<Long, String> getRunningQueries() {
Set<ServerInstance> getRunningServers(long requestId) {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(requestId);
return (queryServers == null) ? Collections.emptySet() : queryServers._servers;
if (queryServers == null) {
return Collections.emptySet();
}
Set<ServerInstance> runningServers = new HashSet<>();
if (queryServers._offlineRoutingTable != null) {
runningServers.addAll(queryServers._offlineRoutingTable.keySet());
}
if (queryServers._realtimeRoutingTable != null) {
runningServers.addAll(queryServers._realtimeRoutingTable.keySet());
}
return runningServers;
}

@Override
public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
public boolean cancelQuery(long requestId, int timeoutMs, Executor executor, HttpConnectionManager connMgr,
Map<String, Integer> serverResponses)
throws Exception {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
QueryServers queryServers = _queriesById.get(queryId);
QueryServers queryServers = _queriesById.get(requestId);
if (queryServers == null) {
return false;
}
String globalId = getGlobalQueryId(queryId);
List<String> serverUrls = new ArrayList<>();
for (ServerInstance server : queryServers._servers) {
serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), globalId));
if (queryServers._offlineRoutingTable != null) {
for (ServerInstance server : queryServers._offlineRoutingTable.keySet()) {
serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), getGlobalQueryId(requestId)));
}
}
if (queryServers._realtimeRoutingTable != null) {
// NOTE: When the query is sent to both OFFLINE and REALTIME table, the REALTIME one has negative request id to
// differentiate from the OFFLINE one
long realtimeRequestId = queryServers._offlineRoutingTable == null ? requestId : -requestId;
for (ServerInstance server : queryServers._realtimeRoutingTable.keySet()) {
serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(), getGlobalQueryId(realtimeRequestId)));
}
}
if (serverUrls.isEmpty()) {
LOGGER.debug("No servers running the query: {} right now", globalId);
LOGGER.debug("No servers running the query: {} right now", queryServers._query);
return true;
}
LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId, serverUrls);
LOGGER.debug("Cancelling the query: {} via server urls: {}", queryServers._query, serverUrls);
CompletionService<DeleteMethod> completionService =
new MultiHttpRequest(executor, connMgr).execute(serverUrls, null, timeoutMs, "DELETE", DeleteMethod::new);
List<String> errMsgs = new ArrayList<>(serverUrls.size());
Expand All @@ -232,7 +251,7 @@ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor, HttpC
serverResponses.put(uri.getHost() + ":" + uri.getPort(), status);
}
} catch (Exception e) {
LOGGER.error("Failed to cancel query: {}", globalId, e);
LOGGER.error("Failed to cancel query: {}", queryServers._query, e);
// Can't just throw exception from here as there is a need to release the other connections.
// So just collect the error msg to throw them together after the for-loop.
errMsgs.add(e.getMessage());
Expand Down Expand Up @@ -271,16 +290,9 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
if (sql == null) {
throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request);
}
try {
String query = sql.asText();
requestContext.setQuery(query);
return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext);
} finally {
if (_queriesById != null) {
_queriesById.remove(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
}
String query = sql.asText();
requestContext.setQuery(query);
return handleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext);
}

private BrokerResponseNative handleRequest(long requestId, String query,
Expand Down Expand Up @@ -660,6 +672,7 @@ private BrokerResponseNative handleRequest(long requestId, String query,
realtimeRoutingTable = null;
}
}
BrokerResponseNative brokerResponse;
if (_queriesById != null) {
// Start to track the running query for cancellation just before sending it out to servers to avoid any potential
// failures that could happen before sending it out, like failures to calculate the routing table etc.
Expand All @@ -669,14 +682,22 @@ private BrokerResponseNative handleRequest(long requestId, String query,
// can always list the running queries and cancel query again until it ends. Just that such race
// condition makes cancel API less reliable. This should be rare as it assumes sending queries out to
// servers takes time, but will address later if needed.
QueryServers queryServers = _queriesById.computeIfAbsent(requestId, k -> new QueryServers(query));
_queriesById.put(requestId, new QueryServers(query, offlineRoutingTable, realtimeRoutingTable));
LOGGER.debug("Keep track of running query: {}", requestId);
queryServers.addServers(offlineRoutingTable, realtimeRoutingTable);
try {
brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest,
offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats,
requestContext);
} finally {
_queriesById.remove(requestId);
LOGGER.debug("Remove track of running query: {}", requestId);
}
} else {
brokerResponse =
processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable,
realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext);
}
// TODO: Modify processBrokerRequest() to directly take PinotQuery
BrokerResponseNative brokerResponse =
processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable,
realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext);

brokerResponse.setExceptions(exceptions);
brokerResponse.setNumSegmentsPrunedByBroker(numPrunedSegmentsTotal);
long executionEndTimeNs = System.nanoTime();
Expand Down Expand Up @@ -777,7 +798,7 @@ private void logBrokerResponse(long requestId, String query, RequestContext requ
Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING);
String clientIp = CommonConstants.UNKNOWN;
if (enableClientIpLogging && requesterIdentity != null) {
clientIp = requesterIdentity.getClientIp();
clientIp = requesterIdentity.getClientIp();
}

// Please keep the format as name=value comma-separated with no spaces
Expand Down Expand Up @@ -1654,6 +1675,7 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t

/**
* Processes the optimized broker requests for both OFFLINE and REALTIME table.
* TODO: Directly take PinotQuery
*/
protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
Expand Down Expand Up @@ -1710,20 +1732,14 @@ public void setServerStats(String serverStats) {
*/
private static class QueryServers {
private final String _query;
private final Set<ServerInstance> _servers = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Map<ServerInstance, List<String>> _offlineRoutingTable;
private final Map<ServerInstance, List<String>> _realtimeRoutingTable;

public QueryServers(String query) {
public QueryServers(String query, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
@Nullable Map<ServerInstance, List<String>> realtimeRoutingTable) {
_query = query;
}

public void addServers(Map<ServerInstance, List<String>> offlineRoutingTable,
Map<ServerInstance, List<String>> realtimeRoutingTable) {
if (offlineRoutingTable != null) {
_servers.addAll(offlineRoutingTable.keySet());
}
if (realtimeRoutingTable != null) {
_servers.addAll(realtimeRoutingTable.keySet());
}
_offlineRoutingTable = offlineRoutingTable;
_realtimeRoutingTable = realtimeRoutingTable;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,25 @@ public synchronized void shutDown() {

@Override
protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance,
List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance,
List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext)
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
@Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
@Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
RequestContext requestContext)
throws Exception {
// TODO: Support failure detection
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>();
if (offlineBrokerRequest != null) {
assert offlineRoutingTable != null;
sendRequest(TableType.OFFLINE, offlineBrokerRequest, offlineRoutingTable, responseMap,
sendRequest(requestId, TableType.OFFLINE, offlineBrokerRequest, offlineRoutingTable, responseMap,
requestContext.isSampledRequest());
}
if (realtimeBrokerRequest != null) {
assert realtimeRoutingTable != null;
sendRequest(TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap,
// NOTE: When both OFFLINE and REALTIME request exist, use negative request id for REALTIME to differentiate
// from the OFFLINE one
long realtimeRequestId = offlineBrokerRequest == null ? requestId : -requestId;
sendRequest(realtimeRequestId, TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap,
requestContext.isSampledRequest());
}
return _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs,
Expand All @@ -109,7 +113,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
/**
* Query pinot server for data table.
*/
private void sendRequest(TableType tableType, BrokerRequest brokerRequest,
private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest,
Map<ServerInstance, List<String>> routingTable,
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, boolean trace) {
for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) {
Expand All @@ -119,8 +123,8 @@ private void sendRequest(TableType tableType, BrokerRequest brokerRequest,
int port = serverInstance.getGrpcPort();
// TODO: enable throttling on per host bases.
Iterator<Server.ServerResponse> streamingResponse = _streamingQueryClient.submit(serverHost, port,
new GrpcRequestBuilder().setSegments(segments).setBrokerRequest(brokerRequest).setEnableStreaming(true)
.setEnableTrace(trace));
new GrpcRequestBuilder().setRequestId(requestId).setBrokerId(_brokerId).setEnableTrace(trace)
.setEnableStreaming(true).setBrokerRequest(brokerRequest).setSegments(segments).build());
responseMap.put(serverInstance.toServerRoutingInstance(tableType, ServerInstance.RoutingType.GRPC),
streamingResponse);
}
Expand All @@ -134,9 +138,9 @@ public PinotStreamingQueryClient(GrpcConfig config) {
_config = config;
}

public Iterator<Server.ServerResponse> submit(String host, int port, GrpcRequestBuilder requestBuilder) {
public Iterator<Server.ServerResponse> submit(String host, int port, Server.ServerRequest serverRequest) {
GrpcQueryClient client = getOrCreateGrpcQueryClient(host, port);
return client.submit(requestBuilder.build());
return client.submit(serverRequest);
}

private GrpcQueryClient getOrCreateGrpcQueryClient(String host, int port) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@


public class GrpcRequestBuilder {
private int _requestId;
private long _requestId;
private String _brokerId = "unknown";
private boolean _enableTrace;
private boolean _enableStreaming;
Expand All @@ -42,7 +42,7 @@ public class GrpcRequestBuilder {
private BrokerRequest _brokerRequest;
private List<String> _segments;

public GrpcRequestBuilder setRequestId(int requestId) {
public GrpcRequestBuilder setRequestId(long requestId) {
_requestId = requestId;
return this;
}
Expand Down Expand Up @@ -84,7 +84,7 @@ public Server.ServerRequest build() {
"Query and segmentsToQuery must be set");

Map<String, String> metadata = new HashMap<>();
metadata.put(Request.MetadataKeys.REQUEST_ID, Integer.toString(_requestId));
metadata.put(Request.MetadataKeys.REQUEST_ID, Long.toString(_requestId));
metadata.put(Request.MetadataKeys.BROKER_ID, _brokerId);
metadata.put(Request.MetadataKeys.ENABLE_TRACE, Boolean.toString(_enableTrace));
metadata.put(Request.MetadataKeys.ENABLE_STREAMING, Boolean.toString(_enableStreaming));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ protected synchronized void aggregate(ServerRoutingInstance routingInstance, Dat
Map<String, String> metadata = dataTable.getMetadata();
// Reduce on trace info.
if (_enableTrace) {
_traceInfo.put(routingInstance.getHostname(), metadata.get(MetadataKey.TRACE_INFO.getName()));
_traceInfo.put(routingInstance.getShortName(), metadata.get(MetadataKey.TRACE_INFO.getName()));
}

// Reduce on exceptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public abstract class QueryScheduler {
private static final String INVALID_NUM_RESIZES = "-1";
private static final String INVALID_RESIZE_TIME_MS = "-1";
private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond";
private static final String ENABLE_QUERY_CANCELLATION_KEY = "enable.query.cancellation";
private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
protected final ServerMetrics _serverMetrics;
protected final QueryExecutor _queryExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {

// Send error response
String hexString = requestBytes != null ? BytesUtils.toHexString(requestBytes) : "";
long reqestId = instanceRequest != null ? instanceRequest.getRequestId() : 0;
long requestId = instanceRequest != null ? instanceRequest.getRequestId() : 0;
LOGGER.error("Exception while processing instance request: {}", hexString, e);
sendErrorResponse(ctx, reqestId, tableNameWithType, queryArrivalTimeMs, DataTableFactory.getEmptyDataTable(), e);
sendErrorResponse(ctx, requestId, tableNameWithType, queryArrivalTimeMs, DataTableFactory.getEmptyDataTable(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,14 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
}
if (realtimeBrokerRequest != null) {
assert realtimeRoutingTable != null;
// NOTE: When both OFFLINE and REALTIME request exist, use negative request id for REALTIME to differentiate
// from the OFFLINE one
long realtimeRequestId = offlineBrokerRequest == null ? requestId : -requestId;
for (Map.Entry<ServerInstance, List<String>> entry : realtimeRoutingTable.entrySet()) {
ServerRoutingInstance serverRoutingInstance =
entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls);
InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue());
InstanceRequest instanceRequest =
getInstanceRequest(realtimeRequestId, realtimeBrokerRequest, entry.getValue());
requestMap.put(serverRoutingInstance, instanceRequest);
}
}
Expand Down Expand Up @@ -167,7 +171,8 @@ public void shutDown() {

void receiveDataTable(ServerRoutingInstance serverRoutingInstance, DataTable dataTable, int responseSize,
int deserializationTimeMs) {
long requestId = Long.parseLong(dataTable.getMetadata().get(MetadataKey.REQUEST_ID.getName()));
// NOTE: For hybrid table, REALTIME request has negative request id
long requestId = Math.abs(Long.parseLong(dataTable.getMetadata().get(MetadataKey.REQUEST_ID.getName())));
AsyncQueryResponse asyncQueryResponse = _asyncQueryResponseMap.get(requestId);

// Query future might be null if the query is already done (maybe due to failure)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,18 @@ public void testBrokerDebugRoutingTableSQL()
Assert.assertNotNull(getDebugInfo("debug/routingTable/sql?query=" + encodedSQL));
}

@Test
public void testQueryTracing()
throws Exception {
JsonNode jsonNode = postQuery("SET trace = true; SELECT COUNT(*) FROM " + getTableName());
Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asLong(), getCountStarResult());
Assert.assertTrue(jsonNode.get("exceptions").isEmpty());
JsonNode traceInfo = jsonNode.get("traceInfo");
Assert.assertEquals(traceInfo.size(), 2);
Assert.assertTrue(traceInfo.has("localhost_O"));
Assert.assertTrue(traceInfo.has("localhost_R"));
}

@Test
@Override
public void testHardcodedQueries()
Expand Down
54 changes: 28 additions & 26 deletions pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,34 +25,36 @@
*/
public interface Tracer {

/**
* Registers the requestId on the current thread. This means the request will be traced.
* @param requestId the requestId
*/
void register(long requestId);
/**
* Registers the requestId on the current thread. This means the request will be traced.
* TODO: Consider using string id or random id. Currently different broker might send query with the same request id.
*
* @param requestId the requestId
*/
void register(long requestId);

/**
* Detach a trace from the current thread.
*/
void unregister();
/**
* Detach a trace from the current thread.
*/
void unregister();

/**
*
* @param clazz the enclosing context, e.g. Operator, PlanNode, BlockValSet...
* @return a new scope which MUST be closed on the current thread.
*/
InvocationScope createScope(Class<?> clazz);
/**
*
* @param clazz the enclosing context, e.g. Operator, PlanNode, BlockValSet...
* @return a new scope which MUST be closed on the current thread.
*/
InvocationScope createScope(Class<?> clazz);

/**
* Starts
* @return the request record
*/
default RequestScope createRequestScope() {
return new DefaultRequestContext();
}
/**
* Starts
* @return the request record
*/
default RequestScope createRequestScope() {
return new DefaultRequestContext();
}

/**
* @return the active recording
*/
InvocationRecording activeRecording();
/**
* @return the active recording
*/
InvocationRecording activeRecording();
}

0 comments on commit b6f3331

Please sign in to comment.