diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java index ba3e61098558..53cc61068f2d 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/converttorawindex/ConvertToRawIndexTaskExecutor.java @@ -27,7 +27,7 @@ import org.apache.pinot.core.minion.RawIndexConverter; import org.apache.pinot.plugin.minion.tasks.BaseSingleSegmentConversionExecutor; import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult; -import org.apache.pinot.segment.spi.creator.IndexCreatorProviders; +import org.apache.pinot.segment.spi.index.IndexingOverrides; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -41,7 +41,7 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); new RawIndexConverter(rawTableName, indexDir, workingDir, configs.get(MinionConstants.ConvertToRawIndexTask.COLUMNS_TO_CONVERT_KEY), - IndexCreatorProviders.getIndexCreatorProvider()).convert(); + IndexingOverrides.getIndexCreatorProvider()).convert(); return new SegmentConversionResult.Builder().setFile(workingDir) .setTableNameWithType(configs.get(MinionConstants.TABLE_NAME_KEY)) .setSegmentName(configs.get(MinionConstants.SEGMENT_NAME_KEY)).build(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java index c0f00078a200..d1a31cb3800e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java @@ -37,6 +37,7 @@ import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.converter.SegmentFormatConverter; import org.apache.pinot.segment.spi.creator.SegmentVersion; +import org.apache.pinot.segment.spi.index.IndexingOverrides; import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader; @@ -141,7 +142,8 @@ public static ImmutableSegment load(File indexDir, IndexLoadingConfig indexLoadi for (Map.Entry entry : columnMetadataMap.entrySet()) { // FIXME: text-index only works with local SegmentDirectory indexContainerMap.put(entry.getKey(), - new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), indexLoadingConfig, indexDir)); + new PhysicalColumnIndexContainer(segmentReader, entry.getValue(), indexLoadingConfig, indexDir, + IndexingOverrides.getIndexReaderProvider())); } // Instantiate virtual columns diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java index b01dd3f228d3..338dabf18795 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentColumnarIndexCreator.java @@ -40,9 +40,9 @@ import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo; import org.apache.pinot.segment.spi.creator.IndexCreationContext; import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; -import org.apache.pinot.segment.spi.creator.IndexCreatorProviders; import org.apache.pinot.segment.spi.creator.SegmentCreator; import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.segment.spi.index.IndexingOverrides; import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator; @@ -83,7 +83,7 @@ public class SegmentColumnarIndexCreator implements SegmentCreator { private SegmentGeneratorConfig _config; private Map _indexCreationInfoMap; - private final IndexCreatorProvider _indexCreatorProvider = IndexCreatorProviders.getIndexCreatorProvider(); + private final IndexCreatorProvider _indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); private final Map _dictionaryCreatorMap = new HashMap<>(); private final Map _forwardIndexCreatorMap = new HashMap<>(); private final Map _invertedIndexCreatorMap = new HashMap<>(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java index d7de2026a879..f233ace5ff89 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/PhysicalColumnIndexContainer.java @@ -22,41 +22,20 @@ import java.io.File; import java.io.IOException; import java.util.Map; -import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4; -import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator; -import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary; -import org.apache.pinot.segment.local.segment.index.readers.BitSlicedRangeIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.BitmapInvertedIndexReader; import org.apache.pinot.segment.local.segment.index.readers.BytesDictionary; import org.apache.pinot.segment.local.segment.index.readers.DoubleDictionary; import org.apache.pinot.segment.local.segment.index.readers.FloatDictionary; import org.apache.pinot.segment.local.segment.index.readers.IntDictionary; import org.apache.pinot.segment.local.segment.index.readers.LongDictionary; -import org.apache.pinot.segment.local.segment.index.readers.LuceneFSTIndexReader; import org.apache.pinot.segment.local.segment.index.readers.NullValueVectorReaderImpl; import org.apache.pinot.segment.local.segment.index.readers.OnHeapDoubleDictionary; import org.apache.pinot.segment.local.segment.index.readers.OnHeapFloatDictionary; import org.apache.pinot.segment.local.segment.index.readers.OnHeapIntDictionary; import org.apache.pinot.segment.local.segment.index.readers.OnHeapLongDictionary; import org.apache.pinot.segment.local.segment.index.readers.OnHeapStringDictionary; -import org.apache.pinot.segment.local.segment.index.readers.RangeIndexReaderImpl; import org.apache.pinot.segment.local.segment.index.readers.StringDictionary; -import org.apache.pinot.segment.local.segment.index.readers.bloom.BloomFilterReaderFactory; -import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2; -import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4; -import org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader; -import org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl; -import org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader; -import org.apache.pinot.segment.local.utils.nativefst.FSTHeader; -import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexReader; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer; import org.apache.pinot.segment.spi.index.reader.BloomFilterReader; @@ -68,6 +47,7 @@ import org.apache.pinot.segment.spi.index.reader.RangeIndexReader; import org.apache.pinot.segment.spi.index.reader.SortedIndexReader; import org.apache.pinot.segment.spi.index.reader.TextIndexReader; +import org.apache.pinot.segment.spi.index.reader.provider.IndexReaderProvider; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.ColumnIndexType; import org.apache.pinot.segment.spi.store.SegmentDirectory; @@ -93,7 +73,7 @@ public final class PhysicalColumnIndexContainer implements ColumnIndexContainer private final NullValueVectorReaderImpl _nullValueVectorReader; public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, ColumnMetadata metadata, - IndexLoadingConfig indexLoadingConfig, File segmentIndexDir) + IndexLoadingConfig indexLoadingConfig, File segmentIndexDir, IndexReaderProvider indexReaderProvider) throws IOException { String columnName = metadata.getColumnName(); boolean loadInvertedIndex = indexLoadingConfig.getInvertedIndexColumns().contains(columnName); @@ -115,8 +95,7 @@ public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, Colum if (loadTextIndex) { Preconditions.checkState(segmentReader.hasIndexFor(columnName, ColumnIndexType.TEXT_INDEX)); Map> columnProperties = indexLoadingConfig.getColumnProperties(); - _textIndex = new LuceneTextIndexReader(columnName, segmentIndexDir, metadata.getTotalDocs(), - columnProperties.get(columnName)); + _textIndex = indexReaderProvider.newTextIndexReader(segmentIndexDir, metadata, columnProperties.get(columnName)); } else { _textIndex = null; } @@ -124,7 +103,7 @@ public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, Colum if (loadJsonIndex) { Preconditions.checkState(segmentReader.hasIndexFor(columnName, ColumnIndexType.JSON_INDEX)); PinotDataBuffer jsonIndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.JSON_INDEX); - _jsonIndex = new ImmutableJsonIndexReader(jsonIndexBuffer, metadata.getTotalDocs()); + _jsonIndex = indexReaderProvider.newJsonIndexReader(jsonIndexBuffer, metadata); } else { _jsonIndex = null; } @@ -132,14 +111,14 @@ public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, Colum if (loadH3Index) { Preconditions.checkState(segmentReader.hasIndexFor(columnName, ColumnIndexType.H3_INDEX)); PinotDataBuffer h3IndexBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.H3_INDEX); - _h3Index = new ImmutableH3IndexReader(h3IndexBuffer); + _h3Index = indexReaderProvider.newGeospatialIndexReader(h3IndexBuffer, metadata); } else { _h3Index = null; } if (bloomFilterConfig != null) { PinotDataBuffer bloomFilterBuffer = segmentReader.getIndexFor(columnName, ColumnIndexType.BLOOM_FILTER); - _bloomFilter = BloomFilterReaderFactory.getBloomFilterReader(bloomFilterBuffer, bloomFilterConfig.isLoadOnHeap()); + _bloomFilter = indexReaderProvider.newBloomFilterReader(bloomFilterBuffer, bloomFilterConfig.isLoadOnHeap()); } else { _bloomFilter = null; } @@ -153,60 +132,38 @@ public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, Colum // Single-value if (metadata.isSorted()) { // Sorted - SortedIndexReader sortedIndexReader = new SortedIndexReaderImpl(fwdIndexBuffer, metadata.getCardinality()); + SortedIndexReader sortedIndexReader = indexReaderProvider.newSortedIndexReader(fwdIndexBuffer, metadata); _forwardIndex = sortedIndexReader; _invertedIndex = sortedIndexReader; _rangeIndex = null; _fstIndex = null; return; - } else { - // Unsorted - _forwardIndex = - new FixedBitSVForwardIndexReaderV2(fwdIndexBuffer, metadata.getTotalDocs(), metadata.getBitsPerElement()); } - } else { - // Multi-value - _forwardIndex = new FixedBitMVForwardIndexReader(fwdIndexBuffer, metadata.getTotalDocs(), - metadata.getTotalNumberOfEntries(), metadata.getBitsPerElement()); } + _forwardIndex = indexReaderProvider.newForwardIndexReader(fwdIndexBuffer, metadata); if (loadInvertedIndex) { - _invertedIndex = - new BitmapInvertedIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX), - metadata.getCardinality()); + _invertedIndex = indexReaderProvider.newInvertedIndexReader( + segmentReader.getIndexFor(columnName, ColumnIndexType.INVERTED_INDEX), metadata); } else { _invertedIndex = null; } if (loadFSTIndex) { PinotDataBuffer buffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FST_INDEX); - int magicHeader = buffer.getInt(0); - if (magicHeader == FSTHeader.FST_MAGIC) { - _fstIndex = new NativeFSTIndexReader(buffer); - } else { - _fstIndex = new LuceneFSTIndexReader(buffer); - } + _fstIndex = indexReaderProvider.newFSTIndexReader(buffer, metadata); } else { _fstIndex = null; } if (loadRangeIndex) { PinotDataBuffer buffer = segmentReader.getIndexFor(columnName, ColumnIndexType.RANGE_INDEX); - int version = buffer.getInt(0); - if (version == RangeIndexCreator.VERSION) { - _rangeIndex = new RangeIndexReaderImpl(buffer); - } else if (version == BitSlicedRangeIndexCreator.VERSION) { - _rangeIndex = new BitSlicedRangeIndexReader(buffer, metadata); - } else { - LOGGER.warn("Unknown range index version: {}, skip loading range index for column: {}", version, - metadata.getColumnName()); - _rangeIndex = null; - } + _rangeIndex = indexReaderProvider.newRangeIndexReader(buffer, metadata); } else { _rangeIndex = null; } } else { // Raw index - _forwardIndex = loadRawForwardIndex(fwdIndexBuffer, metadata.getDataType(), metadata.isSingleValue()); + _forwardIndex = indexReaderProvider.newForwardIndexReader(fwdIndexBuffer, metadata); _dictionary = null; _rangeIndex = null; _invertedIndex = null; @@ -306,30 +263,6 @@ public static BaseImmutableDictionary loadDictionary(PinotDataBuffer dictionaryB } } - private static ForwardIndexReader loadRawForwardIndex(PinotDataBuffer forwardIndexBuffer, DataType dataType, - boolean isSingleValue) { - DataType storedType = dataType.getStoredType(); - switch (storedType) { - case INT: - case LONG: - case FLOAT: - case DOUBLE: - return isSingleValue ? new FixedByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType) - : new FixedByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType); - case STRING: - case BYTES: - if (isSingleValue) { - int version = forwardIndexBuffer.getInt(0); - return version < VarByteChunkSVForwardIndexWriterV4.VERSION - ? new VarByteChunkSVForwardIndexReader(forwardIndexBuffer, storedType) - : new VarByteChunkSVForwardIndexReaderV4(forwardIndexBuffer, storedType); - } - return new VarByteChunkMVForwardIndexReader(forwardIndexBuffer, storedType); - default: - throw new IllegalStateException("Illegal data type for raw forward index: " + dataType); - } - } - @Override public void close() throws IOException { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java index a6fe925280de..d06c1ba75c83 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexHandlerFactory.java @@ -27,7 +27,7 @@ import org.apache.pinot.segment.local.segment.index.loader.invertedindex.RangeIndexHandler; import org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler; import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; -import org.apache.pinot.segment.spi.creator.IndexCreatorProviders; +import org.apache.pinot.segment.spi.index.IndexingOverrides; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.store.ColumnIndexType; import org.apache.pinot.segment.spi.store.SegmentDirectory; @@ -42,7 +42,7 @@ private IndexHandlerFactory() { public static IndexHandler getIndexHandler(ColumnIndexType type, File indexDir, SegmentMetadataImpl segmentMetadata, IndexLoadingConfig indexLoadingConfig, SegmentDirectory.Writer segmentWriter) { - IndexCreatorProvider indexCreatorProvider = IndexCreatorProviders.getIndexCreatorProvider(); + IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider(); switch (type) { case INVERTED_INDEX: return new InvertedIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter, diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java index b7083d045b81..136e4660b65e 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/LoaderUtils.java @@ -27,19 +27,12 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer; import org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary; -import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2; -import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; -import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl; import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.index.IndexingOverrides; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.ColumnIndexType; import org.apache.pinot.segment.spi.store.SegmentDirectory; -import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,28 +52,7 @@ public static ForwardIndexReader getForwardIndexReader(SegmentDirectory.Reade throws IOException { PinotDataBuffer dataBuffer = segmentReader.getIndexFor(columnMetadata.getColumnName(), ColumnIndexType.FORWARD_INDEX); - if (columnMetadata.hasDictionary()) { - if (columnMetadata.isSingleValue()) { - if (columnMetadata.isSorted()) { - return new SortedIndexReaderImpl(dataBuffer, columnMetadata.getCardinality()); - } else { - return new FixedBitSVForwardIndexReaderV2(dataBuffer, columnMetadata.getTotalDocs(), - columnMetadata.getBitsPerElement()); - } - } else { - return new FixedBitMVForwardIndexReader(dataBuffer, columnMetadata.getTotalDocs(), - columnMetadata.getTotalNumberOfEntries(), columnMetadata.getBitsPerElement()); - } - } else { - DataType storedType = columnMetadata.getDataType().getStoredType(); - if (columnMetadata.isSingleValue()) { - return storedType.isFixedWidth() ? new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType) - : new VarByteChunkSVForwardIndexReader(dataBuffer, storedType); - } else { - return storedType.isFixedWidth() ? new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType) - : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType); - } - } + return IndexingOverrides.getIndexReaderProvider().newForwardIndexReader(dataBuffer, columnMetadata); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java new file mode 100644 index 000000000000..79cb55da7a72 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/DefaultIndexReaderProvider.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.index.readers; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.segment.local.io.writer.impl.VarByteChunkSVForwardIndexWriterV4; +import org.apache.pinot.segment.local.segment.creator.impl.inv.BitSlicedRangeIndexCreator; +import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator; +import org.apache.pinot.segment.local.segment.index.readers.bloom.BloomFilterReaderFactory; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReaderV4; +import org.apache.pinot.segment.local.segment.index.readers.geospatial.ImmutableH3IndexReader; +import org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader; +import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl; +import org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader; +import org.apache.pinot.segment.local.utils.nativefst.FSTHeader; +import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexReader; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.index.reader.BloomFilterReader; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.index.reader.H3IndexReader; +import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.segment.spi.index.reader.RangeIndexReader; +import org.apache.pinot.segment.spi.index.reader.SortedIndexReader; +import org.apache.pinot.segment.spi.index.reader.TextIndexReader; +import org.apache.pinot.segment.spi.index.reader.provider.IndexReaderProvider; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.spi.data.FieldSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Default implementations of index reader provision logic. This class should not be + * instantiated but accessed via {@see IndexReaderProviders#getIndexReaderProvider} so + * this logic may be overridden by users of the SPI. Unless an override is specified, + * this is the logic which will be used to construct readers for data buffers. + */ +public class DefaultIndexReaderProvider implements IndexReaderProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultIndexReaderProvider.class); + + @Override + public BloomFilterReader newBloomFilterReader(PinotDataBuffer dataBuffer, boolean onHeap) + throws IOException { + return BloomFilterReaderFactory.getBloomFilterReader(dataBuffer, onHeap); + } + + @Override + public ForwardIndexReader newForwardIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata columnMetadata) + throws IOException { + if (columnMetadata.hasDictionary()) { + if (columnMetadata.isSingleValue()) { + if (columnMetadata.isSorted()) { + return new SortedIndexReaderImpl(dataBuffer, columnMetadata.getCardinality()); + } else { + return new FixedBitSVForwardIndexReaderV2(dataBuffer, columnMetadata.getTotalDocs(), + columnMetadata.getBitsPerElement()); + } + } else { + return new FixedBitMVForwardIndexReader(dataBuffer, columnMetadata.getTotalDocs(), + columnMetadata.getTotalNumberOfEntries(), columnMetadata.getBitsPerElement()); + } + } else { + FieldSpec.DataType storedType = columnMetadata.getDataType().getStoredType(); + if (columnMetadata.isSingleValue()) { + if (storedType.isFixedWidth()) { + return new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType); + } + int version = dataBuffer.getInt(0); + if (version >= VarByteChunkSVForwardIndexWriterV4.VERSION) { + return new VarByteChunkSVForwardIndexReaderV4(dataBuffer, storedType); + } + return new VarByteChunkSVForwardIndexReader(dataBuffer, storedType); + } else { + return storedType.isFixedWidth() ? new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType) + : new VarByteChunkMVForwardIndexReader(dataBuffer, storedType); + } + } + } + + @Override + public H3IndexReader newGeospatialIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata columnMetadata) + throws IOException { + return new ImmutableH3IndexReader(dataBuffer); + } + + @Override + public InvertedIndexReader newInvertedIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata columnMetadata) + throws IOException { + return new BitmapInvertedIndexReader(dataBuffer, columnMetadata.getCardinality()); + } + + @Override + public JsonIndexReader newJsonIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata columnMetadata) + throws IOException { + return new ImmutableJsonIndexReader(dataBuffer, columnMetadata.getTotalDocs()); + } + + @Override + public RangeIndexReader newRangeIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata columnMetadata) + throws IOException { + int version = dataBuffer.getInt(0); + if (version == RangeIndexCreator.VERSION) { + return new RangeIndexReaderImpl(dataBuffer); + } else if (version == BitSlicedRangeIndexCreator.VERSION) { + return new BitSlicedRangeIndexReader(dataBuffer, columnMetadata); + } + LOGGER.warn("Unknown range index version: {}, skip loading range index for column: {}", version, + columnMetadata.getColumnName()); + return null; + } + + @Override + public SortedIndexReader newSortedIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata columnMetadata) + throws IOException { + return new SortedIndexReaderImpl(dataBuffer, columnMetadata.getCardinality()); + } + + @Override + public TextIndexReader newFSTIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata columnMetadata) + throws IOException { + int magicHeader = dataBuffer.getInt(0); + if (magicHeader == FSTHeader.FST_MAGIC) { + return new NativeFSTIndexReader(dataBuffer); + } else { + return new LuceneFSTIndexReader(dataBuffer); + } + } + + @Override + public TextIndexReader newTextIndexReader(File file, ColumnMetadata columnMetadata, @Nullable + Map textIndexProperties) { + return new LuceneTextIndexReader(columnMetadata.getColumnName(), file, columnMetadata.getTotalDocs(), + textIndexProperties); + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/IndexOverridesTest.java similarity index 63% rename from pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java rename to pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/IndexOverridesTest.java index 6e32884f3b03..e96fe4d98640 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/creator/impl/IndexCreatorOverrideTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/IndexOverridesTest.java @@ -16,20 +16,24 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.segment.local.segment.creator.impl; +package org.apache.pinot.segment.local.segment.index; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.util.UUID; import org.apache.pinot.segment.local.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator; +import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.creator.IndexCreationContext; import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; -import org.apache.pinot.segment.spi.creator.IndexCreatorProviders; +import org.apache.pinot.segment.spi.index.IndexingOverrides; import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl; +import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; +import org.mockito.MockedStatic; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; @@ -37,18 +41,19 @@ import static org.apache.commons.io.FileUtils.deleteQuietly; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -public class IndexCreatorOverrideTest { +public class IndexOverridesTest { private File _file; @BeforeTest public void before() throws IOException { - _file = Files.createTempFile("IndexCreatorOverrideTest", UUID.randomUUID().toString()).toFile(); + _file = Files.createTempFile("IndexOverridesTest", UUID.randomUUID().toString()).toFile(); } @AfterTest @@ -60,7 +65,8 @@ public void cleanup() { public void testOverrideInvertedIndexCreation() throws IOException { DictionaryBasedInvertedIndexCreator highCardinalityInvertedIndex = mock(DictionaryBasedInvertedIndexCreator.class); - IndexCreatorProvider provider = new IndexCreatorProviders.Default() { + InvertedIndexReader highCardinalityInvertedIndexReader = mock(InvertedIndexReader.class); + IndexCreatorProvider provider = new IndexingOverrides.Default() { @Override public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context) throws IOException { @@ -69,14 +75,30 @@ public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreation } return super.newInvertedIndexCreator(context); } + + @Override + public InvertedIndexReader newInvertedIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException { + if (metadata.getCardinality() >= 10000) { + return highCardinalityInvertedIndexReader; + } + return super.newInvertedIndexReader(dataBuffer, metadata); + } }; - mockStatic(IndexCreatorProviders.class).when(IndexCreatorProviders::getIndexCreatorProvider).thenReturn(provider); + MockedStatic overrides = mockStatic(IndexingOverrides.class); + overrides.when(IndexingOverrides::getIndexCreatorProvider).thenReturn(provider); IndexCreationContext.Inverted highCardinalityContext = newContext(Integer.MAX_VALUE); - assertEquals(IndexCreatorProviders.getIndexCreatorProvider().newInvertedIndexCreator(highCardinalityContext), + assertEquals(IndexingOverrides.getIndexCreatorProvider().newInvertedIndexCreator(highCardinalityContext), highCardinalityInvertedIndex); IndexCreationContext.Inverted lowCardinalityContext = newContext(1); - assertTrue(IndexCreatorProviders.getIndexCreatorProvider() + assertTrue(IndexingOverrides.getIndexCreatorProvider() .newInvertedIndexCreator(lowCardinalityContext) instanceof OffHeapBitmapInvertedIndexCreator); + overrides.when(IndexingOverrides::getIndexReaderProvider).thenReturn(provider); + ColumnMetadata highCardinalityMetadata = mock(ColumnMetadata.class); + when(highCardinalityMetadata.getCardinality()).thenReturn(100_000); + assertEquals(IndexingOverrides.getIndexReaderProvider() + .newInvertedIndexReader(mock(PinotDataBuffer.class), highCardinalityMetadata), + highCardinalityInvertedIndexReader); } private IndexCreationContext.Inverted newContext(int cardinality) { diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java deleted file mode 100644 index 40466f11e8a4..000000000000 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/IndexCreatorProviders.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.segment.spi.creator; - -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.MethodType; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator; -import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator; -import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; -import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; -import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator; -import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator; -import org.apache.pinot.segment.spi.index.creator.TextIndexCreator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * Plugin registration point to allow index creation logic to be swapped out. - * Plugins should not reimplement Pinot's default index creation logic. - * Users provide an override to Pinot's index creation logic. This is simplified - * by extending {@see IndexCreatorProviders.Default} - */ -public final class IndexCreatorProviders { - - private static final Logger LOGGER = LoggerFactory.getLogger(IndexCreatorProviders.class); - - private static final IndexCreatorProvider DEFAULT = defaultProvider(); - private static final AtomicReference REGISTRATION = new AtomicReference<>(DEFAULT); - - private IndexCreatorProviders() { - } - - /** - * The caller provides a decorator to wrap the default provider, which allows plugins to create - * a delegation chain. - * @param provider index creation override - * @return true if this is the first invocation and the provider has not yet been used. - */ - public static boolean registerProvider(IndexCreatorProvider provider) { - return REGISTRATION.compareAndSet(DEFAULT, provider); - } - - /** - * Obtain the registered index creator provider. If the user has provided an override, then it will be used instead. - * If the user has not provided an override yet, then this action will prevent them from doing so. - * @return the global index provision logic. - */ - public static IndexCreatorProvider getIndexCreatorProvider() { - return Holder.PROVIDER; - } - - private static final class Holder { - public static final IndexCreatorProvider PROVIDER = REGISTRATION.get(); - } - - private static IndexCreatorProvider defaultProvider() { - // use MethodHandle to break circular dependency and keep implementation details encapsulated within - // pinot-segment-local - String className = "org.apache.pinot.segment.local.segment.creator.impl.DefaultIndexCreatorProvider"; - try { - Class clazz = Class.forName(className, false, IndexCreatorProviders.class.getClassLoader()); - return (IndexCreatorProvider) MethodHandles.publicLookup() - .findConstructor(clazz, MethodType.methodType(void.class)).invoke(); - } catch (Throwable missing) { - LOGGER.error("could not construct MethodHandle for {}", className, missing); - // this means pinot-segment-local isn't on the classpath, but this means - // no indexes will be created, so it's ok to return null - return null; - } - } - - /** - * Extend this class to override index creation - */ - public static class Default implements IndexCreatorProvider { - - @Override - public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context) - throws IOException { - if (DEFAULT == null) { - throw new UnsupportedOperationException("default implementation not present on classpath"); - } - return DEFAULT.newBloomFilterCreator(context); - } - - @Override - public ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context) - throws Exception { - if (DEFAULT == null) { - throw new UnsupportedOperationException("default implementation not present on classpath"); - } - return DEFAULT.newForwardIndexCreator(context); - } - - @Override - public GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context) - throws IOException { - if (DEFAULT == null) { - throw new UnsupportedOperationException("default implementation not present on classpath"); - } - return DEFAULT.newGeoSpatialIndexCreator(context); - } - - @Override - public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context) - throws IOException { - if (DEFAULT == null) { - throw new UnsupportedOperationException("default implementation not present on classpath"); - } - return DEFAULT.newInvertedIndexCreator(context); - } - - @Override - public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context) - throws IOException { - if (DEFAULT == null) { - throw new UnsupportedOperationException("default implementation not present on classpath"); - } - return DEFAULT.newJsonIndexCreator(context); - } - - @Override - public CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context) - throws IOException { - if (DEFAULT == null) { - throw new UnsupportedOperationException("default implementation not present on classpath"); - } - return DEFAULT.newRangeIndexCreator(context); - } - - @Override - public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context) - throws IOException { - if (DEFAULT == null) { - throw new UnsupportedOperationException("default implementation not present on classpath"); - } - return DEFAULT.newTextIndexCreator(context); - } - } -} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexingOverrides.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexingOverrides.java new file mode 100644 index 000000000000..9df465074a03 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/IndexingOverrides.java @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index; + +import java.io.File; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.creator.IndexCreationContext; +import org.apache.pinot.segment.spi.creator.IndexCreatorProvider; +import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator; +import org.apache.pinot.segment.spi.index.creator.CombinedInvertedIndexCreator; +import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator; +import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator; +import org.apache.pinot.segment.spi.index.creator.GeoSpatialIndexCreator; +import org.apache.pinot.segment.spi.index.creator.JsonIndexCreator; +import org.apache.pinot.segment.spi.index.creator.TextIndexCreator; +import org.apache.pinot.segment.spi.index.reader.BloomFilterReader; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.index.reader.H3IndexReader; +import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.segment.spi.index.reader.RangeIndexReader; +import org.apache.pinot.segment.spi.index.reader.SortedIndexReader; +import org.apache.pinot.segment.spi.index.reader.TextIndexReader; +import org.apache.pinot.segment.spi.index.reader.provider.IndexReaderProvider; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class IndexingOverrides { + + public interface IndexingOverride extends IndexCreatorProvider, IndexReaderProvider { + } + + private static final Logger LOGGER = LoggerFactory.getLogger(IndexingOverrides.class); + + private static final IndexCreatorProvider CREATOR_DEFAULTS = createDefaultCreatorProvider(); + private static final IndexReaderProvider READER_DEFAULTS = createDefaultReaderProvider(); + private static final AtomicReference REGISTRATION = new AtomicReference<>(null); + + + private IndexingOverrides() { + } + + /** + * The caller provides a decorator to wrap the default provider, which allows plugins to create + * a delegation chain. + * @param provider indexing override + * @return true if this is the first invocation and the provider has not yet been used. + */ + public static boolean registerProvider(IndexingOverride provider) { + return REGISTRATION.compareAndSet(null, provider); + } + + /** + * Gets the registered {@see IndexReaderProvider} or the default if none was registered yet. + * @return an index reader provier. + */ + public static IndexReaderProvider getIndexReaderProvider() { + return Holder.PROVIDER; + } + + /** + * Obtain the registered index creator provider. If the user has provided an override, then it will be used instead. + * If the user has not provided an override yet, then this action will prevent them from doing so. + * @return the global index provision logic. + */ + public static IndexCreatorProvider getIndexCreatorProvider() { + return Holder.PROVIDER; + } + + private static final class Holder { + public static final IndexingOverride PROVIDER = Optional.ofNullable(REGISTRATION.get()).orElseGet(Default::new); + } + + private static IndexCreatorProvider createDefaultCreatorProvider() { + return invokeDefaultConstructor("org.apache.pinot.segment.local.segment.creator.impl.DefaultIndexCreatorProvider"); + } + + private static IndexReaderProvider createDefaultReaderProvider() { + return invokeDefaultConstructor("org.apache.pinot.segment.local.segment.index.readers.DefaultIndexReaderProvider"); + } + + @SuppressWarnings("unchecked") + private static T invokeDefaultConstructor(String className) { + try { + Class clazz = Class.forName(className, false, IndexingOverrides.class.getClassLoader()); + return (T) MethodHandles.publicLookup() + .findConstructor(clazz, MethodType.methodType(void.class)).invoke(); + } catch (Throwable missing) { + LOGGER.error("could not construct MethodHandle for {}", className, missing); + return null; + } + } + + /** + * Extend this class to override index creation + */ + public static class Default implements IndexingOverride { + + @Override + public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context) + throws IOException { + ensureCreatorPresent(); + return CREATOR_DEFAULTS.newBloomFilterCreator(context); + } + + @Override + public ForwardIndexCreator newForwardIndexCreator(IndexCreationContext.Forward context) + throws Exception { + ensureCreatorPresent(); + return CREATOR_DEFAULTS.newForwardIndexCreator(context); + } + + @Override + public GeoSpatialIndexCreator newGeoSpatialIndexCreator(IndexCreationContext.Geospatial context) + throws IOException { + ensureCreatorPresent(); + return CREATOR_DEFAULTS.newGeoSpatialIndexCreator(context); + } + + @Override + public DictionaryBasedInvertedIndexCreator newInvertedIndexCreator(IndexCreationContext.Inverted context) + throws IOException { + ensureCreatorPresent(); + return CREATOR_DEFAULTS.newInvertedIndexCreator(context); + } + + @Override + public JsonIndexCreator newJsonIndexCreator(IndexCreationContext.Json context) + throws IOException { + ensureCreatorPresent(); + return CREATOR_DEFAULTS.newJsonIndexCreator(context); + } + + @Override + public CombinedInvertedIndexCreator newRangeIndexCreator(IndexCreationContext.Range context) + throws IOException { + ensureCreatorPresent(); + return CREATOR_DEFAULTS.newRangeIndexCreator(context); + } + + @Override + public TextIndexCreator newTextIndexCreator(IndexCreationContext.Text context) + throws IOException { + ensureCreatorPresent(); + return CREATOR_DEFAULTS.newTextIndexCreator(context); + } + + @Override + public BloomFilterReader newBloomFilterReader(PinotDataBuffer dataBuffer, boolean onHeap) + throws IOException { + ensureReaderPresent(); + return READER_DEFAULTS.newBloomFilterReader(dataBuffer, onHeap); + } + + @Override + public ForwardIndexReader newForwardIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException { + ensureReaderPresent(); + return READER_DEFAULTS.newForwardIndexReader(dataBuffer, metadata); + } + + @Override + public H3IndexReader newGeospatialIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException { + ensureReaderPresent(); + return READER_DEFAULTS.newGeospatialIndexReader(dataBuffer, metadata); + } + + @Override + public InvertedIndexReader newInvertedIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException { + ensureReaderPresent(); + return READER_DEFAULTS.newInvertedIndexReader(dataBuffer, metadata); + } + + @Override + public JsonIndexReader newJsonIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException { + ensureReaderPresent(); + return READER_DEFAULTS.newJsonIndexReader(dataBuffer, metadata); + } + + @Override + public RangeIndexReader newRangeIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException { + ensureReaderPresent(); + return READER_DEFAULTS.newRangeIndexReader(dataBuffer, metadata); + } + + @Override + public SortedIndexReader newSortedIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException { + ensureReaderPresent(); + return READER_DEFAULTS.newSortedIndexReader(dataBuffer, metadata); + } + + @Override + public TextIndexReader newFSTIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException { + ensureReaderPresent(); + return READER_DEFAULTS.newFSTIndexReader(dataBuffer, metadata); + } + + @Override + public TextIndexReader newTextIndexReader(File file, ColumnMetadata columnMetadata, + @Nullable Map textIndexProperties) { + ensureReaderPresent(); + return READER_DEFAULTS.newTextIndexReader(file, columnMetadata, textIndexProperties); + } + + private void ensureReaderPresent() { + if (READER_DEFAULTS == null) { + throw new UnsupportedOperationException("default implementation not present on classpath"); + } + } + + private void ensureCreatorPresent() { + if (CREATOR_DEFAULTS == null) { + throw new UnsupportedOperationException("default implementation not present on classpath"); + } + } + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/BloomFilterReaderProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/BloomFilterReaderProvider.java new file mode 100644 index 000000000000..fb6221cff8b1 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/BloomFilterReaderProvider.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.reader.provider; + +import java.io.IOException; +import org.apache.pinot.segment.spi.index.reader.BloomFilterReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +public interface BloomFilterReaderProvider { + + /** + * Creates a {@see BloomFilterReader} + * @param dataBuffer the buffer, the caller is responsible for closing it + * @param onHeap whether to duplicate on heap. + * @return a bloom filter reader + * @throws IOException if reading from the buffer fails. + */ + BloomFilterReader newBloomFilterReader(PinotDataBuffer dataBuffer, boolean onHeap) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/ForwardIndexReaderProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/ForwardIndexReaderProvider.java new file mode 100644 index 000000000000..cc90c230f345 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/ForwardIndexReaderProvider.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.reader.provider; + +import java.io.IOException; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +public interface ForwardIndexReaderProvider { + + /** + * Creates a {@see ForwardIndexReader} + * @param dataBuffer the buffer, the caller is responsible for closing it + * @param metadata the column metadata, may be used to select a reader if the buffer does not start with a magic byte. + * @return a forward index reader + * @throws IOException if reading from the buffer fails. + */ + ForwardIndexReader newForwardIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/GeospatialIndexReaderProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/GeospatialIndexReaderProvider.java new file mode 100644 index 000000000000..0de4d63e08e9 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/GeospatialIndexReaderProvider.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.reader.provider; + +import java.io.IOException; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.index.reader.H3IndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +public interface GeospatialIndexReaderProvider { + /** + * Creates a {@see H3IndexReader} + * @param dataBuffer the buffer, the caller is responsible for closing it + * @param metadata the column metadata, may be used to select a reader if the buffer does not start with a magic byte. + * @return a geospatial index reader + * @throws IOException if reading from the buffer fails. + */ + H3IndexReader newGeospatialIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/IndexReaderProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/IndexReaderProvider.java new file mode 100644 index 000000000000..0db3e4d789b5 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/IndexReaderProvider.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.reader.provider; + +public interface IndexReaderProvider + extends BloomFilterReaderProvider, ForwardIndexReaderProvider, GeospatialIndexReaderProvider, + InvertedIndexReaderProvider, JsonIndexReaderProvider, RangeIndexReaderProvider, SortedIndexReaderProvider, + TextIndexReaderProvider { +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/InvertedIndexReaderProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/InvertedIndexReaderProvider.java new file mode 100644 index 000000000000..0aad43dd3b85 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/InvertedIndexReaderProvider.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.reader.provider; + +import java.io.IOException; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +public interface InvertedIndexReaderProvider { + + /** + * Creates a {@see InvertedIndexReader} + * @param dataBuffer the buffer, the caller is responsible for closing it + * @param metadata the column metadata, may be used to select a reader if the buffer does not start with a magic byte. + * @return an inverted index reader + * @throws IOException if reading from the buffer fails. + */ + InvertedIndexReader newInvertedIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/JsonIndexReaderProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/JsonIndexReaderProvider.java new file mode 100644 index 000000000000..8f048613b825 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/JsonIndexReaderProvider.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.reader.provider; + +import java.io.IOException; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +public interface JsonIndexReaderProvider { + + /** + * Creates a {@see JsonIndexReader} + * @param dataBuffer the buffer, the caller is responsible for closing it + * @param metadata the column metadata, may be used to select a reader if the buffer does not start with a magic byte. + * @return a JSON index reader + * @throws IOException if reading from the buffer fails. + */ + JsonIndexReader newJsonIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/RangeIndexReaderProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/RangeIndexReaderProvider.java new file mode 100644 index 000000000000..80fee404890e --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/RangeIndexReaderProvider.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.reader.provider; + +import java.io.IOException; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.index.reader.RangeIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +public interface RangeIndexReaderProvider { + + /** + * Creates a {@see RangeIndexReader} + * @param dataBuffer the buffer, the caller is responsible for closing it + * @param metadata the column metadata, may be used to select a reader if the buffer does not start with a magic byte. + * @return a range index reader + * @throws IOException if reading from the buffer fails. + */ + RangeIndexReader newRangeIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/SortedIndexReaderProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/SortedIndexReaderProvider.java new file mode 100644 index 000000000000..d04ed1dd0100 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/SortedIndexReaderProvider.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.reader.provider; + +import java.io.IOException; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.index.reader.SortedIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +public interface SortedIndexReaderProvider { + + /** + * Creates a {@see SortedIndexReader} + * @param dataBuffer the buffer, the caller is responsible for closing it + * @param metadata the column metadata, may be used to select a reader if the buffer does not start with a magic byte. + * @return a sorted index reader + * @throws IOException if reading from the buffer fails. + */ + SortedIndexReader newSortedIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/TextIndexReaderProvider.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/TextIndexReaderProvider.java new file mode 100644 index 000000000000..bc2e60007692 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/provider/TextIndexReaderProvider.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.index.reader.provider; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.pinot.segment.spi.ColumnMetadata; +import org.apache.pinot.segment.spi.index.reader.TextIndexReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +public interface TextIndexReaderProvider { + + /** + * Creates a {@see TextIndexReader} + * @param dataBuffer the buffer, the caller is responsible for closing it + * @param metadata the column metadata, may be used to select a reader if the buffer does not start with a magic byte. + * @return a text index reader + * @throws IOException if reading from the buffer fails. + */ + TextIndexReader newFSTIndexReader(PinotDataBuffer dataBuffer, ColumnMetadata metadata) + throws IOException; + + /** + * Creates a {@see TextIndexReader} + * @param file the file + * @param metadata the column metadata, may be used to select a reader if the buffer does not start with a magic byte. + * @return a text index reader + */ + TextIndexReader newTextIndexReader(File file, ColumnMetadata metadata, + @Nullable Map textIndexProperties); +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexingOverridesTest.java similarity index 57% rename from pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java rename to pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexingOverridesTest.java index bfaebb32f892..e6be3bf3ec48 100644 --- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexCreatorProvidersTest.java +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/creator/IndexingOverridesTest.java @@ -19,7 +19,10 @@ package org.apache.pinot.segment.spi.creator; import java.io.IOException; +import org.apache.pinot.segment.spi.index.IndexingOverrides; import org.apache.pinot.segment.spi.index.creator.BloomFilterCreator; +import org.apache.pinot.segment.spi.index.reader.BloomFilterReader; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.testng.annotations.Test; import static org.mockito.Mockito.mock; @@ -27,27 +30,42 @@ import static org.testng.Assert.assertTrue; -public class IndexCreatorProvidersTest { +public class IndexingOverridesTest { @Test - public void indexCreatorProvidersLoadableWithoutDefaultImplementation() + public void indexingOverridesLoadableWithoutDefaultImplementation() throws IOException { BloomFilterCreator mockBloomFilterCreator = mock(BloomFilterCreator.class); - assertTrue(IndexCreatorProviders.registerProvider(new IndexCreatorProviders.Default() { + BloomFilterReader mockBloomFilterReader = mock(BloomFilterReader.class); + assertTrue(IndexingOverrides.registerProvider(new IndexingOverrides.Default() { @Override public BloomFilterCreator newBloomFilterCreator(IndexCreationContext.BloomFilter context) { return mockBloomFilterCreator; } + + @Override + public BloomFilterReader newBloomFilterReader(PinotDataBuffer dataBuffer, boolean onHeap) { + return mockBloomFilterReader; + } })); // it's ok to load external overrides without an internal implementation present, e.g. for testing - assertEquals(mockBloomFilterCreator, IndexCreatorProviders.getIndexCreatorProvider() + assertEquals(mockBloomFilterCreator, IndexingOverrides.getIndexCreatorProvider() .newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class))); + assertEquals(mockBloomFilterReader, IndexingOverrides.getIndexReaderProvider() + .newBloomFilterReader(mock(PinotDataBuffer.class), false)); + } + + @Test(expectedExceptions = UnsupportedOperationException.class) + public void whenDefaultImplementationMissingThrowUnsupportedOperationExceptionCreator() + throws IOException { + // the implementation is missing so no indexes will be created anyway... + new IndexingOverrides.Default().newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class)); } @Test(expectedExceptions = UnsupportedOperationException.class) - public void whenDefaultImplementationMissingThrowUnsupportedOperationException() + public void whenDefaultImplementationMissingThrowUnsupportedOperationExceptionReader() throws IOException { // the implementation is missing so no indexes will be created anyway... - new IndexCreatorProviders.Default().newBloomFilterCreator(mock(IndexCreationContext.BloomFilter.class)); + new IndexingOverrides.Default().newBloomFilterReader(mock(PinotDataBuffer.class), true); } }