Skip to content

Commit

Permalink
make index readers/loaders pluggable (#7897)
Browse files Browse the repository at this point in the history
  • Loading branch information
richardstartin authored Dec 17, 2021
1 parent f9ab252 commit 77a7069
Show file tree
Hide file tree
Showing 20 changed files with 827 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.pinot.core.minion.RawIndexConverter;
import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor;
import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;


Expand All @@ -41,7 +41,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
new RawIndexConverter(rawTableName, indexDir, workingDir,
configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY),
IndexCreatorProviders.getIndexCreatorProvider()).convert();
IndexingOverrides.getIndexCreatorProvider()).convert();
return new SegmentConversionResult.Builder().setFile(workingDir)
.setTableNameWithType(configs.get(MinionConstants.TABLE_NAME_KEY))
.setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY)).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.converter.SegmentFormatConverter;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader;
Expand Down Expand Up @@ -141,7 +142,8 @@ public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadi
for (Map.Entry<String, ColumnMetadata> entry : columnMetadataMap.entrySet()) {
// FIXME: text-index only works with local SegmentDirectory
indexContainerMap.put(entry.getKey(),
new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), indexLoadingConfig, indexDir));
new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), indexLoadingConfig, indexDir,
IndexingOverrides.getIndexReaderProvider()));
}

// Instantiate virtual columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
import org.apache.pinot.segment.spi.creator.SegmentCreator;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator;
Expand Down Expand Up @@ -83,7 +83,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {

private SegmentGeneratorConfig _config;
private Map<String, ColumnIndexCreationInfo> _indexCreationInfoMap;
private final IndexCreatorProvider _indexCreatorProvider = IndexCreatorProviders.getIndexCreatorProvider();
private final IndexCreatorProvider _indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
private final Map<String, SegmentDictionaryCreator> _dictionaryCreatorMap = new HashMap<>();
private final Map<String, ForwardIndexCreator> _forwardIndexCreatorMap = new HashMap<>();
private final Map<String, DictionaryBasedInvertedIndexCreator> _invertedIndexCreatorMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,20 @@
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4;
import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator;
import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary;
import org.apache.pinot.segment.local.segment.index.readers.BitSlicedRangeIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.BitmapInvertedIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary;
import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary;
import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary;
import org.apache.pinot.segment.local.segment.index.readers.IntDictionary;
import org.apache.pinot.segment.local.segment.index.readers.LongDictionary;
import org.apache.pinot.segment.local.segment.index.readers.LuceneFSTIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.NullValueVectorReaderImpl;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapDoubleDictionary;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapFloatDictionary;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapIntDictionary;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapLongDictionary;
import org.apache.pinot.segment.local.segment.index.readers.OnHeapStringDictionary;
import org.apache.pinot.segment.local.segment.index.readers.RangeIndexReaderImpl;
import org.apache.pinot.segment.local.segment.index.readers.StringDictionary;
import org.apache.pinot.segment.local.segment.index.readers.bloom.BloomFilterReaderFactory;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4;
import org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader;
import org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
import org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader;
import org.apache.pinot.segment.local.utils.nativefst.FSTHeader;
import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
Expand All @@ -68,6 +47,7 @@
import org.apache.pinot.segment.spi.index.reader.RangeIndexReader;
import org.apache.pinot.segment.spi.index.reader.SortedIndexReader;
import org.apache.pinot.segment.spi.index.reader.TextIndexReader;
import org.apache.pinot.segment.spi.index.reader.provider.IndexReaderProvider;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
Expand All @@ -93,7 +73,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer
private final NullValueVectorReaderImpl _nullValueVectorReader;

public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata,
IndexLoadingConfig indexLoadingConfig, File segmentIndexDir)
IndexLoadingConfig indexLoadingConfig, File segmentIndexDir, IndexReaderProvider indexReaderProvider)
throws IOException {
String columnName = metadata.getColumnName();
boolean loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName);
Expand All @@ -115,31 +95,30 @@ public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, Colum
if (loadTextIndex) {
Preconditions.checkState(segmentReader.hasIndexFor(columnName, ColumnIndexType.TEXT_INDEX));
Map<String, Map<String, String>> columnProperties = indexLoadingConfig.getColumnProperties();
_textIndex = new LuceneTextIndexReader(columnName, segmentIndexDir, metadata.getTotalDocs(),
columnProperties.get(columnName));
_textIndex = indexReaderProvider.newTextIndexReader(segmentIndexDir, metadata, columnProperties.get(columnName));
} else {
_textIndex = null;
}

