Skip to content

Commit

Permalink
Automatically detect whether a v1 query could have run on the v2 quer…
Browse files Browse the repository at this point in the history
…y engine (#13628)
  • Loading branch information
yashmayya authored Jul 30, 2024
1 parent 6ad08a7 commit 835949b
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -138,6 +142,9 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
protected final boolean _enableDistinctCountBitmapOverride;
protected final int _queryResponseLimit;
protected final Map<Long, QueryServers> _queriesById;
protected final boolean _enableMultistageMigrationMetric;
protected ExecutorService _multistageCompileExecutor;
protected BlockingQueue<Pair<String, String>> _multistageCompileQueryQueue;

public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String brokerId,
BrokerRoutingManager routingManager, AccessControlFactory accessControlFactory,
Expand All @@ -155,12 +162,53 @@ public BaseSingleStageBrokerRequestHandler(PinotConfiguration config, String bro
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;

_enableMultistageMigrationMetric = _config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC,
Broker.DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC);
if (_enableMultistageMigrationMetric) {
_multistageCompileExecutor = Executors.newSingleThreadExecutor();
_multistageCompileQueryQueue = new LinkedBlockingQueue<>(1000);
}

LOGGER.info("Initialized {} with broker id: {}, timeout: {}ms, query response limit: {}, query log max length: {}, "
+ "query log max rate: {}, query cancellation enabled: {}", getClass().getSimpleName(), _brokerId,
_brokerTimeoutMs, _queryResponseLimit, _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(),
enableQueryCancellation);
}

@Override
public void start() {
if (_enableMultistageMigrationMetric) {
_multistageCompileExecutor.submit(() -> {
while (!Thread.currentThread().isInterrupted()) {
Pair<String, String> query;
try {
query = _multistageCompileQueryQueue.take();
} catch (InterruptedException e) {
// Exit gracefully when the thread is interrupted, presumably when this single thread executor is shutdown.
// Since this task is all that this single thread is doing, there's no need to preserve the thread's
// interrupt status flag.
return;
}
String queryString = query.getLeft();
String database = query.getRight();

// Check if the query is a v2 supported query
if (!ParserUtils.canCompileWithMultiStageEngine(queryString, database, _tableCache)) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE, 1);
}
}
});
}
}

@Override
public void shutDown() {
if (_enableMultistageMigrationMetric) {
_multistageCompileExecutor.shutdownNow();
}
}

@Override
public Map<Long, String> getRunningQueries() {
Preconditions.checkState(_queriesById != null, "Query cancellation is not enabled on broker");
Expand Down Expand Up @@ -478,8 +526,19 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
}

_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.QUERIES_GLOBAL, 1);
_brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.REQUEST_SIZE, query.length());

if (!pinotQuery.isExplain() && _enableMultistageMigrationMetric) {
// Check if the query is a v2 supported query
String database = DatabaseUtils.extractDatabaseFromQueryRequest(sqlNodeAndOptions.getOptions(), httpHeaders);
// Attempt to add the query to the compile queue; drop if queue is full
if (!_multistageCompileQueryQueue.offer(Pair.of(query, database))) {
LOGGER.trace("Not compiling query `{}` using the multi-stage query engine because the query queue is full",
query);
}
}

// Prepare OFFLINE and REALTIME requests
BrokerRequest offlineBrokerRequest = null;
BrokerRequest realtimeBrokerRequest = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId, Brok

@Override
public void start() {
super.start();
}

