[multistage] support database in v2 #12591

Merged · 29 commits · Mar 22, 2024. The diff below shows changes from 6 commits.

Commits (29):
89d019f  [multistage] support database (shounakmk219, Mar 7, 2024)
9d91b32  lint fix (shounakmk219, Mar 7, 2024)
1c0b240  Move isPartOfDatabase utility to DatabaseUtils (shounakmk219, Mar 15, 2024)
98195f4  Reuse utils and constants (shounakmk219, Mar 15, 2024)
246a9b3  lint fix (shounakmk219, Mar 15, 2024)
8c4649f  Fix test case (shounakmk219, Mar 15, 2024)
84fcbde  review fixes (shounakmk219, Mar 18, 2024)
3b3b87e  [multistage] support database (shounakmk219, Mar 7, 2024)
ed25b48  lint fix (shounakmk219, Mar 7, 2024)
62dd3fd  Move isPartOfDatabase utility to DatabaseUtils (shounakmk219, Mar 15, 2024)
86007f9  Reuse utils and constants (shounakmk219, Mar 15, 2024)
fc6bf1d  lint fix (shounakmk219, Mar 15, 2024)
601b896  Fix test case (shounakmk219, Mar 15, 2024)
96cee48  review fixes (shounakmk219, Mar 18, 2024)
345cc0e  keep separate tests for all database context checks (shounakmk219, Mar 19, 2024)
72520e1  Merge branch 'db-support-in-v2-engine' of https://github.com/shounakm… (shounakmk219, Mar 20, 2024)
49d6c7f  remove subCatalog and use root catalog itself for query specific data… (shounakmk219, Mar 20, 2024)
c0aa537  Merge remote-tracking branch 'upstream/master' into db-support-in-v2-… (shounakmk219, Mar 20, 2024)
74d9bf6  table cache mock fix (shounakmk219, Mar 20, 2024)
c1f67c0  Bugfix. Table name extraction from TableCache (shounakmk219, Mar 21, 2024)
929bced  Use "default" database context when no context is passed (shounakmk219, Mar 21, 2024)
724f392  cosmetic fixes (shounakmk219, Mar 21, 2024)
cdae3c0  Fix the expected explain plan with default path (shounakmk219, Mar 21, 2024)
7bd212c  Add cross database query test (shounakmk219, Mar 21, 2024)
ae66547  Add more tests for database context from header (shounakmk219, Mar 21, 2024)
071757d  fix missed LogicalTableScan content updates (shounakmk219, Mar 21, 2024)
2c61d5b  typo (shounakmk219, Mar 21, 2024)
de836ed  Merge remote-tracking branch 'upstream/master' into db-support-in-v2-… (shounakmk219, Mar 21, 2024)
3294fcf  test fix (shounakmk219, Mar 21, 2024)
File: MultiStageBrokerRequestHandler.java

@@ -81,7 +81,7 @@
 public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(MultiStageBrokerRequestHandler.class);
 
-  private final QueryEnvironment _queryEnvironment;
+  private final WorkerManager _workerManager;
   private final MailboxService _mailboxService;
   private final QueryDispatcher _queryDispatcher;
 
@@ -93,9 +93,7 @@ public MultiStageBrokerRequestHandler(PinotConfiguration config, String brokerId
     LOGGER.info("Using Multi-stage BrokerRequestHandler.");
     String hostname = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME);
     int port = Integer.parseInt(config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT));
-    _queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
-        CalciteSchemaBuilder.asRootSchema(new PinotCatalog(tableCache)),
-        new WorkerManager(hostname, port, routingManager), _tableCache);
+    _workerManager = new WorkerManager(hostname, port, _routingManager);
     _mailboxService = new MailboxService(hostname, port, config);
     _queryDispatcher = new QueryDispatcher(_mailboxService);
 
@@ -128,9 +126,11 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
       queryTimeoutMs = timeoutMsFromQueryOption == null ? _brokerTimeoutMs : timeoutMsFromQueryOption;
       // Compile the request
       compilationStartTimeNs = System.nanoTime();
+      QueryEnvironment queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
+          CalciteSchemaBuilder.asRootSchema(new PinotCatalog(_tableCache)), _workerManager, _tableCache);
       switch (sqlNodeAndOptions.getSqlNode().getKind()) {
         case EXPLAIN:
-          queryPlanResult = _queryEnvironment.explainQuery(query, sqlNodeAndOptions, requestId);
+          queryPlanResult = queryEnvironment.explainQuery(query, sqlNodeAndOptions, requestId);
           String plan = queryPlanResult.getExplainPlan();
           Set<String> tableNames = queryPlanResult.getTableNames();
           if (!hasTableAccess(requesterIdentity, tableNames, requestContext, httpHeaders)) {
@@ -140,7 +140,7 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
           return constructMultistageExplainPlan(query, plan);
         case SELECT:
         default:
-          queryPlanResult = _queryEnvironment.planQuery(query, sqlNodeAndOptions, requestId);
+          queryPlanResult = queryEnvironment.planQuery(query, sqlNodeAndOptions, requestId);
           break;
       }
     } catch (WebApplicationException e) {
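Net effect of this hunk: the broker no longer holds a single QueryEnvironment for its lifetime. Instead, handleRequest builds one per request (keeping only the WorkerManager as a long-lived field), presumably so the catalog handed to the planner can reflect each request's database context. Condensed from the hunk above, the new per-request flow is:

    // Built once per request inside handleRequest, instead of once in the constructor:
    QueryEnvironment queryEnvironment = new QueryEnvironment(new TypeFactory(new TypeSystem()),
        CalciteSchemaBuilder.asRootSchema(new PinotCatalog(_tableCache)), _workerManager, _tableCache);
    queryPlanResult = queryEnvironment.planQuery(query, sqlNodeAndOptions, requestId);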
File: DatabaseUtils.java

@@ -84,4 +84,13 @@ public static String translateTableName(String tableName, HttpHeaders headers, b
   public static String translateTableName(String tableName, HttpHeaders headers) {
     return translateTableName(tableName, headers, false);
   }
+
+  public static boolean isPartOfDatabase(String tableName, @Nullable String databaseName) {
+    // assumes tableName will not have default database prefix ('default.')
+    if (StringUtils.isEmpty(databaseName) || databaseName.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE)) {
+      return StringUtils.split(tableName, '.').length == 1;
+    } else {
+      return tableName.startsWith(databaseName + ".");
+    }
+  }
 }
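To make the new helper's contract concrete, here is a small illustration (the outcomes follow directly from the implementation above, which treats an empty database name as the default database and assumes table names never carry the 'default.' prefix):

    DatabaseUtils.isPartOfDatabase("myTable", null);       // true: un-prefixed name, default context
    DatabaseUtils.isPartOfDatabase("myTable", "default");  // true
    DatabaseUtils.isPartOfDatabase("db1.myTable", null);   // false: prefixed name is outside default
    DatabaseUtils.isPartOfDatabase("db1.myTable", "db1");  // true: prefix matches the context
    DatabaseUtils.isPartOfDatabase("myTable", "db1");      // false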
File: PinotHelixResourceManager.java

@@ -732,22 +732,14 @@ public List<String> getAllTables() {
   public List<String> getAllTables(@Nullable String databaseName) {
     List<String> tableNames = new ArrayList<>();
     for (String resourceName : getAllResources()) {
-      if (TableNameBuilder.isTableResource(resourceName) && isPartOfDatabase(resourceName, databaseName)) {
+      if (TableNameBuilder.isTableResource(resourceName)
+          && DatabaseUtils.isPartOfDatabase(resourceName, databaseName)) {
         tableNames.add(resourceName);
       }
     }
     return tableNames;
   }
 
-  private boolean isPartOfDatabase(String tableName, @Nullable String databaseName) {
-    // assumes tableName will not have default database prefix ('default.')
-    if (StringUtils.isEmpty(databaseName) || databaseName.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE)) {
-      return StringUtils.split(tableName, '.').length == 1;
-    } else {
-      return tableName.startsWith(databaseName + ".");
-    }
-  }
-
   /**
    * Get all offline table names from default database.
    *
@@ -766,7 +758,8 @@ public List<String> getAllOfflineTables() {
   public List<String> getAllOfflineTables(@Nullable String databaseName) {
     List<String> offlineTableNames = new ArrayList<>();
     for (String resourceName : getAllResources()) {
-      if (isPartOfDatabase(resourceName, databaseName) && TableNameBuilder.isOfflineTableResource(resourceName)) {
+      if (DatabaseUtils.isPartOfDatabase(resourceName, databaseName)
+          && TableNameBuilder.isOfflineTableResource(resourceName)) {
         offlineTableNames.add(resourceName);
       }
     }
@@ -790,7 +783,7 @@ public List<String> getAllDimensionTables() {
   public List<String> getAllDimensionTables(@Nullable String databaseName) {
     return _tableCache.getAllDimensionTables().stream()
-        .filter(table -> isPartOfDatabase(table, databaseName))
+        .filter(table -> DatabaseUtils.isPartOfDatabase(table, databaseName))
         .collect(Collectors.toList());
   }
 
@@ -812,7 +805,8 @@ public List<String> getAllRealtimeTables() {
   public List<String> getAllRealtimeTables(@Nullable String databaseName) {
     List<String> realtimeTableNames = new ArrayList<>();
     for (String resourceName : getAllResources()) {
-      if (isPartOfDatabase(resourceName, databaseName) && TableNameBuilder.isRealtimeTableResource(resourceName)) {
+      if (DatabaseUtils.isPartOfDatabase(resourceName, databaseName)
+          && TableNameBuilder.isRealtimeTableResource(resourceName)) {
         realtimeTableNames.add(resourceName);
       }
     }
@@ -837,7 +831,8 @@ public List<String> getAllRawTables() {
   public List<String> getAllRawTables(@Nullable String databaseName) {
     Set<String> rawTableNames = new HashSet<>();
     for (String resourceName : getAllResources()) {
-      if (TableNameBuilder.isTableResource(resourceName) && isPartOfDatabase(resourceName, databaseName)) {
+      if (TableNameBuilder.isTableResource(resourceName)
+          && DatabaseUtils.isPartOfDatabase(resourceName, databaseName)) {
         rawTableNames.add(TableNameBuilder.extractRawTableName(resourceName));
       }
     }
@@ -1616,7 +1611,7 @@ public List<String> getSchemaNames(@Nullable String databaseName) {
     List<String> schemas = _propertyStore.getChildNames(
         PinotHelixPropertyStoreZnRecordProvider.forSchema(_propertyStore).getRelativePath(), AccessOption.PERSISTENT);
     if (schemas != null) {
-      return schemas.stream().filter(schemaName -> isPartOfDatabase(schemaName, databaseName))
+      return schemas.stream().filter(schemaName -> DatabaseUtils.isPartOfDatabase(schemaName, databaseName))
           .collect(Collectors.toList());
     }
     return Collections.emptyList();
@@ -4011,7 +4006,7 @@ public Map<String, List<InstanceInfo>> getTableToLiveBrokersMapping(@Nullable St
     ZNRecord znRecord = ev.getRecord();
     for (Map.Entry<String, Map<String, String>> tableToBrokersEntry : znRecord.getMapFields().entrySet()) {
       String tableName = tableToBrokersEntry.getKey();
-      if (!isPartOfDatabase(tableName, databaseName)) {
+      if (!DatabaseUtils.isPartOfDatabase(tableName, databaseName)) {
         continue;
       }
       Map<String, String> brokersToState = tableToBrokersEntry.getValue();
File: MultiStageEngineIntegrationTest.java

@@ -25,12 +25,19 @@
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -47,6 +54,8 @@
 
 public class MultiStageEngineIntegrationTest extends BaseClusterIntegrationTestSet {
   private static final String SCHEMA_FILE_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls.schema";
+  private String _tableName = DEFAULT_TABLE_NAME;
+  private List<File> _avroFiles = new ArrayList<>();
 
   @Override
   protected String getSchemaFileName() {
@@ -72,17 +81,17 @@ public void setUp()
     addTableConfig(tableConfig);
 
     // Unpack the Avro files
-    List<File> avroFiles = unpackAvroData(_tempDir);
+    _avroFiles = unpackAvroData(_tempDir);
 
     // Create and upload segments
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
     uploadSegments(getTableName(), _tarDir);
 
     // Set up the H2 connection
-    setUpH2Connection(avroFiles);
+    setUpH2Connection(_avroFiles);
 
     // Initialize the query generator
-    setUpQueryGenerator(avroFiles);
+    setUpQueryGenerator(_avroFiles);
 
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
@@ -749,6 +758,76 @@ public void testSearch()
     assertNoError(jsonNode);
   }
 
+  @Override
+  protected String getTableName() {
+    return _tableName;
+  }
+
+  @Test
+  public void testWithDatabaseContext()
+      throws Exception {
+    try {
+      _tableName = "db1." + DEFAULT_TABLE_NAME;
+      String defaultCol = "ActualElapsedTime";
+      String customCol = "ActualElapsedTime_2";
+      Schema schema = createSchema();
+      schema.addField(new MetricFieldSpec(customCol, FieldSpec.DataType.INT));
+      addSchema(schema);
+      TableConfig tableConfig = createOfflineTableConfig();
+      assert tableConfig.getIndexingConfig().getNoDictionaryColumns() != null;
+      List<String> noDicCols = new ArrayList<>(DEFAULT_NO_DICTIONARY_COLUMNS);
+      noDicCols.add(customCol);
+      tableConfig.getIndexingConfig().setNoDictionaryColumns(noDicCols);
+      IngestionConfig ingestionConfig = new IngestionConfig();
+      ingestionConfig.setTransformConfigs(List.of(new TransformConfig(customCol, defaultCol)));
+      tableConfig.setIngestionConfig(ingestionConfig);
+      addTableConfig(tableConfig);
+
+      // Create and upload segments to 'db1.mytable'
+      TestUtils.ensureDirectoriesExistAndEmpty(_segmentDir, _tarDir);
+      ClusterIntegrationTestUtils.buildSegmentsFromAvro(_avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+      uploadSegments(getTableName(), _tarDir);
+
+      // Wait for all documents loaded
+      waitForAllDocsLoaded(600_000L);
+
+      // default database check. No database context passed
+      checkQueryResultForDBTest(defaultCol, DEFAULT_TABLE_NAME);
+
+      // default database check. Default database context passed as table prefix
+      checkQueryResultForDBTest(defaultCol, "default." + DEFAULT_TABLE_NAME);
+
+      // default database check. Default database context passed as SET database='dbName'
+      checkQueryResultForDBTest(defaultCol, DEFAULT_TABLE_NAME, "default");
+
+      // Using renamed column "ActualElapsedTime_2" to ensure that the same table is not being queried.
+      // custom database check. Database context passed as table prefix
+      checkQueryResultForDBTest(customCol, _tableName);
+
+      // custom database check. Database context passed as SET database='dbName'
+      checkQueryResultForDBTest(customCol, DEFAULT_TABLE_NAME, "db1");
+    } finally {
+      dropOfflineTable(_tableName);
+      _tableName = DEFAULT_TABLE_NAME;
+    }
+  }
+
+  private void checkQueryResultForDBTest(String column, String tableName)
+      throws Exception {
+    checkQueryResultForDBTest(column, tableName, null);
+  }
+
+  private void checkQueryResultForDBTest(String column, String tableName, @Nullable String database)
+      throws Exception {
+    String query = (StringUtils.isNotBlank(database) ? "SET database='" + database + "'; " : "")
+        + "select max(" + column + ") from " + tableName + ";";
+    // max value of 'ActualElapsedTime'
+    long expectedValue = 678;
+    JsonNode jsonNode = postQuery(query);
+    long result = jsonNode.get("resultTable").get("rows").get(0).get(0).asLong();
+    assertEquals(result, expectedValue);
+  }
+
   @AfterClass
   public void tearDown()
       throws Exception {
File: OfflineClusterIntegrationTest.java

@@ -2772,11 +2772,7 @@ public void testQuerySourceWithDatabaseNameV2()
     testQuery(pinotQuery, h2Query);
 
     pinotQuery = "SELECT DISTINCT Carrier FROM default.mytable LIMIT 1000000";
-    JsonNode response = postQuery(pinotQuery);
-    JsonNode exceptions = response.get("exceptions");
-    assertFalse(exceptions.isEmpty(), "At least one exception was expected");
-    JsonNode firstException = exceptions.get(0);
-    assertEquals(firstException.get("errorCode").asInt(), QueryException.QUERY_PLANNING_ERROR_CODE);
+    testQuery(pinotQuery, h2Query);
   }
 
   @Test(dataProvider = "useBothQueryEngines")
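The removed assertions expected a query against default.mytable to fail v2 planning with QUERY_PLANNING_ERROR_CODE; with database support in place, the same query now plans and runs, so the test asserts result parity with H2 instead.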
File: CalciteSchemaBuilder.java

@@ -61,4 +61,15 @@ public static CalciteSchema asRootSchema(Schema root) {
     }
     return rootSchema;
   }
+
+  public static CalciteSchema asSubSchema(Schema root, String name) {
+    CalciteSchema subSchema = CalciteSchema.createRootSchema(false, false, name, root);
+    SchemaPlus schemaPlus = subSchema.plus();
+    for (Map.Entry<String, List<Function>> e : FunctionRegistry.getRegisteredCalciteFunctionMap().entrySet()) {
+      for (Function f : e.getValue()) {
+        schemaPlus.add(e.getKey(), f);
+      }
+    }
+    return subSchema;
+  }
 }
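A hypothetical caller sketch for the new helper (not part of this diff; it assumes PinotCatalog implements Calcite's Schema, as its use with asRootSchema in the broker hunk above suggests). Note that commit 49d6c7f later in this PR removes the sub-catalog approach in favor of the root catalog itself:

    // Expose a database as a named sub-schema with all registered Pinot functions attached:
    CalciteSchema db1Schema = CalciteSchemaBuilder.asSubSchema(new PinotCatalog(tableCache), "db1");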