if (loadJsonIndex) {
Preconditions.checkState(segmentReader.hasIndexFor(columnName, ColumnIndexType.JSON_INDEX));
PinotDataBuffer jsonIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.JSON_INDEX);
_jsonIndex = new ImmutableJsonIndexReader(jsonIndexBuffer, metadata.getTotalDocs());
_jsonIndex = indexReaderProvider.newJsonIndexReader(jsonIndexBuffer, metadata);
} else {
_jsonIndex = null;
}

if (loadH3Index) {
Preconditions.checkState(segmentReader.hasIndexFor(columnName, ColumnIndexType.H3_INDEX));
PinotDataBuffer h3IndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.H3_INDEX);
_h3Index = new ImmutableH3IndexReader(h3IndexBuffer);
_h3Index = indexReaderProvider.newGeospatialIndexReader(h3IndexBuffer, metadata);
} else {
_h3Index = null;
}

if (bloomFilterConfig != null) {
PinotDataBuffer bloomFilterBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.BLOOM_FILTER);
_bloomFilter = BloomFilterReaderFactory.getBloomFilterReader(bloomFilterBuffer, bloomFilterConfig.isLoadOnHeap());
_bloomFilter = indexReaderProvider.newBloomFilterReader(bloomFilterBuffer, bloomFilterConfig.isLoadOnHeap());
} else {
_bloomFilter = null;
}
Expand All @@ -153,60 +132,38 @@ public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, Colum
// Single-value
if (metadata.isSorted()) {
// Sorted
SortedIndexReader<?> sortedIndexReader = new SortedIndexReaderImpl(fwdIndexBuffer, metadata.getCardinality());
SortedIndexReader<?> sortedIndexReader = indexReaderProvider.newSortedIndexReader(fwdIndexBuffer, metadata);
_forwardIndex = sortedIndexReader;
_invertedIndex = sortedIndexReader;
_rangeIndex = null;
_fstIndex = null;
return;
} else {
// Unsorted
_forwardIndex =
new FixedBitSVForwardIndexReaderV2(fwdIndexBuffer, metadata.getTotalDocs(), metadata.getBitsPerElement());
}
} else {
// Multi-value
_forwardIndex = new FixedBitMVForwardIndexReader(fwdIndexBuffer, metadata.getTotalDocs(),
metadata.getTotalNumberOfEntries(), metadata.getBitsPerElement());
}
_forwardIndex = indexReaderProvider.newForwardIndexReader(fwdIndexBuffer, metadata);
if (loadInvertedIndex) {
_invertedIndex =
new BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX),
metadata.getCardinality());
_invertedIndex = indexReaderProvider.newInvertedIndexReader(
segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX), metadata);
} else {
_invertedIndex = null;
}

if (loadFSTIndex) {
PinotDataBuffer buffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FST_INDEX);
int magicHeader = buffer.getInt(0);
if (magicHeader == FSTHeader.FST_MAGIC) {
_fstIndex = new NativeFSTIndexReader(buffer);
} else {
_fstIndex = new LuceneFSTIndexReader(buffer);
}
_fstIndex = indexReaderProvider.newFSTIndexReader(buffer, metadata);
} else {
_fstIndex = null;
}

if (loadRangeIndex) {
PinotDataBuffer buffer = segmentReader.getIndexFor(columnName, ColumnIndexType.RANGE_INDEX);
int version = buffer.getInt(0);
if (version == RangeIndexCreator.VERSION) {
_rangeIndex = new RangeIndexReaderImpl(buffer);
} else if (version == BitSlicedRangeIndexCreator.VERSION) {
_rangeIndex = new BitSlicedRangeIndexReader(buffer, metadata);
} else {
LOGGER.warn("Unknown range index version: {}, skip loading range index for column: {}", version,
metadata.getColumnName());
_rangeIndex = null;
}
_rangeIndex = indexReaderProvider.newRangeIndexReader(buffer, metadata);
} else {
_rangeIndex = null;
}
} else {
// Raw index
_forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType(), metadata.isSingleValue());
_forwardIndex = indexReaderProvider.newForwardIndexReader(fwdIndexBuffer, metadata);
_dictionary = null;
_rangeIndex = null;
_invertedIndex = null;
Expand Down Expand Up @@ -306,30 +263,6 @@ public static BaseImmutableDictionary loadDictionary(PinotDataBuffer dictionaryB
}
}

