diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java index e44d40d70f47..f3599877804d 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.queries; +import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -43,13 +44,21 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.core.util.GapfillUtils; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor; +import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; +import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; +import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Server; +import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.sql.parsers.CalciteSqlCompiler; import org.apache.pinot.sql.parsers.CalciteSqlParser; @@ -249,6 +258,26 @@ protected BrokerResponseNative getBrokerResponseForOptimizedQuery(String query, return getBrokerResponse(pinotQuery, PLAN_MAKER); } + /** + * Helper method to reload the segment in an existing index directory. The segment is preprocessed using the + * config provided in indexLoadingConfig and then loaded and returned as an immutable segment. + */ + protected ImmutableSegment reloadSegment(File indexDir, IndexLoadingConfig indexLoadingConfig, Schema schema) + throws Exception { + Map<String, Object> props = new HashMap<>(); + props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString()); + PinotConfiguration configuration = new PinotConfiguration(props); + + try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() + .load(indexDir.toURI(), + new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(configuration).build()); + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema)) { + processor.process(); + } + return ImmutableSegmentLoader.load(indexDir, indexLoadingConfig); + } + /** * Run query on multiple index segments with custom plan maker. * This test is particularly useful for testing statistical aggregation functions such as COVAR_POP, COVAR_SAMP, etc.
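[Review note] A minimal usage sketch for the new reloadSegment() helper, mirroring how the RangeQueriesTest changes below call it. The raw column name is illustrative, and TABLE_CONFIG, INDEX_DIR, SEGMENT_NAME and SCHEMA are assumed to be the usual fixtures a BaseQueriesTest subclass defines:

    // Enabling a dictionary is expressed by removing the column from the no-dictionary set.
    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, TABLE_CONFIG);
    indexLoadingConfig.getNoDictionaryColumns().remove("someRawColumn"); // illustrative column name
    // Preprocess the on-disk segment with the new config, then reload it.
    ImmutableSegment segment = reloadSegment(new File(INDEX_DIR, SEGMENT_NAME), indexLoadingConfig, SCHEMA);
    _indexSegment = segment;
    _indexSegments = Arrays.asList(segment, segment);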
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/RangeQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/RangeQueriesTest.java index 4da0b345bb66..44ba46b2dbbe 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/RangeQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/RangeQueriesTest.java @@ -223,6 +223,42 @@ public void testSelectionOverRangeFilter(String query, int min, int max, boolean } } + @Test(dataProvider = "selectionTestCases") + public void testSelectionOverRangeFilterAfterReload(String query, int min, int max, boolean inclusive) + throws Exception { + // Enable dictionary on RAW_INT_COL and reload the segment. + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, TABLE_CONFIG); + indexLoadingConfig.getNoDictionaryColumns().remove(RAW_INT_COL); + File indexDir = new File(INDEX_DIR, SEGMENT_NAME); + ImmutableSegment immutableSegment = reloadSegment(indexDir, indexLoadingConfig, SCHEMA); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + + Operator operator = getOperator(query); + assertTrue(operator instanceof SelectionOnlyOperator); + for (Object[] row : Objects.requireNonNull(((SelectionOnlyOperator) operator).nextBlock().getRows())) { + int value = (int) row[0]; + assertTrue(inclusive ? value >= min : value > min); + assertTrue(inclusive ? value <= max : value < max); + } + + // Enable dictionary on RAW_DOUBLE_COL and reload the segment. + indexLoadingConfig = new IndexLoadingConfig(null, TABLE_CONFIG); + indexLoadingConfig.getNoDictionaryColumns().remove(RAW_DOUBLE_COL); + indexDir = new File(INDEX_DIR, SEGMENT_NAME); + immutableSegment = reloadSegment(indexDir, indexLoadingConfig, SCHEMA); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + + operator = getOperator(query); + assertTrue(operator instanceof SelectionOnlyOperator); + for (Object[] row : Objects.requireNonNull(((SelectionOnlyOperator) operator).nextBlock().getRows())) { + int value = (int) row[0]; + assertTrue(inclusive ? value >= min : value > min); + assertTrue(inclusive ? value <= max : value < max); + } + } + @Test(dataProvider = "countTestCases") public void testCountOverRangeFilter(String query, int expectedCount) { Operator operator = getOperator(query); @@ -232,4 +268,38 @@ public void testCountOverRangeFilter(String query, int expectedCount) { assertEquals(aggregationResult.size(), 1); assertEquals(((Number) aggregationResult.get(0)).intValue(), expectedCount, query); } + + @Test(dataProvider = "countTestCases") + public void testCountOverRangeFilterAfterReload(String query, int expectedCount) + throws Exception { + // Enable dictionary on RAW_LONG_COL and reload the segment. 
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, TABLE_CONFIG); + indexLoadingConfig.getNoDictionaryColumns().remove(RAW_LONG_COL); + File indexDir = new File(INDEX_DIR, SEGMENT_NAME); + ImmutableSegment immutableSegment = reloadSegment(indexDir, indexLoadingConfig, SCHEMA); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + + Operator operator = getOperator(query); + assertTrue(operator instanceof FastFilteredCountOperator); + List<Object> aggregationResult = ((FastFilteredCountOperator) operator).nextBlock().getResults(); + assertNotNull(aggregationResult); + assertEquals(aggregationResult.size(), 1); + assertEquals(((Number) aggregationResult.get(0)).intValue(), expectedCount, query); + + // Enable dictionary on RAW_FLOAT_COL and reload the segment. + indexLoadingConfig = new IndexLoadingConfig(null, TABLE_CONFIG); + indexLoadingConfig.getNoDictionaryColumns().remove(RAW_FLOAT_COL); + immutableSegment = reloadSegment(indexDir, indexLoadingConfig, SCHEMA); + _indexSegment = immutableSegment; + _indexSegments = Arrays.asList(immutableSegment, immutableSegment); + + operator = getOperator(query); + assertTrue(operator instanceof FastFilteredCountOperator); + aggregationResult = ((FastFilteredCountOperator) operator).nextBlock().getResults(); + assertNotNull(aggregationResult); + assertEquals(aggregationResult.size(), 1); + assertEquals(((Number) aggregationResult.get(0)).intValue(), expectedCount, query); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java index 1738096ebdca..228f43f7935e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java @@ -416,15 +416,11 @@ void buildIndexCreationInfo() String columnName = fieldSpec.getName(); DataType storedType = fieldSpec.getDataType().getStoredType(); ColumnStatistics columnProfile = _segmentStats.getColumnProfileFor(columnName); - boolean useVarLengthDictionary = varLengthDictionaryColumns.contains(columnName); + boolean useVarLengthDictionary = + shouldUseVarLengthDictionary(columnName, varLengthDictionaryColumns, storedType, columnProfile); Object defaultNullValue = fieldSpec.getDefaultNullValue(); - if (storedType == DataType.BYTES || storedType == DataType.BIG_DECIMAL) { - if (!columnProfile.isFixedLength()) { - useVarLengthDictionary = true; - } - if (storedType == DataType.BYTES) { - defaultNullValue = new ByteArray((byte[]) defaultNullValue); - } + if (storedType == DataType.BYTES) { + defaultNullValue = new ByteArray((byte[]) defaultNullValue); } boolean createDictionary = !rawIndexCreationColumns.contains(columnName) && !rawIndexCompressionTypeKeys.contains(columnName); @@ -435,6 +431,25 @@ void buildIndexCreationInfo() _segmentIndexCreationInfo.setTotalDocs(_totalDocs); } + /** + * Uses the config and column properties, like storedType and element lengths, to determine whether a + * var-length dictionary should be used for a column. + */ + public static boolean shouldUseVarLengthDictionary(String columnName, Set<String> varLengthDictColumns, + DataType columnStoredType, ColumnStatistics columnProfile) { + if (varLengthDictColumns.contains(columnName)) { + return true; +
} + + if (columnStoredType == DataType.BYTES || columnStoredType == DataType.BIG_DECIMAL) { + if (!columnProfile.isFixedLength()) { + return true; + } + } + + return false; + } + /** * Returns the name of the segment associated with this index creation driver. */ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java index 67ba9ff6e690..9ec3804563f5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandler.java @@ -26,8 +26,21 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentDictionaryCreator; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.stats.AbstractColumnStatisticsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.BigDecimalColumnPreIndexStatsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.BytesColumnPredIndexStatsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.DoubleColumnPreIndexStatsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.FloatColumnPreIndexStatsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.IntColumnPreIndexStatsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.LongColumnPreIndexStatsCollector; +import org.apache.pinot.segment.local.segment.creator.impl.stats.StringColumnPreIndexStatsCollector; +import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.V1Constants; @@ -35,15 +48,20 @@ import org.apache.pinot.segment.spi.creator.IndexCreationContext; import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; import org.apache.pinot.segment.spi.creator.SegmentVersion; +import org.apache.pinot.segment.spi.creator.StatsCollectorConfig; import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; import org.apache.pinot.segment.spi.store.ColumnIndexType; import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.DICTIONARY_ELEMENT_SIZE; +import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.HAS_DICTIONARY; +import static org.apache.pinot.segment.spi.V1Constants.MetadataKeys.Column.getKeyFor; /** * Helper class used by {@link SegmentPreProcessor} to make changes to forward index and dictionary configs. 
Note * that this handler only works for segment versions >= 3.0. Support for segment version < 3.0 is not added because @@ -52,24 +70,26 @@ * 1. Change compression on raw SV and MV columns. * * TODO: Add support for the following: - * 1. Enable dictionary - * 2. Disable dictionary - * 3. Segment versions < V3 + * 1. Disable dictionary + * 2. Segment versions < V3 */ public class ForwardIndexHandler implements IndexHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ForwardIndexHandler.class); private final SegmentMetadata _segmentMetadata; IndexLoadingConfig _indexLoadingConfig; + Schema _schema; protected enum Operation { - // TODO: Add other operations like ENABLE_DICTIONARY, DISABLE_DICTIONARY. + // TODO: Add other operations like DISABLE_DICTIONARY, CHANGE_RAW_INDEX_COMPRESSION_TYPE, + ENABLE_DICTIONARY } - public ForwardIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) { + public ForwardIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig, Schema schema) { _segmentMetadata = segmentMetadata; _indexLoadingConfig = indexLoadingConfig; + _schema = schema; } @Override @@ -92,10 +112,14 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorPro Operation operation = entry.getValue(); switch (operation) { - case CHANGE_RAW_INDEX_COMPRESSION_TYPE: + case CHANGE_RAW_INDEX_COMPRESSION_TYPE: { rewriteRawForwardIndex(column, segmentWriter, indexCreatorProvider); break; - // TODO: Add other operations here. + } + case ENABLE_DICTIONARY: { + createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider); + break; + } default: throw new IllegalStateException("Unsupported operation for column " + column); } @@ -127,7 +151,15 @@ Map computeOperation(SegmentDirectory.Reader segmentReader) Set newNoDictColumns = _indexLoadingConfig.getNoDictionaryColumns(); for (String column : existingAllColumns) { - if (existingNoDictColumns.contains(column) && newNoDictColumns.contains(column)) { + if (existingNoDictColumns.contains(column) && !newNoDictColumns.contains(column)) { + if (_schema == null || _indexLoadingConfig.getTableConfig() == null) { + // This can only happen in tests. + LOGGER.warn("Cannot enable dictionary for column={} as schema or tableConfig is null.", column); + continue; + } + // Existing column is RAW. New column is dictionary enabled. + columnOperationMap.put(column, Operation.ENABLE_DICTIONARY); + } else if (existingNoDictColumns.contains(column) && newNoDictColumns.contains(column)) { // Both existing and new column is RAW forward index encoded. Check if compression needs to be changed. if (shouldChangeCompressionType(column, segmentReader)) { columnOperationMap.put(column, Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE); @@ -138,7 +170,8 @@ Map computeOperation(SegmentDirectory.Reader segmentReader) return columnOperationMap; } - private boolean shouldChangeCompressionType(String column, SegmentDirectory.Reader segmentReader) throws Exception { + private boolean shouldChangeCompressionType(String column, SegmentDirectory.Reader segmentReader) + throws Exception { ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column); // The compression type for an existing segment can only be determined by reading the forward index header. 
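[Review note] A compact restatement of the per-column decision computeOperation() makes after this change. This is a hypothetical helper written for illustration (the codec comparison is assumed to be precomputed), not code from the PR:

    static ForwardIndexHandler.Operation decideForColumn(String column, Set<String> existingNoDictColumns,
        Set<String> newNoDictColumns, boolean compressionChanged, boolean hasTableConfigAndSchema) {
      boolean wasRaw = existingNoDictColumns.contains(column);
      boolean staysRaw = newNoDictColumns.contains(column);
      if (wasRaw && !staysRaw) {
        // RAW -> dictionary-encoded; needs schema and tableConfig to collect column stats,
        // so it is skipped with a warning when either is null (which only happens in tests).
        return hasTableConfigAndSchema ? ForwardIndexHandler.Operation.ENABLE_DICTIONARY : null;
      }
      if (wasRaw && staysRaw && compressionChanged) {
        return ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE;
      }
      return null; // dict -> dict is a no-op; dict -> RAW (disable dictionary) is still a TODO
    }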
@@ -168,7 +201,6 @@ private boolean shouldChangeCompressionType(String column, SegmentDirectory.Read private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider) throws Exception { - Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3); ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column); boolean isSingleValue = existingColMetadata.isSingleValue(); @@ -244,8 +276,8 @@ private void rewriteRawMVForwardIndex(String column, ColumnMetadata existingColM .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties()); try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) { - // If creator stored type and the reader stored type do not match, throw an exception. if (!reader.getStoredType().equals(creator.getValueType())) { + // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes. String failureMsg = "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType() .toString() + " to " + creator.getValueType().toString(); @@ -253,7 +285,7 @@ private void rewriteRawMVForwardIndex(String column, ColumnMetadata existingColM } int numDocs = existingColMetadata.getTotalDocs(); - forwardIndexWriterHelper(column, reader, creator, numDocs); + forwardIndexWriterHelper(column, existingColMetadata, reader, creator, numDocs, null); } } } @@ -271,8 +303,8 @@ private void rewriteRawSVForwardIndex(String column, ColumnMetadata existingColM .forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties()); try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) { - // If creator stored type and the reader stored type do not match, throw an exception. if (!reader.getStoredType().equals(creator.getValueType())) { + // Creator stored type should match reader stored type for raw columns. We do not support changing datatypes. String failureMsg = "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType() .toString() + " to " + creator.getValueType().toString(); @@ -280,109 +312,282 @@ private void rewriteRawSVForwardIndex(String column, ColumnMetadata existingColM } int numDocs = existingColMetadata.getTotalDocs(); - forwardIndexWriterHelper(column, reader, creator, numDocs); + forwardIndexWriterHelper(column, existingColMetadata, reader, creator, numDocs, null); } } } - private void forwardIndexWriterHelper(String column, ForwardIndexReader reader, ForwardIndexCreator creator, - int numDocs) { - // If creator stored type should match reader stored type. We do not support changing datatypes. - if (!reader.getStoredType().equals(creator.getValueType())) { - String failureMsg = - "Unsupported operation to change datatype for column=" + column + " from " + reader.getStoredType().toString() - + " to " + creator.getValueType().toString(); - throw new UnsupportedOperationException(failureMsg); - } - + private void forwardIndexWriterHelper(String column, ColumnMetadata existingColumnMetadata, ForwardIndexReader reader, + ForwardIndexCreator creator, int numDocs, @Nullable SegmentDictionaryCreator dictionaryCreator) { ForwardIndexReaderContext readerContext = reader.createContext(); boolean isSVColumn = reader.isSingleValue(); - switch (reader.getStoredType()) { - // JSON fields are either stored as string or bytes. 
No special handling is needed because we make this - // decision based on the storedType of the reader. - case INT: { - for (int i = 0; i < numDocs; i++) { - if (isSVColumn) { - int val = reader.getInt(i, readerContext); - creator.putInt(val); - } else { - int[] ints = reader.getIntMV(i, readerContext); - creator.putIntMV(ints); - } + if (dictionaryCreator != null) { + int maxNumValuesPerEntry = existingColumnMetadata.getMaxNumberOfMultiValues(); + PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, null, null, maxNumValuesPerEntry); + + for (int i = 0; i < numDocs; i++) { + Object obj = columnReader.getValue(i); + + if (isSVColumn) { + int dictId = dictionaryCreator.indexOfSV(obj); + creator.putDictId(dictId); + } else { + int[] dictIds = dictionaryCreator.indexOfMV(obj); + creator.putDictIdMV(dictIds); } - break; } - case LONG: { - for (int i = 0; i < numDocs; i++) { - if (isSVColumn) { - long val = reader.getLong(i, readerContext); - creator.putLong(val); - } else { - long[] longs = reader.getLongMV(i, readerContext); - creator.putLongMV(longs); + } else { + switch (reader.getStoredType()) { + // JSON fields are either stored as string or bytes. No special handling is needed because we make this + // decision based on the storedType of the reader. + case INT: { + for (int i = 0; i < numDocs; i++) { + if (isSVColumn) { + int val = reader.getInt(i, readerContext); + creator.putInt(val); + } else { + int[] ints = reader.getIntMV(i, readerContext); + creator.putIntMV(ints); + } } + break; } - break; - } - case FLOAT: { - for (int i = 0; i < numDocs; i++) { - if (isSVColumn) { - float val = reader.getFloat(i, readerContext); - creator.putFloat(val); - } else { - float[] floats = reader.getFloatMV(i, readerContext); - creator.putFloatMV(floats); + case LONG: { + for (int i = 0; i < numDocs; i++) { + if (isSVColumn) { + long val = reader.getLong(i, readerContext); + creator.putLong(val); + } else { + long[] longs = reader.getLongMV(i, readerContext); + creator.putLongMV(longs); + } } + break; } - break; - } - case DOUBLE: { - for (int i = 0; i < numDocs; i++) { - if (isSVColumn) { - double val = reader.getDouble(i, readerContext); - creator.putDouble(val); - } else { - double[] doubles = reader.getDoubleMV(i, readerContext); - creator.putDoubleMV(doubles); + case FLOAT: { + for (int i = 0; i < numDocs; i++) { + if (isSVColumn) { + float val = reader.getFloat(i, readerContext); + creator.putFloat(val); + } else { + float[] floats = reader.getFloatMV(i, readerContext); + creator.putFloatMV(floats); + } } + break; } - break; - } - case STRING: { - for (int i = 0; i < numDocs; i++) { - if (isSVColumn) { - String val = reader.getString(i, readerContext); - creator.putString(val); - } else { - String[] strings = reader.getStringMV(i, readerContext); - creator.putStringMV(strings); + case DOUBLE: { + for (int i = 0; i < numDocs; i++) { + if (isSVColumn) { + double val = reader.getDouble(i, readerContext); + creator.putDouble(val); + } else { + double[] doubles = reader.getDoubleMV(i, readerContext); + creator.putDoubleMV(doubles); + } } + break; } - break; - } - case BYTES: { - for (int i = 0; i < numDocs; i++) { - if (isSVColumn) { - byte[] val = reader.getBytes(i, readerContext); - creator.putBytes(val); - } else { - byte[][] bytesArray = reader.getBytesMV(i, readerContext); - creator.putBytesMV(bytesArray); + case STRING: { + for (int i = 0; i < numDocs; i++) { + if (isSVColumn) { + String val = reader.getString(i, readerContext); + creator.putString(val); + } else { + 
String[] strings = reader.getStringMV(i, readerContext); + creator.putStringMV(strings); + } } + break; } - break; - } - case BIG_DECIMAL: { - for (int i = 0; i < numDocs; i++) { + case BYTES: { + for (int i = 0; i < numDocs; i++) { + if (isSVColumn) { + byte[] val = reader.getBytes(i, readerContext); + creator.putBytes(val); + } else { + byte[][] bytesArray = reader.getBytesMV(i, readerContext); + creator.putBytesMV(bytesArray); + } + } + break; + } + case BIG_DECIMAL: { Preconditions.checkState(isSVColumn, "BigDecimal is not supported for MV columns"); - BigDecimal val = reader.getBigDecimal(i, readerContext); - creator.putBigDecimal(val); + for (int i = 0; i < numDocs; i++) { + BigDecimal val = reader.getBigDecimal(i, readerContext); + creator.putBigDecimal(val); + } + break; } - break; + default: + throw new IllegalStateException("Unsupported storedType=" + reader.getStoredType() + " for column=" + column); } - default: - throw new IllegalStateException(); } } + + private void createDictBasedForwardIndex(String column, SegmentDirectory.Writer segmentWriter, + IndexCreatorProvider indexCreatorProvider) + throws Exception { + ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column); + boolean isSingleValue = existingColMetadata.isSingleValue(); + + File indexDir = _segmentMetadata.getIndexDir(); + String segmentName = _segmentMetadata.getName(); + File inProgress = new File(indexDir, column + ".dict.inprogress"); + File dictionaryFile = new File(indexDir, column + V1Constants.Dict.FILE_EXTENSION); + String fwdIndexFileExtension; + if (isSingleValue) { + fwdIndexFileExtension = + existingColMetadata.isSorted() ? V1Constants.Indexes.SORTED_SV_FORWARD_INDEX_FILE_EXTENSION + : V1Constants.Indexes.UNSORTED_SV_FORWARD_INDEX_FILE_EXTENSION; + } else { + fwdIndexFileExtension = V1Constants.Indexes.UNSORTED_MV_FORWARD_INDEX_FILE_EXTENSION; + } + File fwdIndexFile = new File(indexDir, column + fwdIndexFileExtension); + + if (!inProgress.exists()) { + // Marker file does not exist, which means last run ended normally. + // Create a marker file. + FileUtils.touch(inProgress); + } else { + // Marker file exists, which means last run was interrupted. + // Remove forward index and dictionary files if they exist. + FileUtils.deleteQuietly(fwdIndexFile); + FileUtils.deleteQuietly(dictionaryFile); + } + + LOGGER.info("Creating a new dictionary for segment={} and column={}", segmentName, column); + SegmentDictionaryCreator dictionaryCreator = buildDictionary(column, existingColMetadata, segmentWriter); + LoaderUtils.writeIndexToV3Format(segmentWriter, column, dictionaryFile, ColumnIndexType.DICTIONARY); + + LOGGER.info("Built dictionary. Rewriting dictionary enabled forward index for segment={} and column={}", + segmentName, column); + writeDictEnabledForwardIndex(column, existingColMetadata, segmentWriter, indexDir, indexCreatorProvider, + dictionaryCreator); + // We used the existing forward index to generate a new forward index. The existing forward index will be in V3 + // format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed + // anymore. Note that removeIndex() will only mark an index for removal and remove the in-memory state. The + // actual cleanup from columns.psf file will happen when singleFileIndexDirectory.cleanupRemovedIndices() is + // called during segmentWriter.close(). 
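[Review note] The .dict.inprogress marker in createDictBasedForwardIndex() above follows the same crash-recovery idiom as the other index handlers; distilled here using the names from the surrounding code:

    File inProgress = new File(indexDir, column + ".dict.inprogress");
    if (!inProgress.exists()) {
      FileUtils.touch(inProgress);             // start of the rewrite: mark it in flight
    } else {
      FileUtils.deleteQuietly(fwdIndexFile);   // a previous attempt was interrupted:
      FileUtils.deleteQuietly(dictionaryFile); // discard partial artifacts and redo from scratch
    }
    // ... build the dictionary, rewrite the forward index, update metadata ...
    FileUtils.deleteQuietly(inProgress);       // removing the marker is the final, committing step

The removeIndex()/writeIndexToV3Format() sequence that follows performs the actual swap of the forward index.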
+ segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX); + LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX); + + LOGGER.info("Created forwardIndex. Updating metadata properties for segment={} and column={}", segmentName, column); + Map<String, String> metadataProperties = new HashMap<>(); + metadataProperties.put(getKeyFor(column, HAS_DICTIONARY), String.valueOf(true)); + metadataProperties.put(getKeyFor(column, DICTIONARY_ELEMENT_SIZE), + String.valueOf(dictionaryCreator.getNumBytesPerEntry())); + updateMetadataProperties(indexDir, metadataProperties); + + // Remove the indexes that have to be rewritten when dictionary is enabled for a column. Note that the respective + // index handler will take care of recreating the index. + removeDictRelatedIndexes(column, segmentWriter); + + // Delete the marker file. + FileUtils.deleteQuietly(inProgress); + + LOGGER.info("Created dictionary based forward index for segment: {}, column: {}", segmentName, column); + } + + private SegmentDictionaryCreator buildDictionary(String column, ColumnMetadata existingColMetadata, + SegmentDirectory.Writer segmentWriter) + throws Exception { + int numDocs = existingColMetadata.getTotalDocs(); + // SegmentPartitionConfig is not relevant for rewrites. + StatsCollectorConfig statsCollectorConfig = + new StatsCollectorConfig(_indexLoadingConfig.getTableConfig(), _schema, null); + AbstractColumnStatisticsCollector statsCollector; + + try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) { + boolean isSVColumn = reader.isSingleValue(); + + switch (reader.getStoredType()) { + case INT: + statsCollector = new IntColumnPreIndexStatsCollector(column, statsCollectorConfig); + break; + case LONG: + statsCollector = new LongColumnPreIndexStatsCollector(column, statsCollectorConfig); + break; + case FLOAT: + statsCollector = new FloatColumnPreIndexStatsCollector(column, statsCollectorConfig); + break; + case DOUBLE: + statsCollector = new DoubleColumnPreIndexStatsCollector(column, statsCollectorConfig); + break; + case STRING: + statsCollector = new StringColumnPreIndexStatsCollector(column, statsCollectorConfig); + break; + case BYTES: + statsCollector = new BytesColumnPredIndexStatsCollector(column, statsCollectorConfig); + break; + case BIG_DECIMAL: + Preconditions.checkState(isSVColumn, "BigDecimal is not supported for MV columns"); + statsCollector = new BigDecimalColumnPreIndexStatsCollector(column, statsCollectorConfig); + break; + default: + throw new IllegalStateException("Unsupported storedType=" + reader.getStoredType() + " for column=" + column); + } + + // Note: Special null handling is not necessary here. This is because the existing default null value in the + raw forwardIndex will be retained as such while creating the dictionary and dict-based forward index. Also,
+ PinotSegmentColumnReader columnReader = + new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues()); + for (int i = 0; i < numDocs; i++) { + Object obj = columnReader.getValue(i); + statsCollector.collect(obj); + } + statsCollector.seal(); + + boolean useVarLength = SegmentIndexCreationDriverImpl.shouldUseVarLengthDictionary(column, + _indexLoadingConfig.getVarLengthDictionaryColumns(), reader.getStoredType(), statsCollector); + SegmentDictionaryCreator dictionaryCreator = + new SegmentDictionaryCreator(existingColMetadata.getFieldSpec(), _segmentMetadata.getIndexDir(), + useVarLength); + + dictionaryCreator.build(statsCollector.getUniqueValuesSet()); + return dictionaryCreator; + } + } + + private void writeDictEnabledForwardIndex(String column, ColumnMetadata existingColMetadata, + SegmentDirectory.Writer segmentWriter, File indexDir, IndexCreatorProvider indexCreatorProvider, + SegmentDictionaryCreator dictionaryCreator) + throws Exception { + try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) { + int lengthOfLongestEntry = reader.getLengthOfLongestEntry(); + IndexCreationContext.Builder builder = + IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata) + .withLengthOfLongestEntry(lengthOfLongestEntry); + // existingColMetadata has dictEnable=false. Overwrite the value. + builder.withDictionary(true); + IndexCreationContext.Forward context = + builder.build().forForwardIndex(null, _indexLoadingConfig.getColumnProperties()); + + try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) { + int numDocs = existingColMetadata.getTotalDocs(); + forwardIndexWriterHelper(column, existingColMetadata, reader, creator, numDocs, dictionaryCreator); + } + } + } + + private void removeDictRelatedIndexes(String column, SegmentDirectory.Writer segmentWriter) { + // TODO: Move this logic as a static function in each index creator. 
+ segmentWriter.removeIndex(column, ColumnIndexType.RANGE_INDEX); + } + + private void updateMetadataProperties(File indexDir, Map<String, String> metadataProperties) + throws Exception { + File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir, SegmentVersion.v3); + File metadataFile = new File(v3Dir, V1Constants.MetadataKeys.METADATA_FILE_NAME); + PropertiesConfiguration properties = new PropertiesConfiguration(metadataFile); + + for (Map.Entry<String, String> entry : metadataProperties.entrySet()) { + properties.setProperty(entry.getKey(), entry.getValue()); + } + + properties.save(); + } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java index 6be28d8bb91c..b8d972e46970 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java @@ -29,6 +29,7 @@ import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.store.ColumnIndexType; import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.spi.data.Schema; public class IndexHandlerFactory { @@ -47,7 +48,7 @@ public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) { }; public static IndexHandler getIndexHandler(ColumnIndexType type, SegmentMetadataImpl segmentMetadata, - IndexLoadingConfig indexLoadingConfig) { + IndexLoadingConfig indexLoadingConfig, Schema schema) { switch (type) { case INVERTED_INDEX: return new InvertedIndexHandler(segmentMetadata, indexLoadingConfig); @@ -64,7 +65,7 @@ public static IndexHandler getIndexHandler(ColumnIndexType type, SegmentMetadata case BLOOM_FILTER: return new BloomFilterHandler(segmentMetadata, indexLoadingConfig); case FORWARD_INDEX: - return new ForwardIndexHandler(segmentMetadata, indexLoadingConfig); + return new ForwardIndexHandler(segmentMetadata, indexLoadingConfig, schema); default: return NO_OP_HANDLER; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java index 3da4c2b524a1..34c40dc09763 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessor.java @@ -104,8 +104,17 @@ public void process() // Update single-column indices, like inverted index, json index etc. IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); for (ColumnIndexType type : ColumnIndexType.values()) { - IndexHandler handler = IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig); + IndexHandler handler = + IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig, _schema); handler.updateIndices(segmentWriter, indexCreatorProvider); + if (type == ColumnIndexType.FORWARD_INDEX) { + // TODO: Find a way to ensure ForwardIndexHandler is always executed before other handlers instead of + relying on enum ordering. + // ForwardIndexHandler may modify the segment metadata while rewriting forward index to create a dictionary.
This new metadata is needed to construct other indexes like RangeIndex. + _segmentMetadata = new SegmentMetadataImpl(indexDir); + _segmentDirectory.reloadMetadata(); + } } // Create/modify/remove star-trees if required. @@ -149,7 +158,7 @@ public boolean needProcess() } // Check if there is need to update single-column indices, like inverted index, json index etc. for (ColumnIndexType type : ColumnIndexType.values()) { - if (IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig) + if (IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig, _schema) .needUpdateIndices(segmentReader)) { LOGGER.info("Found index type: {} needs updates", type); return true; diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java index f70779feed25..f49b7755ce07 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/ForwardIndexHandlerTest.java @@ -20,6 +20,7 @@ import java.io.File; import java.io.IOException; +import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -28,6 +29,7 @@ import java.util.List; import java.util.Map; import java.util.Random; +import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; @@ -41,6 +43,7 @@ import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; import org.apache.pinot.segment.spi.index.IndexingOverrides; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.spi.config.table.FieldConfig; @@ -56,16 +59,15 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; public class ForwardIndexHandlerTest { + private static final BigDecimal BASE_BIG_DECIMAL = BigDecimal.valueOf(new Random().nextDouble()); private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "ForwardIndexHandlerTest"); private static final String TABLE_NAME = "myTable"; private static final String SEGMENT_NAME = "testSegment"; - // TODO: - // 1. Add other datatypes (double, float, bigdecimal, bytes). Also add MV columns. - // 2. Add text index and other index types for raw columns. 
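[Review note] The SegmentPreProcessor change above is what lets the later handlers observe the newly created dictionary. Condensed, with the local variable names simplified:

    for (ColumnIndexType type : ColumnIndexType.values()) {
      IndexHandler handler = IndexHandlerFactory.getIndexHandler(type, segmentMetadata, indexLoadingConfig, schema);
      handler.updateIndices(segmentWriter, indexCreatorProvider);
      if (type == ColumnIndexType.FORWARD_INDEX) {
        // ForwardIndexHandler may have flipped hasDictionary/dictionaryElementSize on disk; re-read
        // them so that e.g. RangeIndexHandler builds a dictionary-based range index for the column.
        segmentMetadata = new SegmentMetadataImpl(indexDir);
        segmentDirectory.reloadMetadata();
      }
    }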
private static final String DIM_SNAPPY_STRING = "DIM_SNAPPY_STRING"; private static final String DIM_PASS_THROUGH_STRING = "DIM_PASS_THROUGH_STRING"; private static final String DIM_ZSTANDARD_STRING = "DIM_ZSTANDARD_STRING"; @@ -81,6 +83,14 @@ public class ForwardIndexHandlerTest { private static final String DIM_ZSTANDARD_INTEGER = "DIM_ZSTANDARD_INTEGER"; private static final String DIM_LZ4_INTEGER = "DIM_LZ4_INTEGER"; + private static final String DIM_SNAPPY_BYTES = "DIM_SNAPPY_BYTES"; + private static final String DIM_PASS_THROUGH_BYTES = "DIM_PASS_THROUGH_BYTES"; + private static final String DIM_ZSTANDARD_BYTES = "DIM_ZSTANDARD_BYTES"; + private static final String DIM_LZ4_BYTES = "DIM_LZ4_BYTES"; + + // Sorted column + private static final String DIM_PASS_THROUGH_SORTED_LONG = "DIM_PASS_THROUGH_SORTED_LONG"; + // Dictionary columns private static final String DIM_DICT_INTEGER = "DIM_DICT_INTEGER"; private static final String DIM_DICT_STRING = "DIM_DICT_STRING"; @@ -92,6 +102,11 @@ public class ForwardIndexHandlerTest { private static final String METRIC_ZSTANDARD_INTEGER = "METRIC_ZSTANDARD_INTEGER"; private static final String METRIC_LZ4_INTEGER = "METRIC_LZ4_INTEGER"; + private static final String METRIC_SNAPPY_BIG_DECIMAL = "METRIC_SNAPPY_BIG_DECIMAL"; + private static final String METRIC_PASS_THROUGH_BIG_DECIMAL = "METRIC_PASS_THROUGH_BIG_DECIMAL"; + private static final String METRIC_ZSTANDARD_BIG_DECIMAL = "METRIC_ZSTANDARD_BIG_DECIMAL"; + private static final String METRIC_LZ4_BIG_DECIMAL = "METRIC_LZ4_BIG_DECIMAL"; + // Multivalue columns private static final String DIM_MV_PASS_THROUGH_INTEGER = "DIM_MV_PASS_THROUGH_INTEGER"; private static final String DIM_MV_PASS_THROUGH_LONG = "DIM_MV_PASS_THROUGH_LONG"; @@ -99,18 +114,22 @@ public class ForwardIndexHandlerTest { private static final String DIM_MV_PASS_THROUGH_BYTES = "DIM_MV_PASS_THROUGH_BYTES"; private static final List RAW_SNAPPY_INDEX_COLUMNS = - Arrays.asList(DIM_SNAPPY_STRING, DIM_SNAPPY_LONG, DIM_SNAPPY_INTEGER, METRIC_SNAPPY_INTEGER); + Arrays.asList(DIM_SNAPPY_STRING, DIM_SNAPPY_LONG, DIM_SNAPPY_INTEGER, DIM_SNAPPY_BYTES, METRIC_SNAPPY_BIG_DECIMAL, + METRIC_SNAPPY_INTEGER); private static final List RAW_ZSTANDARD_INDEX_COLUMNS = - Arrays.asList(DIM_ZSTANDARD_STRING, DIM_ZSTANDARD_LONG, DIM_ZSTANDARD_INTEGER, METRIC_ZSTANDARD_INTEGER); + Arrays.asList(DIM_ZSTANDARD_STRING, DIM_ZSTANDARD_LONG, DIM_ZSTANDARD_INTEGER, DIM_ZSTANDARD_BYTES, + METRIC_ZSTANDARD_BIG_DECIMAL, METRIC_ZSTANDARD_INTEGER); private static final List RAW_PASS_THROUGH_INDEX_COLUMNS = - Arrays.asList(DIM_PASS_THROUGH_STRING, DIM_PASS_THROUGH_LONG, DIM_PASS_THROUGH_INTEGER, - METRIC_PASS_THROUGH_INTEGER, DIM_MV_PASS_THROUGH_INTEGER, DIM_MV_PASS_THROUGH_LONG, - DIM_MV_PASS_THROUGH_STRING, DIM_MV_PASS_THROUGH_BYTES); + Arrays.asList(DIM_PASS_THROUGH_STRING, DIM_PASS_THROUGH_LONG, DIM_PASS_THROUGH_INTEGER, DIM_PASS_THROUGH_BYTES, + METRIC_PASS_THROUGH_BIG_DECIMAL, METRIC_PASS_THROUGH_INTEGER, DIM_MV_PASS_THROUGH_INTEGER, + DIM_MV_PASS_THROUGH_LONG, DIM_MV_PASS_THROUGH_STRING, DIM_MV_PASS_THROUGH_BYTES, + DIM_PASS_THROUGH_SORTED_LONG); private static final List RAW_LZ4_INDEX_COLUMNS = - Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER, METRIC_LZ4_INTEGER); + Arrays.asList(DIM_LZ4_STRING, DIM_LZ4_LONG, DIM_LZ4_INTEGER, DIM_LZ4_BYTES, METRIC_LZ4_BIG_DECIMAL, + METRIC_LZ4_INTEGER); private final List _noDictionaryColumns = new ArrayList<>(); TableConfig _tableConfig; @@ -177,9 +196,18 @@ private void buildSegment() 
.addSingleValueDimension(DIM_ZSTANDARD_LONG, FieldSpec.DataType.LONG) .addSingleValueDimension(DIM_PASS_THROUGH_LONG, FieldSpec.DataType.LONG) .addSingleValueDimension(DIM_LZ4_LONG, FieldSpec.DataType.LONG) + .addSingleValueDimension(DIM_SNAPPY_BYTES, FieldSpec.DataType.BYTES) + .addSingleValueDimension(DIM_PASS_THROUGH_BYTES, FieldSpec.DataType.BYTES) + .addSingleValueDimension(DIM_ZSTANDARD_BYTES, FieldSpec.DataType.BYTES) + .addSingleValueDimension(DIM_LZ4_BYTES, FieldSpec.DataType.BYTES) + .addMetric(METRIC_SNAPPY_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL) + .addMetric(METRIC_PASS_THROUGH_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL) + .addMetric(METRIC_ZSTANDARD_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL) + .addMetric(METRIC_LZ4_BIG_DECIMAL, FieldSpec.DataType.BIG_DECIMAL) .addSingleValueDimension(DIM_DICT_INTEGER, FieldSpec.DataType.INT) .addSingleValueDimension(DIM_DICT_LONG, FieldSpec.DataType.LONG) .addSingleValueDimension(DIM_DICT_STRING, FieldSpec.DataType.STRING) + .addSingleValueDimension(DIM_PASS_THROUGH_SORTED_LONG, FieldSpec.DataType.LONG) .addMetric(METRIC_PASS_THROUGH_INTEGER, FieldSpec.DataType.INT) .addMetric(METRIC_SNAPPY_INTEGER, FieldSpec.DataType.INT).addMetric(METRIC_LZ4_INTEGER, FieldSpec.DataType.INT) .addMetric(METRIC_ZSTANDARD_INTEGER, FieldSpec.DataType.INT) @@ -210,6 +238,8 @@ private List createTestData() { String[] tempStringRows = new String[rowLength]; Integer[] tempIntRows = new Integer[rowLength]; Long[] tempLongRows = new Long[rowLength]; + byte[][] tempBytesRows = new byte[rowLength][]; + BigDecimal[] tempBigDecimalRows = new BigDecimal[rowLength]; int maxNumberOfMVEntries = random.nextInt(500); String[][] tempMVStringRows = new String[rowLength][maxNumberOfMVEntries]; @@ -220,30 +250,34 @@ private List createTestData() { for (int i = 0; i < rowLength; i++) { //Adding a fixed value to check for filter queries if (i % 10 == 0) { - tempStringRows[i] = "testRow"; + String str = "testRow"; + tempStringRows[i] = str; tempIntRows[i] = 1001; tempLongRows[i] = 1001L; + tempBytesRows[i] = str.getBytes(); + tempBigDecimalRows[i] = BASE_BIG_DECIMAL.add(BigDecimal.valueOf(1001)); // Avoid creating empty arrays. int numMVElements = random.nextInt(maxNumberOfMVEntries - 1) + 1; for (int j = 0; j < numMVElements; j++) { tempMVIntRows[i][j] = 1001; tempMVLongRows[i][j] = 1001L; - String str = "testRow"; tempMVStringRows[i][j] = str; tempMVByteRows[i][j] = (byte[]) str.getBytes(); } } else { - tempStringRows[i] = "n" + i; + String str = "n" + i; + tempStringRows[i] = str; tempIntRows[i] = i; tempLongRows[i] = (long) i; + tempBytesRows[i] = (byte[]) str.getBytes(); + tempBigDecimalRows[i] = BASE_BIG_DECIMAL.add(BigDecimal.valueOf(i)); // Avoid creating empty arrays. 
int numMVElements = random.nextInt(maxNumberOfMVEntries - 1) + 1; for (int j = 0; j < numMVElements; j++) { tempMVIntRows[i][j] = j; tempMVLongRows[i][j] = (long) j; - String str = "n" + i; tempMVStringRows[i][j] = str; tempMVByteRows[i][j] = (byte[]) str.getBytes(); } @@ -275,6 +309,18 @@ private List createTestData() { row.putValue(DIM_PASS_THROUGH_LONG, tempLongRows[i]); row.putValue(DIM_LZ4_LONG, tempLongRows[i]); + // Raw Byte columns + row.putValue(DIM_SNAPPY_BYTES, tempBytesRows[i]); + row.putValue(DIM_ZSTANDARD_BYTES, tempBytesRows[i]); + row.putValue(DIM_PASS_THROUGH_BYTES, tempBytesRows[i]); + row.putValue(DIM_LZ4_BYTES, tempBytesRows[i]); + + // Raw BigDecimal column + row.putValue(METRIC_SNAPPY_BIG_DECIMAL, tempBigDecimalRows[i]); + row.putValue(METRIC_ZSTANDARD_BIG_DECIMAL, tempBigDecimalRows[i]); + row.putValue(METRIC_PASS_THROUGH_BIG_DECIMAL, tempBigDecimalRows[i]); + row.putValue(METRIC_LZ4_BIG_DECIMAL, tempBigDecimalRows[i]); + // Dictionary columns row.putValue(DIM_DICT_INTEGER, tempIntRows[i]); row.putValue(DIM_DICT_LONG, tempLongRows[i]); @@ -286,12 +332,15 @@ private List createTestData() { row.putValue(DIM_MV_PASS_THROUGH_STRING, tempMVStringRows[i]); row.putValue(DIM_MV_PASS_THROUGH_BYTES, tempMVByteRows[i]); + // Sorted columns + row.putValue(DIM_PASS_THROUGH_SORTED_LONG, (long) i); + rows.add(row); } return rows; } - @Test + @Test(priority = 0) public void testComputeOperation() throws Exception { // Setup @@ -302,34 +351,62 @@ public void testComputeOperation() // TEST1 : Validate with zero changes. ForwardIndexHandler should be a No-Op. IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); - ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig); + ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, null); Map operationMap = new HashMap<>(); operationMap = fwdIndexHandler.computeOperation(writer); assertEquals(operationMap, Collections.EMPTY_MAP); - // TEST2: Enable dictionary for a RAW_ZSTANDARD_INDEX_COLUMN. ForwardIndexHandler should be a No-Op. + // TEST2: Enable dictionary for a RAW_ZSTANDARD_INDEX_COLUMN. indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); indexLoadingConfig.getNoDictionaryColumns().remove(DIM_ZSTANDARD_STRING); - fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig); + fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema); + operationMap = fwdIndexHandler.computeOperation(writer); + assertEquals(operationMap.get(DIM_ZSTANDARD_STRING), ForwardIndexHandler.Operation.ENABLE_DICTIONARY); + + // TEST3: Enable dictionary for an MV column. + indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); + indexLoadingConfig.getNoDictionaryColumns().remove(DIM_MV_PASS_THROUGH_STRING); + fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema); + operationMap = fwdIndexHandler.computeOperation(writer); + assertEquals(operationMap.get(DIM_MV_PASS_THROUGH_STRING), ForwardIndexHandler.Operation.ENABLE_DICTIONARY); + + // TEST4: Enable dictionary for a sorted column. 
+ indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); + indexLoadingConfig.getNoDictionaryColumns().remove(DIM_PASS_THROUGH_SORTED_LONG); + fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema); + operationMap = fwdIndexHandler.computeOperation(writer); + assertEquals(operationMap.get(DIM_PASS_THROUGH_SORTED_LONG), ForwardIndexHandler.Operation.ENABLE_DICTIONARY); + + // TEST5: Enable dictionary for a dict column. Should be a No-op. + indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); + fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema); operationMap = fwdIndexHandler.computeOperation(writer); assertEquals(operationMap, Collections.EMPTY_MAP); - // TEST3: Disable dictionary. ForwardIndexHandler should be a No-Op. + // TEST6: Disable dictionary. Should be a No-op. indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); indexLoadingConfig.getNoDictionaryColumns().add(DIM_DICT_INTEGER); - fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig); + fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema); operationMap = fwdIndexHandler.computeOperation(writer); assertEquals(operationMap, Collections.EMPTY_MAP); - // TEST4: Add an additional text index. ForwardIndexHandler should be a No-Op. + // TEST7: Add an additional text index. ForwardIndexHandler should be a No-Op. indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); indexLoadingConfig.getTextIndexColumns().add(DIM_DICT_INTEGER); indexLoadingConfig.getTextIndexColumns().add(DIM_LZ4_INTEGER); - fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig); + fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema); operationMap = fwdIndexHandler.computeOperation(writer); assertEquals(operationMap, Collections.EMPTY_MAP); - // TEST5: Change compression + // TEST8: Add range index and enable dictionary. + indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); + indexLoadingConfig.getRangeIndexColumns().add(METRIC_LZ4_INTEGER); + indexLoadingConfig.getNoDictionaryColumns().remove(METRIC_LZ4_INTEGER); + fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema); + operationMap = fwdIndexHandler.computeOperation(writer); + assertEquals(operationMap.get(METRIC_LZ4_INTEGER), ForwardIndexHandler.Operation.ENABLE_DICTIONARY); + + // TEST9: Change compression Random rand = new Random(); // Create new tableConfig with the modified fieldConfigs. @@ -352,12 +429,12 @@ public void testComputeOperation() tableConfig.setFieldConfigList(fieldConfigs); indexLoadingConfig = new IndexLoadingConfig(null, tableConfig); - fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig); + fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, null); operationMap = fwdIndexHandler.computeOperation(writer); assertEquals(operationMap.size(), 1); assertEquals(operationMap.get(config.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE); - // TEST6: Change compression and add index. Change compressionType for more than 1 column. + // TEST10: Change compression and add index. Change compressionType for more than 1 column.
fieldConfigs = new ArrayList<>(_tableConfig.getFieldConfigList()); FieldConfig config1 = fieldConfigs.remove(0); FieldConfig config2 = fieldConfigs.remove(1); @@ -377,7 +454,7 @@ public void testComputeOperation() indexLoadingConfig = new IndexLoadingConfig(null, tableConfig); indexLoadingConfig.getTextIndexColumns().add(config1.getName()); indexLoadingConfig.getInvertedIndexColumns().add(config1.getName()); - fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig); + fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, null); operationMap = fwdIndexHandler.computeOperation(writer); assertEquals(operationMap.size(), 2); assertEquals(operationMap.get(config1.getName()), ForwardIndexHandler.Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE); @@ -387,7 +464,7 @@ public void testComputeOperation() segmentLocalFSDirectory.close(); } - @Test + @Test(priority = 1) public void testRewriteRawForwardIndexForSingleColumn() throws Exception { for (int i = 0; i < _noDictionaryColumns.size(); i++) { @@ -415,24 +492,31 @@ public void testRewriteRawForwardIndexForSingleColumn() IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig); IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); - ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig); + ForwardIndexHandler fwdIndexHandler = + new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, null); boolean val = fwdIndexHandler.needUpdateIndices(writer); - - - fwdIndexHandler.updateIndices(writer, indexCreatorProvider); // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close() segmentLocalFSDirectory.close(); // Validation - validateIndexMap(); + validateIndexMap(columnName, false); validateForwardIndex(columnName, newCompressionType); + + // Validate metadata properties. Nothing should change when a forwardIndex is rewritten for compressionType + // change. + ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(columnName); + validateMetadataProperties(columnName, metadata.hasDictionary(), metadata.getColumnMaxLength(), + metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), + metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), + metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), + metadata.getMaxValue()); } } } - @Test + @Test(priority = 2) public void testRewriteRawForwardIndexForMultipleColumns() throws Exception { // Setup @@ -469,18 +553,131 @@ public void testRewriteRawForwardIndexForMultipleColumns() IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, tableConfig); IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); - ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig); + ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, null); fwdIndexHandler.updateIndices(writer, indexCreatorProvider); // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close() segmentLocalFSDirectory.close(); - validateIndexMap(); + validateIndexMap(column1, false); validateForwardIndex(column1, newCompressionType); + // Validate metadata properties. 
Nothing should change when a forwardIndex is rewritten for compressionType + // change. + ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column1); + validateMetadataProperties(column1, metadata.hasDictionary(), metadata.getColumnMaxLength(), + metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), + metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), + metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue()); + + validateIndexMap(column2, false); validateForwardIndex(column2, newCompressionType); + metadata = existingSegmentMetadata.getColumnMetadataFor(column2); + validateMetadataProperties(column2, metadata.hasDictionary(), metadata.getColumnMaxLength(), + metadata.getCardinality(), metadata.getTotalDocs(), metadata.getDataType(), metadata.getFieldType(), + metadata.isSorted(), metadata.isSingleValue(), metadata.getMaxNumberOfMultiValues(), + metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), metadata.getMinValue(), metadata.getMaxValue()); + } + + @Test(priority = 3) + public void testEnableDictionaryForMultipleColumns() + throws Exception { + SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory); + SegmentDirectory segmentLocalFSDirectory = + new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap); + SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter(); + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); + + Random rand = new Random(); + String col1 = _noDictionaryColumns.get(rand.nextInt(_noDictionaryColumns.size())); + indexLoadingConfig.getNoDictionaryColumns().remove(col1); + String col2 = _noDictionaryColumns.get(rand.nextInt(_noDictionaryColumns.size())); + indexLoadingConfig.getNoDictionaryColumns().remove(col2); + + ForwardIndexHandler fwdIndexHandler = new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema); + IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); + fwdIndexHandler.updateIndices(writer, indexCreatorProvider); + + // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close() + segmentLocalFSDirectory.close(); + + // Col1 validation. + validateIndexMap(col1, true); + validateForwardIndex(col1, null); + // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change. + int dictionaryElementSize = 0; + ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(col1); + FieldSpec.DataType dataType = metadata.getDataType(); + if (dataType == FieldSpec.DataType.STRING || dataType == FieldSpec.DataType.BYTES) { + // This value is based on the rows in createTestData(). + dictionaryElementSize = 7; + } else if (dataType == FieldSpec.DataType.BIG_DECIMAL) { + dictionaryElementSize = 11; + } + validateMetadataProperties(col1, true, dictionaryElementSize, metadata.getCardinality(), metadata.getTotalDocs(), + dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), + metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), + metadata.getMinValue(), metadata.getMaxValue()); + + // Col2 validation. + validateIndexMap(col2, true); + validateForwardIndex(col2, null); + // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change. 
+ dictionaryElementSize = 0; + metadata = existingSegmentMetadata.getColumnMetadataFor(col2); + dataType = metadata.getDataType(); + if (dataType == FieldSpec.DataType.STRING || dataType == FieldSpec.DataType.BYTES) { + // This value is based on the rows in createTestData(). + dictionaryElementSize = 7; + } else if (dataType == FieldSpec.DataType.BIG_DECIMAL) { + dictionaryElementSize = 11; + } + validateMetadataProperties(col2, true, dictionaryElementSize, metadata.getCardinality(), metadata.getTotalDocs(), + dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), + metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), + metadata.getMinValue(), metadata.getMaxValue()); + } + + @Test(priority = 4) + public void testEnableDictionaryForSingleColumn() + throws Exception { + for (int i = 0; i < _noDictionaryColumns.size(); i++) { + SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory); + SegmentDirectory segmentLocalFSDirectory = + new SegmentLocalFSDirectory(_segmentDirectory, existingSegmentMetadata, ReadMode.mmap); + SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter(); + + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(null, _tableConfig); + String column = _noDictionaryColumns.get(i); + indexLoadingConfig.getNoDictionaryColumns().remove(column); + ForwardIndexHandler fwdIndexHandler = + new ForwardIndexHandler(existingSegmentMetadata, indexLoadingConfig, _schema); + IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); + fwdIndexHandler.updateIndices(writer, indexCreatorProvider); + + // Tear down before validation. Because columns.psf and index map cleanup happens at segmentDirectory.close() + segmentLocalFSDirectory.close(); + + validateIndexMap(column, true); + validateForwardIndex(column, null); + + // In column metadata, nothing other than hasDictionary and dictionaryElementSize should change. + int dictionaryElementSize = 0; + ColumnMetadata metadata = existingSegmentMetadata.getColumnMetadataFor(column); + FieldSpec.DataType dataType = metadata.getDataType(); + if (dataType == FieldSpec.DataType.STRING || dataType == FieldSpec.DataType.BYTES) { + // This value is based on the rows in createTestData(). 
+ dictionaryElementSize = 7; + } else if (dataType == FieldSpec.DataType.BIG_DECIMAL) { + dictionaryElementSize = 11; + } + validateMetadataProperties(column, true, dictionaryElementSize, metadata.getCardinality(), + metadata.getTotalDocs(), dataType, metadata.getFieldType(), metadata.isSorted(), metadata.isSingleValue(), + metadata.getMaxNumberOfMultiValues(), metadata.getTotalNumberOfEntries(), metadata.isAutoGenerated(), + metadata.getMinValue(), metadata.getMaxValue()); + } } - private void validateForwardIndex(String columnName, FieldConfig.CompressionCodec expectedCompressionType) + private void validateForwardIndex(String columnName, @Nullable FieldConfig.CompressionCodec expectedCompressionType) throws IOException { // Setup SegmentMetadataImpl existingSegmentMetadata = new SegmentMetadataImpl(_segmentDirectory); @@ -489,21 +686,27 @@ private void validateForwardIndex(String columnName, FieldConfig.CompressionCode SegmentDirectory.Writer writer = segmentLocalFSDirectory.createWriter(); ColumnMetadata columnMetadata = existingSegmentMetadata.getColumnMetadataFor(columnName); boolean isSingleValue = columnMetadata.isSingleValue(); + boolean isSorted = columnMetadata.isSorted(); // Check Compression type in header ForwardIndexReader fwdIndexReader = LoaderUtils.getForwardIndexReader(writer, columnMetadata); ChunkCompressionType fwdIndexCompressionType = fwdIndexReader.getCompressionType(); - assertEquals(fwdIndexCompressionType.name(), expectedCompressionType.name()); + if (expectedCompressionType != null) { + assertEquals(fwdIndexCompressionType.name(), expectedCompressionType.name()); + } try (ForwardIndexReader forwardIndexReader = LoaderUtils.getForwardIndexReader(writer, columnMetadata)) { - PinotSegmentColumnReader columnReader = - new PinotSegmentColumnReader(forwardIndexReader, null, null, columnMetadata.getMaxNumberOfMultiValues()); + Dictionary dictionary = null; + if (columnMetadata.hasDictionary()) { + dictionary = LoaderUtils.getDictionary(writer, columnMetadata); + } + PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(forwardIndexReader, dictionary, null, + columnMetadata.getMaxNumberOfMultiValues()); for (int rowIdx = 0; rowIdx < columnMetadata.getTotalDocs(); rowIdx++) { if (rowIdx % 10 == 0) { Object val = columnReader.getValue(rowIdx); - - FieldSpec.DataType dataType = forwardIndexReader.getStoredType(); + FieldSpec.DataType dataType = columnMetadata.getDataType(); switch (dataType) { case STRING: { @@ -532,7 +735,11 @@ private void validateForwardIndex(String columnName, FieldConfig.CompressionCode } case LONG: { if (isSingleValue) { - assertEquals((long) val, 1001L, columnName + " " + rowIdx + " " + expectedCompressionType); + if (isSorted) { + assertEquals((long) val, rowIdx, columnName + " " + rowIdx + " " + expectedCompressionType); + } else { + assertEquals((long) val, 1001L, columnName + " " + rowIdx + " " + expectedCompressionType); + } } else { Object[] values = (Object[]) val; int length = values.length; @@ -556,6 +763,11 @@ private void validateForwardIndex(String columnName, FieldConfig.CompressionCode } break; } + case BIG_DECIMAL: { + assertTrue(isSingleValue); + assertEquals((BigDecimal) val, BASE_BIG_DECIMAL.add(BigDecimal.valueOf(1001))); + break; + } default: // Unreachable code. 
throw new IllegalStateException("Invalid datatype for column=" + columnName); @@ -565,15 +777,41 @@ private void validateForwardIndex(String columnName, FieldConfig.CompressionCode } } - private void validateIndexMap() + private void validateIndexMap(String columnName, boolean dictionaryEnabled) throws IOException { // Panic validation to make sure all columns have only one forward index entry in index map. - for (String columnName : _noDictionaryColumns) { - String segmentDir = INDEX_DIR + "/" + SEGMENT_NAME + "/v3"; - File idxMapFile = new File(segmentDir, V1Constants.INDEX_MAP_FILE_NAME); - String indexMapStr = FileUtils.readFileToString(idxMapFile, StandardCharsets.UTF_8); - assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".forward_index" + ".startOffset"), 1); - assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".forward_index" + ".size"), 1); + String segmentDir = INDEX_DIR + "/" + SEGMENT_NAME + "/v3"; + File idxMapFile = new File(segmentDir, V1Constants.INDEX_MAP_FILE_NAME); + String indexMapStr = FileUtils.readFileToString(idxMapFile, StandardCharsets.UTF_8); + assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".forward_index" + ".startOffset"), 1, columnName); + assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".forward_index" + ".size"), 1, columnName); + + if (dictionaryEnabled) { + assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".dictionary" + ".startOffset"), 1, columnName); + assertEquals(StringUtils.countMatches(indexMapStr, columnName + ".dictionary" + ".size"), 1, columnName); } } + + private void validateMetadataProperties(String column, boolean hasDictionary, int dictionaryElementSize, + int cardinality, int totalDocs, FieldSpec.DataType dataType, FieldSpec.FieldType fieldType, boolean isSorted, + boolean isSingleValue, int maxNumberOfMVEntries, int totalNumberOfEntries, boolean isAutoGenerated, + Comparable minValue, Comparable maxValue) + throws IOException { + SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_segmentDirectory); + ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column); + + assertEquals(columnMetadata.hasDictionary(), hasDictionary, column); + assertEquals(columnMetadata.getColumnMaxLength(), dictionaryElementSize, column); + assertEquals(columnMetadata.getCardinality(), cardinality, column); + assertEquals(columnMetadata.getTotalDocs(), totalDocs, column); + assertTrue(columnMetadata.getDataType().equals(dataType), column); + assertTrue(columnMetadata.getFieldType().equals(fieldType)); + assertEquals(columnMetadata.isSorted(), isSorted); + assertEquals(columnMetadata.isSingleValue(), isSingleValue); + assertEquals(columnMetadata.getMaxNumberOfMultiValues(), maxNumberOfMVEntries); + assertEquals(columnMetadata.getTotalNumberOfEntries(), totalNumberOfEntries); + assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated); + assertEquals(columnMetadata.getMinValue(), minValue); + assertEquals(columnMetadata.getMaxValue(), maxValue); + } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java index 85ffeb555d67..4ad93b8c6e70 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java +++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java @@ -98,6 +98,9 @@ public class SegmentPreProcessorTest { private static final String NEWLY_ADDED_STRING_MV_COL_RAW = "newTextMVColRaw"; private static final String NEWLY_ADDED_STRING_MV_COL_DICT = "newTextMVColDict"; + // For int RAW column + private static final String EXISTING_INT_COL_RAW = "column2"; + // For raw MV column. private static final String EXISTING_INT_COL_RAW_MV = "column6"; @@ -150,12 +153,6 @@ public void setUp() props.put(IndexLoadingConfig.READ_MODE_KEY, ReadMode.mmap.toString()); _configuration = new PinotConfiguration(props); - // We specify two columns without inverted index ('column1', 'column13'), one non-existing column ('noSuchColumn') - // and one column with existed inverted index ('column7'). - _indexLoadingConfig = new IndexLoadingConfig(); - _indexLoadingConfig.setInvertedIndexColumns( - new HashSet<>(Arrays.asList(COLUMN1_NAME, COLUMN7_NAME, COLUMN13_NAME, NO_SUCH_COLUMN_NAME))); - // The segment generation code in SegmentColumnarIndexCreator will throw // exception if start and end time in time column are not in acceptable // range. For this test, we first need to fix the input avro data @@ -167,6 +164,13 @@ public void setUp() _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch") .setIngestionConfig(ingestionConfig).build(); + _indexLoadingConfig = getDefaultIndexLoadingConfig(); + + // We specify two columns without inverted index ('column1', 'column13'), one non-existing column ('noSuchColumn') + // and one column with an existing inverted index ('column7'). + _indexLoadingConfig.setInvertedIndexColumns( + new HashSet<>(Arrays.asList(COLUMN1_NAME, COLUMN7_NAME, COLUMN13_NAME, NO_SUCH_COLUMN_NAME))); + _indexLoadingConfig.setTableConfig(_tableConfig); ClassLoader classLoader = getClass().getClassLoader(); @@ -208,19 +212,43 @@ public void tearDown() FileUtils.deleteQuietly(INDEX_DIR); } - private void constructV1Segment() + private IndexLoadingConfig getDefaultIndexLoadingConfig() { + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(); + + // Set RAW columns. Otherwise, they will end up being converted to dict columns (default) during segment reload. + indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW); + indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV); + indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW); + return indexLoadingConfig; + } + + private void constructV1Segment(List<String> invertedIndexCols, List<String> textIndexCols, + List<String> rangeIndexCols) throws Exception { FileUtils.deleteQuietly(INDEX_DIR); List<String> rawCols = new ArrayList<>(); rawCols.add(EXISTING_STRING_COL_RAW); rawCols.add(EXISTING_INT_COL_RAW_MV); + rawCols.add(EXISTING_INT_COL_RAW); // Create inverted index for 'column7' when constructing the segment.
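The seeding in getDefaultIndexLoadingConfig() is load-bearing: per its own comment, any column absent from noDictionaryColumns is treated as dictionary-encoded during segment reload. A minimal sketch of the behavior it guards against, reusing the load/process pattern used throughout this file (segment construction resumes just below):

```java
// Reloading with an empty IndexLoadingConfig would now dictionary-encode the
// raw test columns, silently invalidating every raw-column assertion.
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
    .load(_indexDir.toURI(),
        new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
    SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) {
  processor.process();  // EXISTING_STRING_COL_RAW would come back dictionary-encoded
}
```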
SegmentGeneratorConfig segmentGeneratorConfig = SegmentTestUtils.getSegmentGeneratorConfigWithSchema(_avroFile, INDEX_DIR, "testTable", _tableConfig, _schema); - segmentGeneratorConfig.setInvertedIndexCreationColumns(Collections.singletonList(COLUMN7_NAME)); segmentGeneratorConfig.setRawIndexCreationColumns(rawCols); + segmentGeneratorConfig.setInvertedIndexCreationColumns(Collections.singletonList(COLUMN7_NAME)); + if (invertedIndexCols.size() > 0) { + for (String col : invertedIndexCols) { + segmentGeneratorConfig.getInvertedIndexCreationColumns().add(col); + } + } + if (textIndexCols.size() > 0) { + segmentGeneratorConfig.setTextIndexCreationColumns(textIndexCols); + } + if (rangeIndexCols.size() > 0) { + segmentGeneratorConfig.setRangeIndexCreationColumns(rangeIndexCols); + } + SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null); driver.init(segmentGeneratorConfig); driver.build(); @@ -230,7 +258,7 @@ private void constructV1Segment() private void constructV3Segment() throws Exception { - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); new SegmentV1V2ToV3FormatConverter().convert(_indexDir); } @@ -263,7 +291,7 @@ public void testEnableTextIndexOnNewColumnRaw() 1); // Create a segment in V1, add a new raw column with text index enabled - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_STRING_COL_RAW); // should be null since column does not exist in the schema @@ -277,7 +305,6 @@ public void testEnableFSTIndexOnExistingColumnRaw() Set fstColumns = new HashSet<>(); fstColumns.add(EXISTING_STRING_COL_RAW); _indexLoadingConfig.setFSTIndexColumns(fstColumns); - _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW); constructV3Segment(); SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), @@ -286,7 +313,7 @@ public void testEnableFSTIndexOnExistingColumnRaw() new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchemaWithFST); expectThrows(UnsupportedOperationException.class, () -> v3Processor.process()); - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); SegmentPreProcessor v1Processor = @@ -304,7 +331,7 @@ public void testEnableFSTIndexOnNewColumnDictEncoded() constructV3Segment(); checkFSTIndexCreation(NEWLY_ADDED_FST_COL_DICT, 1, 1, _newColumnsSchemaWithFST, true, true, 4); - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); checkFSTIndexCreation(NEWLY_ADDED_FST_COL_DICT, 1, 1, _newColumnsSchemaWithFST, true, true, 4); } @@ -321,7 +348,7 @@ public void testEnableFSTIndexOnExistingColumnDictEncoded() assertNotNull(columnMetadata); checkFSTIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _newColumnsSchemaWithFST, false, false, 26); - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = 
segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_DICT); assertNotNull(columnMetadata); @@ -329,7 +356,98 @@ public void testEnableFSTIndexOnExistingColumnDictEncoded() } @Test - public void testForwardIndexHandler() + public void testForwardIndexHandlerEnableDictionary() + throws Exception { + // Add raw columns in indexingConfig so that the ForwardIndexHandler doesn't end up converting them to dictionary + // enabled columns + _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV); + _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW); + _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW); + + // TEST 1. Check running forwardIndexHandler on a V1 segment. No-op for all existing raw columns. + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + checkForwardIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26, null, true, 0, + DataType.STRING, 100000); + validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true, + 0, ChunkCompressionType.LZ4, false, DataType.STRING, 100000); + validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0, + false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688); + validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW, 42242, 16, _schema, false, false, false, 0, true, + 0, ChunkCompressionType.LZ4, false, DataType.INT, 100000); + + // Convert the segment to V3. + new SegmentV1V2ToV3FormatConverter().convert(_indexDir); + + // TEST 2: Run reload with no-changes. + checkForwardIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26, null, true, 0, + DataType.STRING, 100000); + + // TEST 3: EXISTING_STRING_COL_RAW. Enable dictionary. Also add inverted index and text index. Reload code path + // will create dictionary, inverted index and text index. + _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_STRING_COL_RAW); + _indexLoadingConfig.getInvertedIndexColumns().add(EXISTING_STRING_COL_RAW); + _indexLoadingConfig.getTextIndexColumns().add(EXISTING_STRING_COL_RAW); + checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, true, false, 4, null, true, 0, + DataType.STRING, 100000); + validateIndex(ColumnIndexType.INVERTED_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, true, false, 4, true, + 0, null, false, DataType.STRING, 100000); + validateIndex(ColumnIndexType.TEXT_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, true, false, 4, true, 0, + null, false, DataType.STRING, 100000); + + // TEST4: EXISTING_STRING_COL_RAW. Enable dictionary on a raw column that already has text index. + List textIndexCols = new ArrayList<>(); + textIndexCols.add(EXISTING_STRING_COL_RAW); + constructV1Segment(Collections.emptyList(), textIndexCols, Collections.emptyList()); + new SegmentV1V2ToV3FormatConverter().convert(_indexDir); + validateIndex(ColumnIndexType.TEXT_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, true, 0, + null, false, DataType.STRING, 100000); + + // At this point, the segment has text index. Now, the reload path should create a dictionary. 
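At the metadata level, what the next assertions verify can be stated in two lines, using only calls already present in this file (a sketch, not part of the test):

```java
// After the reload, the previously raw column reports a dictionary while the
// text index created at build time stays in place.
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertTrue(segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW).hasDictionary());
```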
+ checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, true, false, 4, null, true, 0, + DataType.STRING, 100000); + validateIndex(ColumnIndexType.TEXT_INDEX, EXISTING_STRING_COL_RAW, 5, 3, _schema, false, true, false, 4, true, 0, + null, false, DataType.STRING, 100000); + // Add it back so that this column is not rewritten for the other tests below. + _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW); + + // TEST 5: EXISTING_INT_COL_RAW. Enable dictionary on a column that already has range index. + List rangeIndexCols = new ArrayList<>(); + rangeIndexCols.add(EXISTING_INT_COL_RAW); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), rangeIndexCols); + new SegmentV1V2ToV3FormatConverter().convert(_indexDir); + validateIndex(ColumnIndexType.RANGE_INDEX, EXISTING_INT_COL_RAW, 42242, 16, _schema, false, false, false, 0, true, + 0, ChunkCompressionType.LZ4, false, DataType.INT, 100000); + // At this point, the segment has range index. Now the reload path should create a dictionary and rewrite the + // range index. + _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW); + _indexLoadingConfig.getRangeIndexColumns().add(EXISTING_INT_COL_RAW); + checkForwardIndexCreation(EXISTING_INT_COL_RAW, 42242, 16, _schema, false, true, false, 0, null, true, 0, + DataType.INT, 100000); + validateIndex(ColumnIndexType.RANGE_INDEX, EXISTING_INT_COL_RAW, 42242, 16, _schema, false, true, false, 0, true, 0, + null, false, DataType.INT, 100000); + // Add it back so that this column is not rewritten for the other tests below. + _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW); + + // TEST 6: EXISTING_INT_COL_RAW_MV. Enable dictionary for an MV column. Also enable inverted index and range index. + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + new SegmentV1V2ToV3FormatConverter().convert(_indexDir); + validateIndex(ColumnIndexType.FORWARD_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0, + false, 13, ChunkCompressionType.LZ4, false, DataType.INT, 106688); + + // Enable dictionary and inverted index. + _indexLoadingConfig.getNoDictionaryColumns().remove(EXISTING_INT_COL_RAW_MV); + _indexLoadingConfig.getInvertedIndexColumns().add(EXISTING_INT_COL_RAW_MV); + _indexLoadingConfig.getRangeIndexColumns().add(EXISTING_INT_COL_RAW_MV); + checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0, null, false, 13, + DataType.INT, 106688); + validateIndex(ColumnIndexType.INVERTED_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0, + false, 13, null, false, DataType.INT, 106688); + validateIndex(ColumnIndexType.RANGE_INDEX, EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, true, false, 0, + false, 13, null, false, DataType.INT, 106688); + } + + @Test + public void testForwardIndexHandlerChangeCompression() throws Exception { Map compressionConfigs = new HashMap<>(); ChunkCompressionType newCompressionType = ChunkCompressionType.ZSTANDARD; @@ -338,7 +456,7 @@ public void testForwardIndexHandler() _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW); // Test1: Rewriting forward index will be a no-op for v1 segments. Default LZ4 compressionType will be retained. 
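The retained compression type is directly observable: the raw forward index header carries it, and ForwardIndexReader exposes it. A sketch that borrows the SegmentLocalFSDirectory plumbing from the ForwardIndexHandlerTest half of this diff (Test1's code follows below):

```java
// Read the compression type straight out of the forward index header; after a
// no-op rewrite on a V1 segment it still reports the default LZ4.
SegmentMetadataImpl metadata = new SegmentMetadataImpl(_indexDir);
SegmentDirectory segmentDirectory = new SegmentLocalFSDirectory(_indexDir, metadata, ReadMode.mmap);
SegmentDirectory.Writer writer = segmentDirectory.createWriter();
try (ForwardIndexReader<?> fwdIndexReader =
    LoaderUtils.getForwardIndexReader(writer, metadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW))) {
  assertEquals(fwdIndexReader.getCompressionType(), ChunkCompressionType.LZ4);
}
segmentDirectory.close();
```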
- constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); checkForwardIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0, ChunkCompressionType.LZ4, true, 0, DataType.STRING, 100000); @@ -387,7 +505,7 @@ public void testForwardIndexHandler() _indexLoadingConfig.setCompressionConfigs(compressionConfigs); _indexLoadingConfig.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV); - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); new SegmentV1V2ToV3FormatConverter().convert(_indexDir); checkForwardIndexCreation(EXISTING_INT_COL_RAW_MV, 18499, 15, _schema, false, false, false, 0, ChunkCompressionType.ZSTANDARD, false, 13, DataType.INT, 106688); @@ -421,7 +539,7 @@ public void testEnableTextIndexOnNewColumnDictEncoded() 1); // Create a segment in V1, add a new dict encoded column with text index enabled - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_STRING_COL_RAW); // should be null since column does not exist in the schema @@ -450,7 +568,7 @@ public void testEnableTextIndexOnExistingRawColumn() checkTextIndexCreation(EXISTING_STRING_COL_RAW, 5, 3, _schema, false, false, false, 0); // Create a segment in V1, add a new column with text index enabled - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW); assertNotNull(columnMetadata); @@ -480,7 +598,7 @@ public void testEnableTextIndexOnExistingDictEncodedColumn() checkTextIndexCreation(EXISTING_STRING_COL_DICT, 9, 4, _schema, false, true, false, 26); // Create a segment in V1, add a new column with text index enabled - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(EXISTING_STRING_COL_RAW); assertNotNull(columnMetadata); @@ -545,13 +663,13 @@ private void validateIndex(ColumnIndexType indexType, String column, int cardina throws Exception { SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir); ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(column); + assertEquals(columnMetadata.hasDictionary(), hasDictionary); assertEquals(columnMetadata.getFieldSpec(), new DimensionFieldSpec(column, dataType, isSingleValued)); assertEquals(columnMetadata.getCardinality(), cardinality); assertEquals(columnMetadata.getTotalDocs(), 100000); assertEquals(columnMetadata.getBitsPerElement(), bits); assertEquals(columnMetadata.getColumnMaxLength(), dictionaryElementSize); assertEquals(columnMetadata.isSorted(), isSorted); - assertEquals(columnMetadata.hasDictionary(), hasDictionary); assertEquals(columnMetadata.getMaxNumberOfMultiValues(), maxNumberOfMultiValues); assertEquals(columnMetadata.getTotalNumberOfEntries(), totalNumberOfEntries); assertEquals(columnMetadata.isAutoGenerated(), isAutoGenerated); @@ -575,6 +693,8 @@ private void validateIndex(ColumnIndexType indexType, String column, int cardina // Check if the raw forward index compressionType is correct. 
if (expectedCompressionType != null) { + assertTrue(!hasDictionary); + try (ForwardIndexReader fwdIndexReader = LoaderUtils.getForwardIndexReader(reader, columnMetadata)) { ChunkCompressionType compressionType = fwdIndexReader.getCompressionType(); assertTrue(compressionType.equals(expectedCompressionType), compressionType.toString()); @@ -582,7 +702,14 @@ private void validateIndex(ColumnIndexType indexType, String column, int cardina File inProgressFile = new File(_indexDir, column + ".fwd.inprogress"); assertTrue(!inProgressFile.exists()); - File v1FwdIndexFile = new File(_indexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION); + + String fwdIndexFileExtension; + if (isSingleValued) { + fwdIndexFileExtension = V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION; + } else { + fwdIndexFileExtension = V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION; + } + File v1FwdIndexFile = new File(_indexDir, column + fwdIndexFileExtension); if (segmentMetadata.getVersion() == SegmentVersion.v3) { assertTrue(!v1FwdIndexFile.exists()); } else { @@ -603,7 +730,7 @@ private void validateIndex(ColumnIndexType indexType, String column, int cardina @Test public void testV1CreateInvertedIndices() throws Exception { - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir); assertEquals(segmentMetadata.getVersion(), SegmentVersion.v1); @@ -733,7 +860,7 @@ private void checkInvertedIndexCreation(boolean reCreate) @Test public void testV1UpdateDefaultColumns() throws Exception { - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs( Collections.singletonList(new TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)"))); @@ -940,12 +1067,12 @@ private void checkUpdateDefaultColumns() @Test public void testColumnMinMaxValue() throws Exception { - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); // Remove min/max value from the metadata removeMinMaxValuesFromMetadataFile(_indexDir); - IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(); + IndexLoadingConfig indexLoadingConfig = getDefaultIndexLoadingConfig(); indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NONE); try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), @@ -1022,7 +1149,7 @@ public void testColumnMinMaxValue() @Test public void testV1CleanupIndices() throws Exception { - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir); assertEquals(segmentMetadata.getVersion(), SegmentVersion.v1); @@ -1030,12 +1157,12 @@ public void testV1CleanupIndices() // Need to create two default columns with Bytes and JSON string for H3 and JSON index. // Other kinds of indices can all be put on column3 with String values. 
String strColumn = "column3"; - _indexLoadingConfig = new IndexLoadingConfig(); - _indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); - _indexLoadingConfig.setRangeIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); - _indexLoadingConfig.setTextIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); - _indexLoadingConfig.setFSTIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); - _indexLoadingConfig.setBloomFilterConfigs(ImmutableMap.of(strColumn, new BloomFilterConfig(0.1, 1024, true))); + IndexLoadingConfig indexLoadingConfig = getDefaultIndexLoadingConfig(); + indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); + indexLoadingConfig.setRangeIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); + indexLoadingConfig.setTextIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); + indexLoadingConfig.setFSTIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); + indexLoadingConfig.setBloomFilterConfigs(ImmutableMap.of(strColumn, new BloomFilterConfig(0.1, 1024, true))); // V1 use separate file for each column index. File iiFile = new File(_indexDir, strColumn + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION); @@ -1054,7 +1181,7 @@ public void testV1CleanupIndices() try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, null)) { + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) { processor.process(); } assertTrue(iiFile.exists()); @@ -1067,7 +1194,8 @@ public void testV1CleanupIndices() try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) { + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, getDefaultIndexLoadingConfig(), + null)) { processor.process(); } assertFalse(iiFile.exists()); @@ -1093,7 +1221,9 @@ public void testV3CleanupIndices() try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) { + + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, getDefaultIndexLoadingConfig(), + null)) { processor.process(); } assertTrue(singleFileIndex.length() < initFileSize); @@ -1102,18 +1232,18 @@ public void testV3CleanupIndices() // Need to create two default columns with Bytes and JSON string for H3 and JSON index. // Other kinds of indices can all be put on column3 with String values. 
String strColumn = "column3"; - _indexLoadingConfig = new IndexLoadingConfig(); - _indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); - _indexLoadingConfig.setRangeIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); - _indexLoadingConfig.setTextIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); - _indexLoadingConfig.setFSTIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); - _indexLoadingConfig.setBloomFilterConfigs(ImmutableMap.of(strColumn, new BloomFilterConfig(0.1, 1024, true))); + IndexLoadingConfig indexLoadingConfig = getDefaultIndexLoadingConfig(); + indexLoadingConfig.setInvertedIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); + indexLoadingConfig.setRangeIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); + indexLoadingConfig.setTextIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); + indexLoadingConfig.setFSTIndexColumns(new HashSet<>(Collections.singletonList(strColumn))); + indexLoadingConfig.setBloomFilterConfigs(ImmutableMap.of(strColumn, new BloomFilterConfig(0.1, 1024, true))); // Create all kinds of indices. try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, null)) { + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) { processor.process(); } @@ -1134,7 +1264,8 @@ public void testV3CleanupIndices() try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) { + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, getDefaultIndexLoadingConfig(), + null)) { processor.process(); } assertEquals(singleFileIndex.length(), initFileSize); @@ -1143,13 +1274,13 @@ public void testV3CleanupIndices() @Test public void testV1CleanupH3AndTextIndices() throws Exception { - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); // Remove all indices and add the two derived columns for H3 and Json index. 
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, getDefaultIndexLoadingConfig(), _newColumnsSchemaWithH3Json)) { processor.process(); } @@ -1158,7 +1289,7 @@ public void testV1CleanupH3AndTextIndices() assertNotNull(segmentMetadata.getColumnMetadataFor("newH3Col")); assertNotNull(segmentMetadata.getColumnMetadataFor("newJsonCol")); - _indexLoadingConfig = new IndexLoadingConfig(); + _indexLoadingConfig = getDefaultIndexLoadingConfig(); _indexLoadingConfig.setH3IndexConfigs( ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5")))); _indexLoadingConfig.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol"))); @@ -1184,7 +1315,8 @@ public void testV1CleanupH3AndTextIndices() try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) { + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, getDefaultIndexLoadingConfig(), + null)) { processor.process(); } assertFalse(h3File.exists()); @@ -1207,7 +1339,7 @@ public void testV3CleanupH3AndTextIndices() try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, getDefaultIndexLoadingConfig(), _newColumnsSchemaWithH3Json)) { processor.process(); } @@ -1216,16 +1348,16 @@ public void testV3CleanupH3AndTextIndices() assertNotNull(segmentMetadata.getColumnMetadataFor("newJsonCol")); long initFileSize = singleFileIndex.length(); - _indexLoadingConfig = new IndexLoadingConfig(); - _indexLoadingConfig.setH3IndexConfigs( + IndexLoadingConfig indexLoadingConfig = getDefaultIndexLoadingConfig(); + indexLoadingConfig.setH3IndexConfigs( ImmutableMap.of("newH3Col", new H3IndexConfig(ImmutableMap.of("resolutions", "5")))); - _indexLoadingConfig.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol"))); + indexLoadingConfig.setJsonIndexColumns(new HashSet<>(Collections.singletonList("newJsonCol"))); // Create H3 and Json indices. 
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, null)) { + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, null)) { processor.process(); } @@ -1243,7 +1375,8 @@ public void testV3CleanupH3AndTextIndices() try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) { + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, getDefaultIndexLoadingConfig(), + null)) { processor.process(); } assertEquals(singleFileIndex.length(), initFileSize); @@ -1252,7 +1385,7 @@ @Test public void testV1IfNeedProcess() throws Exception { - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir); assertEquals(segmentMetadata.getVersion(), SegmentVersion.v1); @@ -1275,7 +1408,8 @@ private void testIfNeedProcess() try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), null)) { + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, getDefaultIndexLoadingConfig(), + null)) { assertTrue(processor.needProcess()); processor.process(); assertFalse(processor.needProcess()); @@ -1285,7 +1419,7 @@ private void testIfNeedProcess() try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, new IndexLoadingConfig(), + SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, getDefaultIndexLoadingConfig(), _newColumnsSchemaWithH3Json)) { assertTrue(processor.needProcess()); processor.process(); @@ -1295,7 +1429,7 @@ private void testIfNeedProcess() // No preprocessing needed if asked to add a certain index on a non-existing, sorted or non-dictionary column. for (Map.Entry<String, Consumer<IndexLoadingConfig>> entry : createConfigPrepFunctionNeedNoops().entrySet()) { String testCase = entry.getKey(); - IndexLoadingConfig config = new IndexLoadingConfig(); + IndexLoadingConfig config = getDefaultIndexLoadingConfig(); entry.getValue().accept(config); try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), @@ -1308,7 +1442,7 @@ private void testIfNeedProcess() // Require to add different types of indices. Add one new index at a time // to test the index handlers separately.
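Every case in testIfNeedProcess() leans on the same contract, sketched here in isolation (configWithNewIndex is a hypothetical local; any newly requested index would do). The per-index cases follow:

```java
// A config that newly requires an index must make needProcess() return true,
// and a successful process() must reset it to false.
IndexLoadingConfig configWithNewIndex = getDefaultIndexLoadingConfig();
configWithNewIndex.getTextIndexColumns().add(EXISTING_STRING_COL_DICT);
try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader()
    .load(_indexDir.toURI(),
        new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build());
    SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, configWithNewIndex, null)) {
  assertTrue(processor.needProcess());
  processor.process();
  assertFalse(processor.needProcess());
}
```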
- IndexLoadingConfig config = new IndexLoadingConfig(); + IndexLoadingConfig config = getDefaultIndexLoadingConfig(); for (Map.Entry<String, Consumer<IndexLoadingConfig>> entry : createConfigPrepFunctions().entrySet()) { String testCase = entry.getKey(); entry.getValue().accept(config); @@ -1329,6 +1463,10 @@ private void testIfNeedProcess() indexingConfig.setEnableDefaultStarTree(true); _tableConfig.setIndexingConfig(indexingConfig); IndexLoadingConfig configWithStarTreeIndex = new IndexLoadingConfig(null, _tableConfig); + // Set RAW columns. Otherwise, they will end up being converted to dict columns (default) during segment reload. + configWithStarTreeIndex.getNoDictionaryColumns().add(EXISTING_STRING_COL_RAW); + configWithStarTreeIndex.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW_MV); + configWithStarTreeIndex.getNoDictionaryColumns().add(EXISTING_INT_COL_RAW); createConfigPrepFunctions().forEach((k, v) -> v.accept(configWithStarTreeIndex)); try (SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(_indexDir.toURI(), @@ -1385,7 +1523,7 @@ public void testNeedAddMinMaxValue() SegmentDirectory segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader() .load(segment.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(); + IndexLoadingConfig indexLoadingConfig = getDefaultIndexLoadingConfig(); indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.ALL); SegmentPreProcessor processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema); assertFalse(processor.needProcess()); @@ -1395,7 +1533,7 @@ public void testNeedAddMinMaxValue() segment = buildTestSegmentForMinMax(tableConfig, schema, "invalidSegment", stringValuesInvalid, longValues); segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(segment.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); - indexLoadingConfig = new IndexLoadingConfig(); + indexLoadingConfig = getDefaultIndexLoadingConfig(); indexLoadingConfig.setColumnMinMaxValueGeneratorMode(ColumnMinMaxValueGeneratorMode.NONE); processor = new SegmentPreProcessor(segmentDirectory, indexLoadingConfig, schema); assertFalse(processor.needProcess()); @@ -1526,7 +1664,7 @@ public void testForwardIndexDisabledOnNewColumnsSV() _newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, true, DataType.STRING, 100000); // Create a segment in V1, add a column with no forward index enabled - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV); // should be null since column does not exist in the schema @@ -1558,7 +1696,7 @@ public void testForwardIndexDisabledOnNewColumnsSV() + "forward index"); } - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV); // should be null since column does not exist in the schema @@ -1594,7 +1732,7 @@ public void testForwardIndexDisabledOnNewColumnsSV() NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV, 1, 1,
_newColumnsSchemaWithForwardIndexDisabled, true, true, true, 4, true, 0, null, false, DataType.STRING, 100000); - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV); // should be null since column does not exist in the schema @@ -1629,7 +1767,7 @@ public void testForwardIndexDisabledOnNewColumnsSV() + "newForwardIndexDisabledColumnSV"); } - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_SV); // should be null since column does not exist in the schema @@ -1675,7 +1813,7 @@ public void testForwardIndexDisabledOnNewColumnsMV() _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, true, DataType.STRING, 100000); // Create a segment in V1, add a column with no forward index enabled - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV); // should be null since column does not exist in the schema @@ -1703,7 +1841,7 @@ public void testForwardIndexDisabledOnNewColumnsMV() _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, false, DataType.STRING, 100000)); - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV); // should be null since column does not exist in the schema @@ -1733,7 +1871,7 @@ public void testForwardIndexDisabledOnNewColumnsMV() _newColumnsSchemaWithForwardIndexDisabled, true, true, false, 4, false, 1, null, false, DataType.STRING, 100000)); - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentMetadata = new SegmentMetadataImpl(_indexDir); columnMetadata = segmentMetadata.getColumnMetadataFor(NEWLY_ADDED_FORWARD_INDEX_DISABLED_COL_MV); // should be null since column does not exist in the schema @@ -1772,7 +1910,7 @@ public void testForwardIndexDisabledOnExistingColumnDictEncoded() new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchemaWithForwardIndexDisabled); expectThrows(RuntimeException.class, v3Processor::process); - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentDirectory = SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); SegmentPreProcessor v1Processor = @@ -1805,7 +1943,7 @@ public void testForwardIndexDisabledOnExistingColumnRaw() new SegmentPreProcessor(segmentDirectory, _indexLoadingConfig, _newColumnsSchemaWithForwardIndexDisabled); expectThrows(RuntimeException.class, v3Processor::process); - constructV1Segment(); + constructV1Segment(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); segmentDirectory = 
SegmentDirectoryLoaderRegistry.getDefaultSegmentDirectoryLoader().load(_indexDir.toURI(), new SegmentDirectoryLoaderContext.Builder().setSegmentDirectoryConfigs(_configuration).build()); SegmentPreProcessor v1Processor =