Skip to content

Commit

Permalink
Merge branch 'master' into manual-authorization-annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
apucher authored Aug 24, 2022
2 parents 890aa3c + 8bf94e7 commit 8c1aa68
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.PinotSqlType;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.glassfish.jersey.server.ManagedAsync;
Expand Down Expand Up @@ -209,7 +209,7 @@ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, HttpRequesterI
throws Exception {
SqlNodeAndOptions sqlNodeAndOptions;
try {
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sqlRequestJson.get(Request.SQL).asText());
sqlNodeAndOptions = RequestUtils.parseQuery(sqlRequestJson.get(Request.SQL).asText(), sqlRequestJson);
} catch (Exception e) {
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.util.concurrent.RateLimiter;
import java.net.InetAddress;
import java.util.ArrayList;
Expand Down Expand Up @@ -290,24 +289,20 @@ private BrokerResponseNative handleRequest(long requestId, String query,
throws Exception {
LOGGER.debug("SQL query for request {}: {}", requestId, query);

// Compile the request
long compilationStartTimeNs = System.nanoTime();
long compilationStartTimeNs;
PinotQuery pinotQuery;
try {
if (sqlNodeAndOptions != null) {
// Include parse time when the query is already parsed
compilationStartTimeNs -= sqlNodeAndOptions.getParseTimeNs();
} else {
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
}
// Parse the request
sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : RequestUtils.parseQuery(query, request);
// Compile the request into PinotQuery
compilationStartTimeNs = System.nanoTime();
pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
} catch (Exception e) {
LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage());
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
setOptions(pinotQuery, requestId, query, request);

if (isLiteralOnlyQuery(pinotQuery)) {
LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query);
Expand Down Expand Up @@ -377,8 +372,9 @@ private BrokerResponseNative handleRequest(long requestId, String query,
}

long compilationEndTimeNs = System.nanoTime();
// full request compile time = compilationTimeNs + parserTimeNs
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION,
compilationEndTimeNs - compilationStartTimeNs);
(compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs());

// Second-stage table-level access control
// TODO: Modify AccessControl interface to directly take PinotQuery
Expand Down Expand Up @@ -1543,11 +1539,6 @@ static String getActualColumnName(String rawTableName, String columnName, @Nulla
throw new BadQueryRequestException("Unknown columnName '" + columnName + "' found in the query");
}

public static Map<String, String> getOptionsFromJson(JsonNode request, String optionsKey) {
return Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=')
.split(request.get(optionsKey).asText());
}

/**
* Helper function to decide whether to force the log
*
Expand All @@ -1566,46 +1557,6 @@ private boolean forceLog(BrokerResponse brokerResponse, long totalTimeMs) {
return totalTimeMs > 1000L;
}

/**
* Sets extra options for the given query.
*/
@VisibleForTesting
static void setOptions(PinotQuery pinotQuery, long requestId, String query, JsonNode jsonRequest) {
Map<String, String> queryOptions = new HashMap<>();
if (jsonRequest.has(Broker.Request.DEBUG_OPTIONS)) {
Map<String, String> debugOptions = getOptionsFromJson(jsonRequest, Broker.Request.DEBUG_OPTIONS);
if (!debugOptions.isEmpty()) {
// TODO: Do not set debug options after releasing 0.11.0. Currently we kept it for backward compatibility.
LOGGER.debug("Debug options are set to: {} for request {}: {}", debugOptions, requestId, query);
pinotQuery.setDebugOptions(debugOptions);

// NOTE: Debug options are deprecated. Put all debug options into query options for backward compatibility.
queryOptions.putAll(debugOptions);
}
}
if (jsonRequest.has(Broker.Request.QUERY_OPTIONS)) {
Map<String, String> queryOptionsFromJson = getOptionsFromJson(jsonRequest, Broker.Request.QUERY_OPTIONS);
queryOptions.putAll(queryOptionsFromJson);
}
Map<String, String> queryOptionsFromQuery = pinotQuery.getQueryOptions();
if (queryOptionsFromQuery != null) {
queryOptions.putAll(queryOptionsFromQuery);
}
boolean enableTrace = jsonRequest.has(Broker.Request.TRACE) && jsonRequest.get(Broker.Request.TRACE).asBoolean();
if (enableTrace) {
queryOptions.put(Broker.Request.TRACE, "true");
}
// NOTE: Always set query options because we will put 'timeoutMs' later
pinotQuery.setQueryOptions(queryOptions);
if (!queryOptions.isEmpty()) {
LOGGER.debug("Query options are set to: {} for request {}: {}", queryOptions, requestId, query);
}
// TODO: Remove the SQL query options after releasing 0.11.0
// The query engine will break if these 2 options are missing during version upgrade.
queryOptions.put(Broker.Request.QueryOptionKey.GROUP_BY_MODE, Broker.Request.SQL);
queryOptions.put(Broker.Request.QueryOptionKey.RESPONSE_FORMAT, Broker.Request.SQL);
}

/**
* Sets the query timeout (remaining time in milliseconds) into the query options, and returns the remaining time in
* milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,9 @@
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -84,42 +82,29 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
@Nullable RequesterIdentity requesterIdentity, RequestContext requestContext)
throws Exception {
if (sqlNodeAndOptions == null) {
JsonNode sql = request.get(Request.SQL);
if (sql == null) {
throw new BadQueryRequestException("Failed to find 'sql' in the request: " + request);
}
try {
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(sql.asText());
sqlNodeAndOptions = RequestUtils.parseQuery(request.get(CommonConstants.Broker.Request.SQL).asText(), request);
} catch (Exception e) {
LOGGER.info("Caught exception while compiling SQL: {}, {}", sql.asText(), e.getMessage());
LOGGER.info("Caught exception while compiling SQL: {}, {}", request, e.getMessage());
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR, e));
}
}
if (request.has(CommonConstants.Broker.Request.QUERY_OPTIONS)) {
sqlNodeAndOptions.setExtraOptions(RequestUtils.getOptionsFromJson(request,
CommonConstants.Broker.Request.QUERY_OPTIONS));
}

if (_multiStageWorkerRequestHandler != null && useMultiStageEngine(request, sqlNodeAndOptions)) {
return _multiStageWorkerRequestHandler.handleRequest(request, sqlNodeAndOptions, requesterIdentity,
requestContext);
if (_multiStageWorkerRequestHandler != null && Boolean.parseBoolean(sqlNodeAndOptions.getOptions().get(
CommonConstants.Broker.Request.QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
return _multiStageWorkerRequestHandler.handleRequest(request, requesterIdentity, requestContext);
} else {
return _singleStageBrokerRequestHandler.handleRequest(request, sqlNodeAndOptions, requesterIdentity,
requestContext);
}
}

private boolean useMultiStageEngine(JsonNode request, SqlNodeAndOptions sqlNodeAndOptions) {
Map<String, String> optionsFromSql = sqlNodeAndOptions.getOptions();
if (Boolean.parseBoolean(optionsFromSql.get(QueryOptionKey.USE_MULTISTAGE_ENGINE))) {
return true;
}
if (request.has(Request.QUERY_OPTIONS)) {
Map<String, String> optionsFromRequest =
BaseBrokerRequestHandler.getOptionsFromJson(request, Request.QUERY_OPTIONS);
return Boolean.parseBoolean(optionsFromRequest.get(QueryOptionKey.USE_MULTISTAGE_ENGINE));
}
return false;
}

@Override
public Map<Long, String> getRunningQueries() {
// TODO: add support for multiStaged engine: track running queries for multiStaged engine and combine its
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.query.QueryEnvironment;
Expand All @@ -57,7 +58,6 @@
import org.apache.pinot.spi.exception.BadQueryRequestException;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -133,16 +133,12 @@ private BrokerResponseNative handleRequest(long requestId, String query,
throws Exception {
LOGGER.debug("SQL query for request {}: {}", requestId, query);

// Parse the request
sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : RequestUtils.parseQuery(query, request);
// Compile the request
long compilationStartTimeNs = System.nanoTime();
QueryPlan queryPlan;
try {
if (sqlNodeAndOptions != null) {
// Include parse time when the query is already parsed
compilationStartTimeNs -= sqlNodeAndOptions.getParseTimeNs();
} else {
sqlNodeAndOptions = CalciteSqlParser.compileToSqlNodeAndOptions(query);
}
queryPlan = _queryEnvironment.planQuery(query, sqlNodeAndOptions);
} catch (Exception e) {
LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage());
Expand All @@ -163,7 +159,8 @@ private BrokerResponseNative handleRequest(long requestId, String query,
long executionEndTimeNs = System.nanoTime();

// Set total query processing time
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs - compilationStartTimeNs);
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(sqlNodeAndOptions.getParseTimeNs()
+ (executionEndTimeNs - compilationStartTimeNs));
brokerResponse.setTimeUsedMs(totalTimeMs);
brokerResponse.setResultTable(toResultTable(queryResults));
requestContext.setQueryProcessingTime(totalTimeMs);
Expand Down
Loading

0 comments on commit 8c1aa68

Please sign in to comment.