/**
 * Builds the forward index reader for a raw (no-dictionary) column.
 *
 * <p>The reader implementation is chosen from the column's stored type and whether the column is single-value:
 * INT/LONG/FLOAT/DOUBLE use the fixed-byte chunk readers, STRING/BYTES use the var-byte chunk readers. For
 * single-value STRING/BYTES columns, the int at offset 0 of the buffer is compared against
 * {@code VarByteChunkSVForwardIndexWriterV4.VERSION} to pick between the older reader and the V4 reader.
 *
 * @param forwardIndexBuffer buffer holding the raw forward index
 * @param dataType data type of the column; its stored type drives the reader selection
 * @param isSingleValue whether the column is single-value
 * @return the forward index reader matching the stored type and value cardinality
 * @throws IllegalStateException if the stored type is not one of the types handled above
 */
private static ForwardIndexReader<?> loadRawForwardIndex(PinotDataBuffer forwardIndexBuffer, DataType dataType,
    boolean isSingleValue) {
  DataType storedType = dataType.getStoredType();
  switch (storedType) {
    case INT:
    case LONG:
    case FLOAT:
    case DOUBLE: {
      // Fixed-width values
      if (isSingleValue) {
        return new FixedByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType);
      }
      return new FixedByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
    }
    case STRING:
    case BYTES: {
      // Variable-width values
      if (!isSingleValue) {
        return new VarByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType);
      }
      // Only the single-value path inspects the leading int of the buffer to select the reader
      int formatVersion = forwardIndexBuffer.getInt(0);
      if (formatVersion < VarByteChunkSVForwardIndexWriterV4.VERSION) {
        return new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType);
      }
      return new VarByteChunkSVForwardIndexReaderV4(forwardIndexBuffer, storedType);
    }
    default:
      throw new IllegalStateException("Illegal data type for raw forward index: " + dataType);
  }
}

@Override
public void close()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.RangeIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler;
import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.IndexCreatorProviders;
import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
Expand All @@ -42,7 +42,7 @@ private IndexHandlerFactory() {

public static IndexHandler getIndexHandler(ColumnIndexType type, File indexDir, SegmentMetadataImpl segmentMetadata,
IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer segmentWriter) {
IndexCreatorProvider indexCreatorProvider = IndexCreatorProviders.getIndexCreatorProvider();
IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
switch (type) {
case INVERTED_INDEX:
return new InvertedIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,12 @@
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
import org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.index.IndexingOverrides;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -59,28 +52,7 @@ public static ForwardIndexReader<?> getForwardIndexReader(SegmentDirectory.Reade
throws IOException {
PinotDataBuffer dataBuffer =
segmentReader.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX);
if (columnMetadata.hasDictionary()) {
if (columnMetadata.isSingleValue()) {
if (columnMetadata.isSorted()) {
return new SortedIndexReaderImpl(dataBuffer, columnMetadata.getCardinality());
} else {
return new FixedBitSVForwardIndexReaderV2(dataBuffer, columnMetadata.getTotalDocs(),
columnMetadata.getBitsPerElement());
}
} else {
return new FixedBitMVForwardIndexReader(dataBuffer, columnMetadata.getTotalDocs(),
columnMetadata.getTotalNumberOfEntries(), columnMetadata.getBitsPerElement());
}
} else {
DataType storedType = columnMetadata.getDataType().getStoredType();
if (columnMetadata.isSingleValue()) {
return storedType.isFixedWidth() ? new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType)
: new VarByteChunkSVForwardIndexReader(dataBuffer, storedType);
} else {
return storedType.isFixedWidth() ? new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType)
: new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
}
}
return IndexingOverrides.getIndexReaderProvider().newForwardIndexReader(dataBuffer, columnMetadata);
}

/**
Expand Down
Loading

0 comments on commit 77a7069

Please sign in to comment.