Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support creating dictionary at runtime for an existing column #9678

Merged
merged 5 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.queries;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -43,13 +44,21 @@
import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.pinot.sql.parsers.CalciteSqlParser;

Expand Down Expand Up @@ -249,6 +258,26 @@ protected BrokerResponseNative getBrokerResponseForOptimizedQuery(String query,
return getBrokerResponse(pinotQuery, PLAN_MAKER);
}

/**
* Helper function to call reloadSegment on an existing index directory. The segment is preprocessed using the
* config provided in indexLoadingConfig. It returns an immutable segment.
*/
protected ImmutableSegment reloadSegment(File indexDir, IndexLoadingConfig indexLoadingConfig, Schema schema)
throws Exception {
Map<String, Object> props = new HashMap<>();
props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString());
PinotConfiguration configuration = new PinotConfiguration(props);

try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
.load(indexDir.toURI(),
new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(configuration).build());
SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema)) {
processor.process();
}
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig);
return immutableSegment;
}

/**
* Run query on multiple index segments with custom plan maker.
* This test is particularly useful for testing statistical aggregation functions such as COVAR_POP, COVAR_SAMP, etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,42 @@ public void testSelectionOverRangeFilter(String query, int min, int max, boolean
}
}

@Test(dataProvider = "selectionTestCases")
public void testSelectionOverRangeFilterAfterReload(String query, int min, int max, boolean inclusive)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Query execution tests seem to be focused on range index only. Any reason ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add other tests like

  • Run query on a raw column using that in filter
  • Enable dict + reload
  • Run same query again and it should use dict based predicate evaluator and return same result

Same goes for using the column in SELECT clause. It should correctly return the same result after doing dual lookup in rewriten fwd index and dict as it did before with raw fwd index. Things like that

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple of types for both SV and MV should be considered

throws Exception {
// Enable dictionary on RAW_INT_COL and reload the segment.
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, TABLE_CONFIG);
indexLoadingConfig.getNoDictionaryColumns().remove(RAW_INT_COL);
File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
ImmutableSegment immutableSegment = reloadSegment(indexDir, indexLoadingConfig, SCHEMA);
_indexSegment = immutableSegment;
_indexSegments = Arrays.asList(immutableSegment, immutableSegment);

Operator<?> operator = getOperator(query);
assertTrue(operator instanceof SelectionOnlyOperator);
for (Object[] row : Objects.requireNonNull(((SelectionOnlyOperator) operator).nextBlock().getRows())) {
int value = (int) row[0];
assertTrue(inclusive ? value >= min : value > min);
assertTrue(inclusive ? value <= max : value < max);
}

// Enable dictionary on RAW_DOUBLE_COL and reload the segment.
indexLoadingConfig = new IndexLoadingConfig(null, TABLE_CONFIG);
indexLoadingConfig.getNoDictionaryColumns().remove(RAW_DOUBLE_COL);
indexDir = new File(INDEX_DIR, SEGMENT_NAME);
immutableSegment = reloadSegment(indexDir, indexLoadingConfig, SCHEMA);
_indexSegment = immutableSegment;
_indexSegments = Arrays.asList(immutableSegment, immutableSegment);

operator = getOperator(query);
assertTrue(operator instanceof SelectionOnlyOperator);
for (Object[] row : Objects.requireNonNull(((SelectionOnlyOperator) operator).nextBlock().getRows())) {
int value = (int) row[0];
assertTrue(inclusive ? value >= min : value > min);
assertTrue(inclusive ? value <= max : value < max);
}
}

@Test(dataProvider = "countTestCases")
public void testCountOverRangeFilter(String query, int expectedCount) {
Operator<?> operator = getOperator(query);
Expand All @@ -232,4 +268,38 @@ public void testCountOverRangeFilter(String query, int expectedCount) {
assertEquals(aggregationResult.size(), 1);
assertEquals(((Number) aggregationResult.get(0)).intValue(), expectedCount, query);
}

@Test(dataProvider = "countTestCases")
public void testCountOverRangeFilterAfterReload(String query, int expectedCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Best for us to control the query. Not really sure if the query being sent here by the provider is indeed using the concerned column in parts where we want

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • SELECT FROM foo LIMIT -- this will test out that dict is correct and fwd index got rewritten
  • Try the same in GROUP BY as GROUP BY uses dict.
  • Try MIN and MAX aggregations -- they can be answered from dict.
  • WHERE clause like mentioned in another commend

throws Exception {
// Enable dictionary on RAW_LONG_COL and reload the segment.
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, TABLE_CONFIG);
indexLoadingConfig.getNoDictionaryColumns().remove(RAW_LONG_COL);
File indexDir = new File(INDEX_DIR, SEGMENT_NAME);
ImmutableSegment immutableSegment = reloadSegment(indexDir, indexLoadingConfig, SCHEMA);
_indexSegment = immutableSegment;
_indexSegments = Arrays.asList(immutableSegment, immutableSegment);


Operator<?> operator = getOperator(query);
assertTrue(operator instanceof FastFilteredCountOperator);
List<Object> aggregationResult = ((FastFilteredCountOperator) operator).nextBlock().getResults();
assertNotNull(aggregationResult);
assertEquals(aggregationResult.size(), 1);
assertEquals(((Number) aggregationResult.get(0)).intValue(), expectedCount, query);

// Enable dictionary on RAW_FLOAT_COL and reload the segment.
indexLoadingConfig = new IndexLoadingConfig(null, TABLE_CONFIG);
indexLoadingConfig.getNoDictionaryColumns().remove(RAW_FLOAT_COL);
immutableSegment = reloadSegment(indexDir, indexLoadingConfig, SCHEMA);
_indexSegment = immutableSegment;
_indexSegments = Arrays.asList(immutableSegment, immutableSegment);

operator = getOperator(query);
assertTrue(operator instanceof FastFilteredCountOperator);
aggregationResult = ((FastFilteredCountOperator) operator).nextBlock().getResults();
assertNotNull(aggregationResult);
assertEquals(aggregationResult.size(), 1);
assertEquals(((Number) aggregationResult.get(0)).intValue(), expectedCount, query);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -416,15 +416,11 @@ void buildIndexCreationInfo()
String columnName = fieldSpec.getName();
DataType storedType = fieldSpec.getDataType().getStoredType();
ColumnStatistics columnProfile = _segmentStats.getColumnProfileFor(columnName);
boolean useVarLengthDictionary = varLengthDictionaryColumns.contains(columnName);
boolean useVarLengthDictionary =
shouldUseVarLengthDictionary(columnName, varLengthDictionaryColumns, storedType, columnProfile);
Object defaultNullValue = fieldSpec.getDefaultNullValue();
if (storedType == DataType.BYTES || storedType == DataType.BIG_DECIMAL) {
if (!columnProfile.isFixedLength()) {
useVarLengthDictionary = true;
}
if (storedType == DataType.BYTES) {
defaultNullValue = new ByteArray((byte[]) defaultNullValue);
}
if (storedType == DataType.BYTES) {
defaultNullValue = new ByteArray((byte[]) defaultNullValue);
}
boolean createDictionary = !rawIndexCreationColumns.contains(columnName)
&& !rawIndexCompressionTypeKeys.contains(columnName);
Expand All @@ -435,6 +431,25 @@ void buildIndexCreationInfo()
_segmentIndexCreationInfo.setTotalDocs(_totalDocs);
}

/**
* Uses config and column properties like storedType and length of elements to determine if
* varLengthDictionary should be used for a column
*/
public static boolean shouldUseVarLengthDictionary(String columnName, Set<String> varLengthDictColumns,
DataType columnStoredType, ColumnStatistics columnProfile) {
if (varLengthDictColumns.contains(columnName)) {
return true;
}

if (columnStoredType == DataType.BYTES || columnStoredType == DataType.BIG_DECIMAL) {
if (!columnProfile.isFixedLength()) {
return true;
}
}

return false;
}

/**
* Returns the name of the segment associated with this index creation driver.
*/
Expand Down
Loading