@Override
public void shutDown() {
super.shutDown();
_streamingQueryClient.shutdown();
_streamingReduceService.shutDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,14 @@ public SingleConnectionBrokerRequestHandler(PinotConfiguration config, String br

@Override
public void start() {
super.start();
_failureDetector.register(this);
_failureDetector.start();
}

@Override
public void shutDown() {
super.shutDown();
_failureDetector.stop();
_queryRouter.shutDown();
_brokerReduceService.shutDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
* At this moment this counter does not include queries executed in multi-stage mode.
*/
QUERIES("queries", false),
/**
* Number of single-stage queries that have been started.
* <p>
* Unlike {@link #QUERIES}, this metric is global and not attached to a particular table.
* That means it can be used to know how many single-stage queries have been started in total.
*/
QUERIES_GLOBAL("queries", true),
/**
* Number of multi-stage queries that have been started.
* <p>
Expand All @@ -49,7 +56,10 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
* sum of this metric across all tables should be greater or equal than {@link #MULTI_STAGE_QUERIES_GLOBAL}.
*/
MULTI_STAGE_QUERIES("queries", false),

/**
* Number of single-stage queries executed that would not have successfully run on the multi-stage query engine as is.
*/
SINGLE_STAGE_QUERIES_INVALID_MULTI_STAGE("queries", true),
// These metrics track the exceptions caught during query execution in broker side.
// Query rejected by Jersey thread pool executor
QUERY_REJECTED_EXCEPTIONS("exceptions", true),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;


/**
* Tests that verify JMX metrics emitted by various Pinot components.
*/
public class JmxMetricsIntegrationTest extends BaseClusterIntegrationTestSet {

private static final int NUM_BROKERS = 1;
private static final int NUM_SERVERS = 1;

private static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer();
private static final String PINOT_JMX_METRICS_DOMAIN = "\"org.apache.pinot.common.metrics\"";
private static final String BROKER_METRICS_TYPE = "\"BrokerMetrics\"";

@BeforeClass
public void setUp() throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Start the Pinot cluster
startZk();
startController();
startBrokers(NUM_BROKERS);
startServers(NUM_SERVERS);

// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
TableConfig tableConfig = createOfflineTableConfig();
addTableConfig(tableConfig);

// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);

// Create and upload segments
ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), _tarDir);

// Wait for all documents loaded
waitForAllDocsLoaded(600_000L);
}

@Test
public void testMultiStageMigrationMetric() throws Exception {
ObjectName multiStageMigrationMetric = new ObjectName(PINOT_JMX_METRICS_DOMAIN,
new Hashtable<>(Map.of("type", BROKER_METRICS_TYPE,
"name", "\"pinot.broker.singleStageQueriesInvalidMultiStage\"")));

ObjectName queriesGlobalMetric = new ObjectName(PINOT_JMX_METRICS_DOMAIN,
new Hashtable<>(Map.of("type", BROKER_METRICS_TYPE,
"name", "\"pinot.broker.queriesGlobal\"")));

// Some queries are run during setup to ensure that all the docs are loaded
long initialQueryCount = (Long) MBEAN_SERVER.getAttribute(queriesGlobalMetric, "Count");
assertTrue(initialQueryCount > 0L);
assertEquals((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric, "Count"), 0L);

postQuery("SELECT COUNT(*) FROM mytable");

// Run some queries that are known to not work as is with the multi-stage query engine

// Type differences
// STRING is VARCHAR in v2
JsonNode response = postQuery("SELECT CAST(ArrTime AS STRING) FROM mytable");
assertFalse(response.get("resultTable").get("rows").isEmpty());
// LONG is BIGINT in v2
response = postQuery("SELECT CAST(ArrTime AS LONG) FROM mytable");
assertFalse(response.get("resultTable").get("rows").isEmpty());
// FLOAT_ARRAY is FLOAT ARRAY in v2
response = postQuery("SELECT CAST(DivAirportIDs AS FLOAT_ARRAY) FROM mytable");
assertFalse(response.get("resultTable").get("rows").isEmpty());

// MV column requires ARRAY_TO_MV wrapper to be used in filter predicates
response = postQuery("SELECT COUNT(*) FROM mytable WHERE DivAirports = 'JFK'");
assertFalse(response.get("resultTable").get("rows").isEmpty());

// Unsupported function
response = postQuery("SELECT AirlineID, count(*) FROM mytable WHERE IN_SUBQUERY(airlineID, 'SELECT "
+ "ID_SET(AirlineID) FROM mytable WHERE Carrier = ''AA''') = 1 GROUP BY AirlineID;");
assertFalse(response.get("resultTable").get("rows").isEmpty());

// Repeated columns in an ORDER BY query
response = postQuery("SELECT AirTime, AirTime FROM mytable ORDER BY AirTime");
assertFalse(response.get("resultTable").get("rows").isEmpty());

assertEquals((Long) MBEAN_SERVER.getAttribute(queriesGlobalMetric, "Count"), initialQueryCount + 8L);

AtomicLong multiStageMigrationMetricValue = new AtomicLong();
TestUtils.waitForCondition((aVoid) -> {
try {
multiStageMigrationMetricValue.set((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric, "Count"));
return multiStageMigrationMetricValue.get() == 6L;
} catch (Exception e) {
throw new RuntimeException(e);
}
}, 5000, "Expected value of MBean 'pinot.broker.singleStageQueriesInvalidMultiStage' to be: "
+ 6L + "; actual value: " + multiStageMigrationMetricValue.get());

assertEquals((Long) MBEAN_SERVER.getAttribute(multiStageMigrationMetric, "Count"), 6L);
}

@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC, "true");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import org.apache.pinot.sql.parsers.CalciteSqlParser;
import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
import org.apache.pinot.sql.parsers.parser.SqlPhysicalExplain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
Expand All @@ -79,6 +81,7 @@
* <p>It provide the higher level entry interface to convert a SQL string into a {@link DispatchableSubPlan}.
*/
public class QueryEnvironment {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryEnvironment.class);
private static final CalciteConnectionConfig CONNECTION_CONFIG;

