From 0fc2506e90619ed23b66cabe1e9afc2337a546f6 Mon Sep 17 00:00:00 2001 From: Xiaobing Li Date: Fri, 20 Jan 2023 11:32:07 -0800 Subject: [PATCH] extend SegmentDirectory.Reader ifact to abstract access to index buffers from StarTree index --- .../immutable/ImmutableSegmentLoader.java | 3 +- .../segment/store/FilePerIndexDirectory.java | 33 +-- .../store/SegmentLocalFSDirectory.java | 92 ++++++--- .../store/SingleFileIndexDirectory.java | 38 +--- .../segment/store/StarTreeIndexReader.java | 190 ++++++++++++++++++ .../v2/store/StarTreeIndexContainer.java | 7 +- .../v2/store/StarTreeLoaderUtils.java | 37 ++-- .../store/SingleFileIndexDirectoryTest.java | 12 -- .../store/StarTreeIndexReaderTest.java | 172 ++++++++++++++++ .../spi/store/ColumnIndexDirectory.java | 14 -- .../segment/spi/store/SegmentDirectory.java | 31 +-- .../segment/spi/store/SegmentIndexType.java | 45 +++++ 12 files changed, 502 insertions(+), 172 deletions(-) create mode 100644 pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReader.java create mode 100644 pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReaderTest.java create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentIndexType.java diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java index fbe87f94bc6a..4abbcd4ded49 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java @@ -46,6 +46,7 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.segment.spi.store.SegmentIndexType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; @@ -200,7 +201,7 @@ public static ImmutableSegment load(SegmentDirectory segmentDirectory, IndexLoad // Load star-tree index if it exists StarTreeIndexContainer starTreeIndexContainer = null; - if (segmentMetadata.getStarTreeV2MetadataList() != null && segmentReader.getStarTreeIndex() != null) { + if (segmentReader.hasSegmentIndex(SegmentIndexType.STARTREE_INDEX)) { starTreeIndexContainer = new StarTreeIndexContainer(segmentReader, segmentMetadata, indexContainerMap); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java index a85e5b629cef..15b02d6d2f66 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/FilePerIndexDirectory.java @@ -21,9 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteOrder; import java.util.HashMap; import java.util.HashSet; @@ -33,7 +31,6 @@ import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; -import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.ColumnIndexDirectory; import org.apache.pinot.segment.spi.store.ColumnIndexType; @@ -45,9 +42,7 @@ class FilePerIndexDirectory extends ColumnIndexDirectory { private SegmentMetadataImpl _segmentMetadata; private final ReadMode _readMode; private final Map _indexBuffers = new HashMap<>(); - // Different from the other column-index entries, starTree index is multi-column index and has its own index map, - // thus manage it separately. - private PinotDataBuffer _starTreeIndexDataBuffer; + /** * @param segmentDirectory File pointing to segment directory * @param segmentMetadata segment metadata. Metadata must be fully initialized @@ -80,29 +75,6 @@ public PinotDataBuffer getBuffer(String column, ColumnIndexType type) return getReadBufferFor(key); } - @Override - public PinotDataBuffer getStarTreeIndex() - throws IOException { - if (_starTreeIndexDataBuffer != null) { - return _starTreeIndexDataBuffer; - } - File indexFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME); - if (_readMode == ReadMode.heap) { - _starTreeIndexDataBuffer = - PinotDataBuffer.loadFile(indexFile, 0, indexFile.length(), ByteOrder.BIG_ENDIAN, "Star-tree V2 data buffer"); - } else { - _starTreeIndexDataBuffer = PinotDataBuffer.mapFile(indexFile, true, 0, indexFile.length(), ByteOrder.BIG_ENDIAN, - "Star-tree V2 data buffer"); - } - return _starTreeIndexDataBuffer; - } - - @Override - public InputStream getStarTreeIndexMap() - throws IOException { - return new FileInputStream(new File(_segmentDirectory, StarTreeV2Constants.INDEX_MAP_FILE_NAME)); - } - @Override public PinotDataBuffer newBuffer(String column, ColumnIndexType type, long sizeBytes) throws IOException { @@ -122,9 +94,6 @@ public void close() for (PinotDataBuffer dataBuffer : _indexBuffers.values()) { dataBuffer.close(); } - if (_starTreeIndexDataBuffer != null) { - _starTreeIndexDataBuffer.close(); - } } @Override diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java index 4edfdd2b7097..6ec3180727fc 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SegmentLocalFSDirectory.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.nio.file.Path; import java.util.Collections; @@ -38,6 +37,7 @@ import org.apache.pinot.segment.spi.store.ColumnIndexType; import org.apache.pinot.segment.spi.store.SegmentDirectory; import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths; +import org.apache.pinot.segment.spi.store.SegmentIndexType; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.ReadMode; import org.slf4j.Logger; @@ -64,6 +64,7 @@ public class SegmentLocalFSDirectory extends SegmentDirectory { private final ReadMode _readMode; private SegmentMetadataImpl _segmentMetadata; private ColumnIndexDirectory _columnIndexDirectory; + private StarTreeIndexReader _starTreeIndexReader; private String _tier; // Create an empty SegmentLocalFSDirectory object mainly used to @@ -248,25 +249,26 @@ protected void load() private synchronized void loadData() throws IOException { - if (_columnIndexDirectory != null) { - return; + if (_columnIndexDirectory == null) { + switch (_segmentMetadata.getVersion()) { + case v1: + case v2: + _columnIndexDirectory = new FilePerIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode); + break; + case v3: + try { + _columnIndexDirectory = new SingleFileIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode); + } catch (ConfigurationException e) { + LOGGER.error("Failed to create columnar index directory", e); + throw new RuntimeException(e); + } + break; + default: + break; + } } - - switch (_segmentMetadata.getVersion()) { - case v1: - case v2: - _columnIndexDirectory = new FilePerIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode); - break; - case v3: - try { - _columnIndexDirectory = new SingleFileIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode); - } catch (ConfigurationException e) { - LOGGER.error("Failed to create columnar index directory", e); - throw new RuntimeException(e); - } - break; - default: - break; + if (_starTreeIndexReader == null && _segmentMetadata.getStarTreeV2MetadataList() != null) { + _starTreeIndexReader = new StarTreeIndexReader(_segmentDirectory, _segmentMetadata, _readMode); } } @@ -279,6 +281,10 @@ public void close() _columnIndexDirectory.close(); _columnIndexDirectory = null; } + if (_starTreeIndexReader != null) { + _starTreeIndexReader.close(); + } + _starTreeIndexReader = null; } } @@ -349,6 +355,42 @@ public boolean hasIndexFor(String column, ColumnIndexType type) { return _columnIndexDirectory.hasIndexFor(column, type); } + public boolean hasSegmentIndex(SegmentIndexType type) { + if (type != SegmentIndexType.STARTREE_INDEX) { + throw new IllegalArgumentException("Unknown SegmentIndexType: " + type); + } + return _starTreeIndexReader != null; + } + + public SegmentDirectory.Reader getSegmentIndexReaderFor(String indexName, SegmentIndexType type) { + if (type != SegmentIndexType.STARTREE_INDEX) { + throw new IllegalArgumentException("Unknown SegmentIndexType: " + type); + } + return new SegmentDirectory.Reader() { + @Override + public PinotDataBuffer getIndexFor(String column, ColumnIndexType type) + throws IOException { + return _starTreeIndexReader.getBuffer(indexName, column, type); + } + + @Override + public boolean hasIndexFor(String column, ColumnIndexType type) { + return _starTreeIndexReader.hasIndexFor(indexName, column, type); + } + + @Override + public String toString() { + return _starTreeIndexReader.toString() + " for " + indexName; + } + + @Override + public void close() + throws IOException { + // Noop as _starTreeIndexReader is owned by the top level Reader + } + }; + } + @Override public void close() { // do nothing here @@ -359,18 +401,6 @@ public void close() { public String toString() { return _segmentDirectory.toString(); } - - @Override - public PinotDataBuffer getStarTreeIndex() - throws IOException { - return _columnIndexDirectory.getStarTreeIndex(); - } - - @Override - public InputStream getStarTreeIndexMap() - throws IOException { - return _columnIndexDirectory.getStarTreeIndexMap(); - } } /*************************** SegmentDirectory Writer *********************/ diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java index f9824dd5a230..28a1c75eeebd 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java @@ -22,10 +22,8 @@ import com.google.common.base.Preconditions; import java.io.BufferedWriter; import java.io.File; -import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; -import java.io.InputStream; import java.io.PrintWriter; import java.io.RandomAccessFile; import java.nio.ByteOrder; @@ -43,7 +41,6 @@ import org.apache.commons.io.FileUtils; import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; -import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; import org.apache.pinot.segment.spi.store.ColumnIndexDirectory; import org.apache.pinot.segment.spi.store.ColumnIndexType; @@ -87,9 +84,6 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory { private final File _indexFile; private final Map _columnEntries; private final List _allocBuffers; - // Different from the other column-index entries, starTree index is multi-column index and has its own index map, - // thus manage it separately. - private PinotDataBuffer _starTreeIndexDataBuffer; // For V3 segment format, the index cleanup consists of two steps: mark and sweep. // The removeIndex() method marks an index to be removed; and the index info is @@ -213,21 +207,6 @@ private void load() throws IOException, ConfigurationException { loadMap(); mapBufferEntries(); - if (_segmentMetadata.getStarTreeV2MetadataList() != null) { - loadStarTreeIndex(); - } - } - - private void loadStarTreeIndex() - throws IOException { - File indexFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME); - if (_readMode == ReadMode.heap) { - _starTreeIndexDataBuffer = - PinotDataBuffer.loadFile(indexFile, 0, indexFile.length(), ByteOrder.BIG_ENDIAN, "Star-tree V2 data buffer"); - } else { - _starTreeIndexDataBuffer = PinotDataBuffer.mapFile(indexFile, true, 0, indexFile.length(), ByteOrder.BIG_ENDIAN, - "Star-tree V2 data buffer"); - } } private void loadMap() @@ -367,9 +346,6 @@ public void close() for (PinotDataBuffer buf : _allocBuffers) { buf.close(); } - if (_starTreeIndexDataBuffer != null) { - _starTreeIndexDataBuffer.close(); - } // Cleanup removed indices after closing and flushing buffers, so // that potential index updates can be persisted across cleanups. if (_shouldCleanupRemovedIndices) { @@ -413,21 +389,9 @@ public Set getColumnsWithIndex(ColumnIndexType type) { return columns; } - @Override - public PinotDataBuffer getStarTreeIndex() - throws IOException { - return _starTreeIndexDataBuffer; - } - - @Override - public InputStream getStarTreeIndexMap() - throws IOException { - return new FileInputStream(new File(_segmentDirectory, StarTreeV2Constants.INDEX_MAP_FILE_NAME)); - } - @Override public String toString() { - return _segmentDirectory.toString() + "/" + _indexFile.toString(); + return _indexFile.toString(); } /** diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReader.java new file mode 100644 index 000000000000..9b6cd79ddbe3 --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReader.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.store; + +import com.google.common.base.Preconditions; +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteOrder; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; +import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants; +import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.segment.spi.store.ColumnIndexType; +import org.apache.pinot.spi.utils.ReadMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This class provides access to the StarTree index data in a segment directory. The StarTree index data is stored in + * star_tree_index file, whose content can be parsed according to offset/size from star_tree_index_map file. + */ +public class StarTreeIndexReader implements Closeable { + private static final Logger LOGGER = LoggerFactory.getLogger(StarTreeIndexReader.class); + + private final File _segmentDirectory; + private final SegmentMetadataImpl _segmentMetadata; + private final ReadMode _readMode; + private final File _indexFile; + private final int _numStarTrees; + + // StarTree index can contain multiple index instances, identified by ids like 0, 1, etc. + private final Map> _indexColumnEntries; + private PinotDataBuffer _dataBuffer; + + /** + * @param segmentDirectory the segment directory contains StarTree index + * @param segmentMetadata segment metadata must be fully initialized + * @param readMode mmap vs heap mode + */ + public StarTreeIndexReader(File segmentDirectory, SegmentMetadataImpl segmentMetadata, ReadMode readMode) + throws IOException { + Preconditions.checkNotNull(segmentDirectory); + Preconditions.checkArgument(segmentDirectory.exists(), "SegmentDirectory: " + segmentDirectory + " does not exist"); + Preconditions.checkArgument(segmentDirectory.isDirectory(), + "SegmentDirectory: " + segmentDirectory + " is not a directory"); + Preconditions.checkNotNull(segmentMetadata); + Preconditions.checkNotNull(readMode); + + _segmentDirectory = segmentDirectory; + _segmentMetadata = segmentMetadata; + _readMode = readMode; + _numStarTrees = _segmentMetadata.getStarTreeV2MetadataList().size(); + _indexFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME); + _indexColumnEntries = new HashMap<>(_numStarTrees); + load(); + } + + private void load() + throws IOException { + List> indexMapList; + try (InputStream inputStream = new FileInputStream( + new File(_segmentDirectory, StarTreeV2Constants.INDEX_MAP_FILE_NAME))) { + indexMapList = StarTreeIndexMapUtils.loadFromInputStream(inputStream, _numStarTrees); + } + if (_readMode == ReadMode.heap) { + _dataBuffer = PinotDataBuffer.loadFile(_indexFile, 0, _indexFile.length(), ByteOrder.LITTLE_ENDIAN, + "StarTree V2 data buffer from: " + _indexFile); + } else { + _dataBuffer = PinotDataBuffer.mapFile(_indexFile, true, 0, _indexFile.length(), ByteOrder.LITTLE_ENDIAN, + "StarTree V2 data buffer from: " + _indexFile); + } + for (int i = 0; i < _numStarTrees; i++) { + mapBufferEntries(i, indexMapList.get(i)); + } + LOGGER.debug("Loaded StarTree index data buffers: {} in segment: {}", _indexColumnEntries, _segmentDirectory); + } + + private void mapBufferEntries(int starTreeId, + Map indexMap) { + String idxName = String.valueOf(starTreeId); + Map columnEntries = + _indexColumnEntries.computeIfAbsent(idxName, k -> new HashMap<>()); + // Load star-tree index. The index tree doesn't have corresponding column name or column index type to create an + // IndexKey. As it's a kind of inverted index, we uniquely identify it with index id and inverted index type. + columnEntries.computeIfAbsent(new IndexKey(idxName, ColumnIndexType.INVERTED_INDEX), + k -> new StarTreeIndexEntry(indexMap.get(StarTreeIndexMapUtils.STAR_TREE_INDEX_KEY), _dataBuffer, + ByteOrder.LITTLE_ENDIAN)); + List starTreeMetadataList = _segmentMetadata.getStarTreeV2MetadataList(); + StarTreeV2Metadata starTreeMetadata = starTreeMetadataList.get(starTreeId); + // Load dimension forward indexes + for (String dimension : starTreeMetadata.getDimensionsSplitOrder()) { + IndexKey indexKey = new IndexKey(dimension, ColumnIndexType.FORWARD_INDEX); + columnEntries.computeIfAbsent(indexKey, k -> new StarTreeIndexEntry( + indexMap.get(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.FORWARD_INDEX, dimension)), + _dataBuffer, ByteOrder.BIG_ENDIAN)); + } + // Load metric (function-column pair) forward indexes + for (AggregationFunctionColumnPair functionColumnPair : starTreeMetadata.getFunctionColumnPairs()) { + String metric = functionColumnPair.toColumnName(); + IndexKey indexKey = new IndexKey(metric, ColumnIndexType.FORWARD_INDEX); + columnEntries.computeIfAbsent(indexKey, k -> new StarTreeIndexEntry( + indexMap.get(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.FORWARD_INDEX, metric)), + _dataBuffer, ByteOrder.BIG_ENDIAN)); + } + } + + public PinotDataBuffer getBuffer(String indexName, String column, ColumnIndexType type) + throws IOException { + Map columnEntries = _indexColumnEntries.get(indexName); + if (columnEntries == null) { + throw new RuntimeException( + String.format("Could not find StarTree index: %s in segment: %s", indexName, _segmentDirectory.toString())); + } + StarTreeIndexEntry entry = columnEntries.get(new IndexKey(column, type)); + if (entry != null && entry._buffer != null) { + return entry._buffer; + } + throw new RuntimeException( + String.format("Could not find index for column: %s, type: %s in StarTree index: %s in segment: %s", column, + type, indexName, _segmentDirectory.toString())); + } + + public boolean hasIndexFor(String indexName, String column, ColumnIndexType type) { + Map columnEntries = _indexColumnEntries.get(indexName); + if (columnEntries == null) { + return false; + } + return columnEntries.containsKey(new IndexKey(column, type)); + } + + @Override + public String toString() { + return _indexFile.toString(); + } + + @Override + public void close() + throws IOException { + _indexColumnEntries.clear(); + _dataBuffer.close(); + } + + private static class StarTreeIndexEntry { + private final long _offset; + private final long _size; + private final PinotDataBuffer _buffer; + + public StarTreeIndexEntry(long offset, long size, PinotDataBuffer buffer) { + _offset = offset; + _size = size; + _buffer = buffer; + } + + public StarTreeIndexEntry(StarTreeIndexMapUtils.IndexValue indexValue, PinotDataBuffer dataBuffer, + ByteOrder byteOrder) { + this(indexValue._offset, indexValue._size, + dataBuffer.view(indexValue._offset, indexValue._offset + indexValue._size, byteOrder)); + } + + @Override + public String toString() { + return "StarTreeIndexEntry{" + "_offset=" + _offset + ", _size=" + _size + '}'; + } + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexContainer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexContainer.java index 3a7220c95321..2dace24d8113 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexContainer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeIndexContainer.java @@ -20,7 +20,6 @@ import java.io.Closeable; import java.io.IOException; -import java.io.InputStream; import java.util.List; import java.util.Map; import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer; @@ -38,11 +37,7 @@ public class StarTreeIndexContainer implements Closeable { public StarTreeIndexContainer(SegmentDirectory.Reader segmentReader, SegmentMetadataImpl segmentMetadata, Map indexContainerMap) throws IOException { - try (InputStream inputStream = segmentReader.getStarTreeIndexMap()) { - _starTrees = StarTreeLoaderUtils.loadStarTreeV2(segmentReader.getStarTreeIndex(), - StarTreeIndexMapUtils.loadFromInputStream(inputStream, segmentMetadata.getStarTreeV2MetadataList().size()), - segmentMetadata, indexContainerMap); - } + _starTrees = StarTreeLoaderUtils.loadStarTreeV2(segmentReader, segmentMetadata, indexContainerMap); } public List getStarTrees() { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java index 899d736542fa..b1c648334789 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java @@ -19,7 +19,6 @@ package org.apache.pinot.segment.local.startree.v2.store; import java.io.IOException; -import java.nio.ByteOrder; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -39,15 +38,13 @@ import org.apache.pinot.segment.spi.index.startree.StarTreeV2; import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.segment.spi.store.ColumnIndexType; +import org.apache.pinot.segment.spi.store.SegmentDirectory; +import org.apache.pinot.segment.spi.store.SegmentIndexType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.data.MetricFieldSpec; -import static org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils.IndexKey; -import static org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils.IndexType; -import static org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils.IndexValue; -import static org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils.STAR_TREE_INDEX_KEY; - /** * The {@code StarTreeLoaderUtils} class provides utility methods to load star-tree indexes. @@ -56,21 +53,18 @@ public class StarTreeLoaderUtils { private StarTreeLoaderUtils() { } - public static List loadStarTreeV2(PinotDataBuffer dataBuffer, - List> indexMapList, SegmentMetadataImpl segmentMetadata, - Map indexContainerMap) { + public static List loadStarTreeV2(SegmentDirectory.Reader segmentReader, + SegmentMetadataImpl segmentMetadata, Map indexContainerMap) + throws IOException { List starTreeMetadataList = segmentMetadata.getStarTreeV2MetadataList(); int numStarTrees = starTreeMetadataList.size(); List starTrees = new ArrayList<>(numStarTrees); - for (int i = 0; i < numStarTrees; i++) { - Map indexMap = indexMapList.get(i); - + String indexName = String.valueOf(i); + SegmentDirectory.Reader indexReader = + segmentReader.getSegmentIndexReaderFor(indexName, SegmentIndexType.STARTREE_INDEX); // Load star-tree index - IndexValue indexValue = indexMap.get(STAR_TREE_INDEX_KEY); - long start = indexValue._offset; - long end = start + indexValue._size; - StarTree starTree = new OffHeapStarTree(dataBuffer.view(start, end, ByteOrder.LITTLE_ENDIAN)); + StarTree starTree = new OffHeapStarTree(indexReader.getIndexFor(indexName, ColumnIndexType.INVERTED_INDEX)); StarTreeV2Metadata starTreeMetadata = starTreeMetadataList.get(i); int numDocs = starTreeMetadata.getNumDocs(); @@ -78,10 +72,7 @@ public static List loadStarTreeV2(PinotDataBuffer dataBuffer, // Load dimension forward indexes for (String dimension : starTreeMetadata.getDimensionsSplitOrder()) { - indexValue = indexMap.get(new IndexKey(IndexType.FORWARD_INDEX, dimension)); - start = indexValue._offset; - end = start + indexValue._size; - PinotDataBuffer forwardIndexDataBuffer = dataBuffer.view(start, end, ByteOrder.BIG_ENDIAN); + PinotDataBuffer forwardIndexDataBuffer = indexReader.getIndexFor(dimension, ColumnIndexType.FORWARD_INDEX); ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(dimension); FixedBitSVForwardIndexReaderV2 forwardIndex = new FixedBitSVForwardIndexReaderV2(forwardIndexDataBuffer, numDocs, columnMetadata.getBitsPerElement()); @@ -92,10 +83,7 @@ public static List loadStarTreeV2(PinotDataBuffer dataBuffer, // Load metric (function-column pair) forward indexes for (AggregationFunctionColumnPair functionColumnPair : starTreeMetadata.getFunctionColumnPairs()) { String metric = functionColumnPair.toColumnName(); - indexValue = indexMap.get(new IndexKey(IndexType.FORWARD_INDEX, metric)); - start = indexValue._offset; - end = start + indexValue._size; - PinotDataBuffer forwardIndexDataBuffer = dataBuffer.view(start, end, ByteOrder.BIG_ENDIAN); + PinotDataBuffer forwardIndexDataBuffer = indexReader.getIndexFor(metric, ColumnIndexType.FORWARD_INDEX); DataType dataType = ValueAggregatorFactory.getAggregatedValueType(functionColumnPair.getFunctionType()); FieldSpec fieldSpec = new MetricFieldSpec(metric, dataType); BaseChunkForwardIndexReader forwardIndex; @@ -134,7 +122,6 @@ public void close() } }); } - return starTrees; } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java index 98c32a93b087..62e29e8afd32 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectoryTest.java @@ -21,7 +21,6 @@ import com.google.common.collect.ImmutableMap; import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.util.Arrays; @@ -55,7 +54,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; public class SingleFileIndexDirectoryTest { @@ -394,14 +392,4 @@ public void testGetColumnIndices() new HashSet<>(Collections.singletonList("bar"))); } } - - @Test(expectedExceptions = FileNotFoundException.class, expectedExceptionsMessageRegExp = ".*star_tree_index.*") - public void testLoadStarTreeIndex() - throws Exception { - Mockito.when(_segmentMetadata.getStarTreeV2MetadataList()).thenReturn(Collections.emptyList()); - try (SingleFileIndexDirectory ignore = new SingleFileIndexDirectory(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { - // Trying to load startree index but not able to find the file. - fail(); - } - } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReaderTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReaderTest.java new file mode 100644 index 000000000000..74f8ebc1cdb6 --- /dev/null +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/store/StarTreeIndexReaderTest.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.segment.store; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexMapUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.segment.spi.creator.SegmentVersion; +import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; +import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; +import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants; +import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.segment.spi.store.ColumnIndexType; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class StarTreeIndexReaderTest { + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), StarTreeIndexReaderTest.class.toString()); + + private SegmentMetadataImpl _segmentMetadata; + + @BeforeMethod + public void setUp() + throws IOException { + TestUtils.ensureDirectoriesExistAndEmpty(TEMP_DIR); + writeMetadata(); + } + + @AfterMethod + public void tearDown() + throws IOException { + FileUtils.deleteDirectory(TEMP_DIR); + } + + void writeMetadata() { + SegmentMetadataImpl meta = mock(SegmentMetadataImpl.class); + when(meta.getVersion()).thenReturn(SegmentVersion.v3); + when(meta.getStarTreeV2MetadataList()).thenReturn(null); + _segmentMetadata = meta; + } + + @Test + public void testLoadStarTreeIndexBuffers() + throws IOException { + // Test with 2 index trees. + StarTreeV2Metadata stMeta1 = mock(StarTreeV2Metadata.class); + when(stMeta1.getDimensionsSplitOrder()).thenReturn(Arrays.asList("dim0", "dim1")); + when(stMeta1.getFunctionColumnPairs()).thenReturn( + Collections.singleton(new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "*"))); + StarTreeV2Metadata stMeta2 = mock(StarTreeV2Metadata.class); + when(stMeta2.getDimensionsSplitOrder()).thenReturn(Arrays.asList("dimX", "dimY")); + when(stMeta2.getFunctionColumnPairs()).thenReturn( + Collections.singleton(new AggregationFunctionColumnPair(AggregationFunctionType.SUM, "dimX"))); + when(_segmentMetadata.getStarTreeV2MetadataList()).thenReturn(Arrays.asList(stMeta1, stMeta2)); + // Mock the offset/sizes for the index buffers. + List> indexMaps = new ArrayList<>(); + Map indexMap = new HashMap<>(); + indexMap.put(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.STAR_TREE, null), + new StarTreeIndexMapUtils.IndexValue(0, 1)); + indexMap.put(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.FORWARD_INDEX, "dim0"), + new StarTreeIndexMapUtils.IndexValue(1, 1)); + indexMap.put(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.FORWARD_INDEX, "dim1"), + new StarTreeIndexMapUtils.IndexValue(2, 1)); + indexMap.put(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.FORWARD_INDEX, "count__*"), + new StarTreeIndexMapUtils.IndexValue(3, 1)); + indexMaps.add(indexMap); + indexMap = new HashMap<>(); + indexMap.put(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.STAR_TREE, null), + new StarTreeIndexMapUtils.IndexValue(10, 3)); + indexMap.put(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.FORWARD_INDEX, "dimX"), + new StarTreeIndexMapUtils.IndexValue(13, 3)); + indexMap.put(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.FORWARD_INDEX, "dimY"), + new StarTreeIndexMapUtils.IndexValue(16, 3)); + indexMap.put(new StarTreeIndexMapUtils.IndexKey(StarTreeIndexMapUtils.IndexType.FORWARD_INDEX, "sum__dimX"), + new StarTreeIndexMapUtils.IndexValue(19, 3)); + indexMaps.add(indexMap); + File indexMapFile = new File(TEMP_DIR, StarTreeV2Constants.INDEX_MAP_FILE_NAME); + StarTreeIndexMapUtils.storeToFile(indexMaps, indexMapFile); + + File indexFile = new File(TEMP_DIR, StarTreeV2Constants.INDEX_FILE_NAME); + if (!indexFile.exists()) { + indexFile.createNewFile(); + } + byte[] data = new byte[32]; + for (int i = 0; i < data.length; i++) { + data[i] = (byte) i; + } + try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapFile(indexFile, false, 0, data.length, ByteOrder.LITTLE_ENDIAN, + "StarTree V2 data buffer from: " + indexFile)) { + dataBuffer.readFrom(0, data); + } + + try (StarTreeIndexReader reader = new StarTreeIndexReader(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) { + // Check the bytes of the 1st ST index + assertTrue(reader.hasIndexFor("0", "0", ColumnIndexType.INVERTED_INDEX)); + PinotDataBuffer buf = reader.getBuffer("0", "0", ColumnIndexType.INVERTED_INDEX); + assertEquals(buf.size(), 1); + assertEquals(buf.getByte(0), 0); + + assertTrue(reader.hasIndexFor("0", "dim0", ColumnIndexType.FORWARD_INDEX)); + buf = reader.getBuffer("0", "dim0", ColumnIndexType.FORWARD_INDEX); + assertEquals(buf.size(), 1); + assertEquals(buf.getByte(0), 1); + + assertTrue(reader.hasIndexFor("0", "dim1", ColumnIndexType.FORWARD_INDEX)); + buf = reader.getBuffer("0", "dim1", ColumnIndexType.FORWARD_INDEX); + assertEquals(buf.size(), 1); + assertEquals(buf.getByte(0), 2); + + assertTrue(reader.hasIndexFor("0", "count__*", ColumnIndexType.FORWARD_INDEX)); + buf = reader.getBuffer("0", "count__*", ColumnIndexType.FORWARD_INDEX); + assertEquals(buf.size(), 1); + assertEquals(buf.getByte(0), 3); + + // Check the bytes of the 2nd ST index + assertTrue(reader.hasIndexFor("1", "1", ColumnIndexType.INVERTED_INDEX)); + buf = reader.getBuffer("1", "1", ColumnIndexType.INVERTED_INDEX); + assertEquals(buf.size(), 3); + assertEquals(buf.getByte(2), 12); + + assertTrue(reader.hasIndexFor("1", "dimX", ColumnIndexType.FORWARD_INDEX)); + buf = reader.getBuffer("1", "dimX", ColumnIndexType.FORWARD_INDEX); + assertEquals(buf.size(), 3); + assertEquals(buf.getByte(2), 15); + + assertTrue(reader.hasIndexFor("1", "dimY", ColumnIndexType.FORWARD_INDEX)); + buf = reader.getBuffer("1", "dimY", ColumnIndexType.FORWARD_INDEX); + assertEquals(buf.size(), 3); + assertEquals(buf.getByte(2), 18); + + assertTrue(reader.hasIndexFor("1", "sum__dimX", ColumnIndexType.FORWARD_INDEX)); + buf = reader.getBuffer("1", "sum__dimX", ColumnIndexType.FORWARD_INDEX); + assertEquals(buf.size(), 3); + assertEquals(buf.getByte(2), 21); + } + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java index eee9e90990cb..ca343dbfa225 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/ColumnIndexDirectory.java @@ -21,7 +21,6 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.util.Set; import org.apache.pinot.segment.spi.FetchContext; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; @@ -105,17 +104,4 @@ public void acquireBuffer(FetchContext fetchContext) { */ public void releaseBuffer(FetchContext fetchContext) { } - - public PinotDataBuffer getStarTreeIndex() - throws IOException { - throw new UnsupportedOperationException(); - } - - /** - * The caller should close the input stream. - */ - public InputStream getStarTreeIndexMap() - throws IOException { - throw new UnsupportedOperationException(); - } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java index 7ba897a01dd0..354d54de5450 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentDirectory.java @@ -21,7 +21,6 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.net.URI; import java.nio.file.Path; import java.util.Set; @@ -181,24 +180,28 @@ public abstract PinotDataBuffer getIndexFor(String column, ColumnIndexType type) public abstract boolean hasIndexFor(String column, ColumnIndexType type); - public SegmentDirectory toSegmentDirectory() { - return SegmentDirectory.this; - } - - public abstract String toString(); - - public PinotDataBuffer getStarTreeIndex() - throws IOException { - throw new UnsupportedOperationException(); + public boolean hasSegmentIndex(SegmentIndexType type) { + return false; } /** - * The caller should close the input stream. + * Segment index types like StarTree index is modelled like those single-column indices kept in columns.psf and + * index_map file, so access to their index data can be abstracted with SegmentDirectory.Reader interface too. + * + * @param indexName to look for a specific index instance to access, e.g. the StarTree index can contain multiple + * index instances, each one of them has a tree and a set of fwd indices referred to by the tree. + * @param type like today's StarTree index or other segment index types e.g. materialized views. + * @return SegmentDirectory.Reader object to access the index buffers inside the specified segment index. */ - public InputStream getStarTreeIndexMap() - throws IOException { - throw new UnsupportedOperationException(); + public Reader getSegmentIndexReaderFor(String indexName, SegmentIndexType type) { + return null; } + + public SegmentDirectory toSegmentDirectory() { + return SegmentDirectory.this; + } + + public abstract String toString(); } /** diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentIndexType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentIndexType.java new file mode 100644 index 000000000000..369b9d07c363 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/store/SegmentIndexType.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.store; + +/** + * Different from column index, segment level index involves multiple columns like StarTree index. + */ +public enum SegmentIndexType { + STARTREE_INDEX("startree_index"); + + private final String _indexName; + + SegmentIndexType(String name) { + _indexName = name; + } + + public String getIndexName() { + return _indexName; + } + + public static SegmentIndexType getValue(String val) { + for (SegmentIndexType type : values()) { + if (type.getIndexName().equalsIgnoreCase(val)) { + return type; + } + } + throw new IllegalArgumentException("Unknown value: " + val); + } +}