-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add broker API to run a query on both query engines and compare results #13746
Changes from all commits
3e96471
e286e95
cbf303a
f40583d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,9 +33,12 @@ | |
import io.swagger.annotations.Authorization; | ||
import io.swagger.annotations.SecurityDefinition; | ||
import io.swagger.annotations.SwaggerDefinition; | ||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.Executor; | ||
import javax.inject.Inject; | ||
import javax.ws.rs.DELETE; | ||
|
@@ -64,12 +67,16 @@ | |
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse; | ||
import org.apache.pinot.common.response.broker.BrokerResponseNative; | ||
import org.apache.pinot.common.response.broker.QueryProcessingException; | ||
import org.apache.pinot.common.utils.DataSchema; | ||
import org.apache.pinot.common.utils.request.RequestUtils; | ||
import org.apache.pinot.core.auth.Actions; | ||
import org.apache.pinot.core.auth.Authorize; | ||
import org.apache.pinot.core.auth.ManualAuthorization; | ||
import org.apache.pinot.core.auth.TargetType; | ||
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor; | ||
import org.apache.pinot.core.query.request.context.QueryContext; | ||
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; | ||
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; | ||
import org.apache.pinot.spi.trace.RequestContext; | ||
import org.apache.pinot.spi.trace.RequestScope; | ||
import org.apache.pinot.spi.trace.Tracing; | ||
|
@@ -280,6 +287,83 @@ public void processTimeSeriesInstantQuery(@Suspended AsyncResponse asyncResponse | |
asyncResponse.resume(Response.ok().entity("{}").build()); | ||
} | ||
|
||
@POST | ||
@Produces(MediaType.APPLICATION_JSON) | ||
@Path("query/compare") | ||
@ApiOperation(value = "Query Pinot using both the single-stage query engine and the multi-stage query engine and " | ||
+ "compare the results. The 'sql' field should be set in the request JSON to run the same query on both the " | ||
+ "query engines. Set '" + Request.SQL_V1 + "' and '" + Request.SQL_V2 + "' if the query needs to be adapted for " | ||
+ "the two query engines.") | ||
@ApiResponses(value = { | ||
@ApiResponse(code = 200, message = "Query result comparison response"), | ||
@ApiResponse(code = 500, message = "Internal Server Error") | ||
}) | ||
@ManualAuthorization | ||
public void processSqlQueryWithBothEnginesAndCompareResults(String query, @Suspended AsyncResponse asyncResponse, | ||
@Context org.glassfish.grizzly.http.server.Request requestContext, | ||
@Context HttpHeaders httpHeaders) { | ||
try { | ||
JsonNode requestJson = JsonUtils.stringToJsonNode(query); | ||
String v1Query; | ||
String v2Query; | ||
|
||
if (!requestJson.has(Request.SQL)) { | ||
if (!requestJson.has(Request.SQL_V1) || !requestJson.has(Request.SQL_V2)) { | ||
throw new IllegalStateException("Payload should either contain the query string field '" + Request.SQL + "' " | ||
+ "or both of '" + Request.SQL_V1 + "' and '" + Request.SQL_V2 + "'"); | ||
} else { | ||
v1Query = requestJson.get(Request.SQL_V1).asText(); | ||
v2Query = requestJson.get(Request.SQL_V2).asText(); | ||
} | ||
} else { | ||
v1Query = requestJson.has(Request.SQL_V1) ? requestJson.get(Request.SQL_V1).asText() | ||
: requestJson.get(Request.SQL).asText(); | ||
v2Query = requestJson.has(Request.SQL_V2) ? requestJson.get(Request.SQL_V2).asText() | ||
: requestJson.get(Request.SQL).asText(); | ||
} | ||
|
||
ObjectNode v1RequestJson = requestJson.deepCopy(); | ||
v1RequestJson.put(Request.SQL, v1Query); | ||
CompletableFuture<BrokerResponse> v1Response = CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return executeSqlQuery(v1RequestJson, makeHttpIdentity(requestContext), true, httpHeaders, false); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
}, | ||
_executor | ||
); | ||
|
||
ObjectNode v2RequestJson = requestJson.deepCopy(); | ||
v2RequestJson.put(Request.SQL, v2Query); | ||
CompletableFuture<BrokerResponse> v2Response = CompletableFuture.supplyAsync( | ||
() -> { | ||
try { | ||
return executeSqlQuery(v2RequestJson, makeHttpIdentity(requestContext), true, httpHeaders, true); | ||
} catch (Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
}, | ||
_executor | ||
); | ||
|
||
CompletableFuture.allOf(v1Response, v2Response).join(); | ||
|
||
asyncResponse.resume(getPinotQueryComparisonResponse(v1Query, v1Response.get(), v2Response.get())); | ||
} catch (WebApplicationException wae) { | ||
asyncResponse.resume(wae); | ||
} catch (Exception e) { | ||
LOGGER.error("Caught exception while processing request", e); | ||
asyncResponse.resume( | ||
new WebApplicationException(e, | ||
Response | ||
.status(Response.Status.INTERNAL_SERVER_ERROR) | ||
.entity(e.getMessage()) | ||
.build())); | ||
} | ||
} | ||
|
||
@DELETE | ||
@Path("query/{queryId}") | ||
@Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.CANCEL_QUERY) | ||
|
@@ -356,7 +440,7 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI | |
PinotSqlType sqlType = sqlNodeAndOptions.getSqlType(); | ||
if (onlyDql && sqlType != PinotSqlType.DQL) { | ||
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, | ||
new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", GET API only supports DQL."))); | ||
new UnsupportedOperationException("Unsupported SQL type - " + sqlType + ", this API only supports DQL."))); | ||
} | ||
switch (sqlType) { | ||
case DQL: | ||
|
@@ -428,4 +512,92 @@ static Response getPinotQueryResponse(BrokerResponse brokerResponse) | |
.entity((StreamingOutput) brokerResponse::toOutputStream).type(MediaType.APPLICATION_JSON) | ||
.build(); | ||
} | ||
|
||
@VisibleForTesting | ||
static Response getPinotQueryComparisonResponse(String query, BrokerResponse v1Response, BrokerResponse v2Response) { | ||
ObjectNode response = JsonUtils.newObjectNode(); | ||
response.set("v1Response", JsonUtils.objectToJsonNode(v1Response)); | ||
response.set("v2Response", JsonUtils.objectToJsonNode(v2Response)); | ||
response.set("comparisonAnalysis", JsonUtils.objectToJsonNode( | ||
analyzeQueryResultDifferences(query, v1Response, v2Response))); | ||
|
||
return Response.ok() | ||
.header(PINOT_QUERY_ERROR_CODE_HEADER, -1) | ||
.entity(response).type(MediaType.APPLICATION_JSON) | ||
.build(); | ||
} | ||
|
||
/** | ||
* Given a query and the responses from the single-stage and multi-stage query engines, analyzes the differences | ||
* between the responses and returns a list of differences. Currently, the method only compares the column names, | ||
* column types, number of rows in the result set, and the aggregation values for aggregation-only queries. | ||
* | ||
* TODO: Add more comparison logic for different query types. This would require handling edge cases with group | ||
* trimming, non-deterministic results for order by queries with limits etc. | ||
*/ | ||
private static List<String> analyzeQueryResultDifferences(String query, BrokerResponse v1Response, | ||
BrokerResponse v2Response) { | ||
List<String> differences = new ArrayList<>(); | ||
|
||
if (v1Response.getExceptionsSize() != 0 || v2Response.getExceptionsSize() != 0) { | ||
differences.add("Exception encountered while running the query on one or both query engines"); | ||
return differences; | ||
} | ||
|
||
if (v1Response.getResultTable() == null && v2Response.getResultTable() == null) { | ||
return differences; | ||
} | ||
|
||
if (v1Response.getResultTable() == null) { | ||
differences.add("v1 response has an empty result table"); | ||
return differences; | ||
} | ||
|
||
if (v2Response.getResultTable() == null) { | ||
differences.add("v2 response has an empty result table"); | ||
return differences; | ||
} | ||
|
||
DataSchema.ColumnDataType[] v1ResponseTypes = v1Response.getResultTable().getDataSchema().getColumnDataTypes(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you want to recognise cases where column ordering is different? I dont know if there are cases where the engines may return the result in different column orders. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not aware of any such cases where the column ordering differs in the two engines. However, we do check for column data type mismatches. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIR any simple select query without group by can return different results, even using the same engine. This should be more frequent if there are several segments involved in the query. In fact we would like to verify order if the query is order by and do not do that in the other case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Column data type check will identify it. IME, if there is a query with 20 columns, there will be 20 messages in the |
||
DataSchema.ColumnDataType[] v2ResponseTypes = v2Response.getResultTable().getDataSchema().getColumnDataTypes(); | ||
|
||
if (v1ResponseTypes.length != v2ResponseTypes.length) { | ||
differences.add("Mismatch in number of columns returned. v1: " + v1ResponseTypes.length | ||
+ ", v2: " + v2ResponseTypes.length); | ||
return differences; | ||
} | ||
|
||
String[] v1ColumnNames = v1Response.getResultTable().getDataSchema().getColumnNames(); | ||
String[] v2ColumnNames = v2Response.getResultTable().getDataSchema().getColumnNames(); | ||
for (int i = 0; i < v1ResponseTypes.length; i++) { | ||
if (v1ResponseTypes[i] != v2ResponseTypes[i]) { | ||
String columnName = v1ColumnNames[i].equals(v2ColumnNames[i]) | ||
? v1ColumnNames[i] | ||
: v1ColumnNames[i] + " / " + v2ColumnNames[i]; | ||
differences.add("Mismatch in column data type for column with name " + columnName | ||
+ ". v1 type: " + v1ResponseTypes[i] + ", v2 type: " + v2ResponseTypes[i]); | ||
} | ||
} | ||
|
||
if (v1Response.getNumRowsResultSet() != v2Response.getNumRowsResultSet()) { | ||
differences.add("Mismatch in number of rows returned. v1: " + v1Response.getNumRowsResultSet() | ||
+ ", v2: " + v2Response.getNumRowsResultSet()); | ||
return differences; | ||
} | ||
|
||
QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query); | ||
if (QueryContextUtils.isAggregationQuery(queryContext) && queryContext.getGroupByExpressions() == null) { | ||
// Aggregation-only query with a single row output | ||
for (int i = 0; i < v1ColumnNames.length; i++) { | ||
if (!Objects.equals(v1Response.getResultTable().getRows().get(0)[i], | ||
v2Response.getResultTable().getRows().get(0)[i])) { | ||
differences.add("Mismatch in aggregation value for " + v1ColumnNames[i] | ||
+ ". v1 value: " + v1Response.getResultTable().getRows().get(0)[i] | ||
+ ", v2 value: " + v2Response.getResultTable().getRows().get(0)[i]); | ||
} | ||
} | ||
} | ||
|
||
return differences; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Add exception messages to help out the user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The query response itself contains the entire v1 and v2 responses which will have the exceptions. It seems redundant to duplicate that here?