static {
Expand Down Expand Up @@ -200,6 +203,24 @@ public List<String> getTableNamesForQuery(String sqlQuery) {
}
}

/**
* Returns whether the query can be successfully compiled in this query environment
*/
public boolean canCompileQuery(String query) {
try (PlannerContext plannerContext = getPlannerContext()) {
SqlNode sqlNode = CalciteSqlParser.compileToSqlNodeAndOptions(query).getSqlNode();
if (sqlNode.getKind().equals(SqlKind.EXPLAIN)) {
sqlNode = ((SqlExplain) sqlNode).getExplicandum();
}
compileQuery(sqlNode, plannerContext);
LOGGER.debug("Successfully compiled query using the multi-stage query engine: `{}`", query);
return true;
} catch (Exception e) {
LOGGER.warn("Encountered an error while compiling query `{}` using the multi-stage query engine", query, e);
return false;
}
}

/**
* Results of planning a query
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,11 @@ private ParserUtils() {
*/
public static boolean canCompileWithMultiStageEngine(String query, String database, TableCache tableCache) {
// try to parse and compile the query with the Calcite planner used by the multi-stage query engine
try {
LOGGER.info("Trying to compile query `{}` using the multi-stage query engine", query);
QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
queryEnvironment.getTableNamesForQuery(query);
LOGGER.info("Successfully compiled query using the multi-stage query engine: `{}`", query);
return true;
} catch (Exception e) {
LOGGER.error("Encountered an error while compiling query `{}` using the multi-stage query engine", query, e);
return false;
}
long compileStartTime = System.currentTimeMillis();
LOGGER.debug("Trying to compile query `{}` using the multi-stage query engine", query);
QueryEnvironment queryEnvironment = new QueryEnvironment(database, tableCache, null);
boolean canCompile = queryEnvironment.canCompileQuery(query);
LOGGER.debug("Multi-stage query compilation time = {}ms", System.currentTimeMillis() - compileStartTime);
return canCompile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,14 @@ public static class Broker {
public static final String CONFIG_OF_NEW_SEGMENT_EXPIRATION_SECONDS = "pinot.broker.new.segment.expiration.seconds";
public static final long DEFAULT_VALUE_OF_NEW_SEGMENT_EXPIRATION_SECONDS = TimeUnit.MINUTES.toSeconds(5);

// If this config is set to true, the broker will check every query executed using the v1 query engine and attempt
// to determine whether the query could have successfully been run on the v2 / multi-stage query engine. If not,
// a counter metric will be incremented - if this counter remains 0 during regular query workload execution, it
// signals that users can potentially migrate their query workload to the multistage query engine.
public static final String CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC
= "pinot.broker.enable.multistage.migration.metric";
public static final boolean DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC = false;

public static class Request {
public static final String SQL = "sql";
public static final String TRACE = "trace";
Expand Down

0 comments on commit 835949b

Please sign in to comment.