Skip to content

Commit

Permalink
simplify per comments
Browse files Browse the repository at this point in the history
  • Loading branch information
klsince committed Jan 24, 2023
1 parent 987b820 commit 7d099d0
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.segment.spi.store.SegmentIndexType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -201,7 +200,7 @@ public static ImmutableSegment load(SegmentDirectory segmentDirectory, IndexLoad

// Load star-tree index if it exists
StarTreeIndexContainer starTreeIndexContainer = null;
if (segmentReader.hasSegmentIndex(SegmentIndexType.STAR_TREE_INDEX)) {
if (segmentReader.hasStarTreeIndex()) {
starTreeIndexContainer = new StarTreeIndexContainer(segmentReader, segmentMetadata, indexContainerMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.segment.spi.store.SegmentIndexType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
import org.slf4j.Logger;
Expand Down Expand Up @@ -250,25 +249,26 @@ protected void load()

private synchronized void loadData()
throws IOException {
if (_columnIndexDirectory == null) {
switch (_segmentMetadata.getVersion()) {
case v1:
case v2:
_columnIndexDirectory = new FilePerIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
break;
case v3:
try {
_columnIndexDirectory = new SingleFileIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
} catch (ConfigurationException e) {
LOGGER.error("Failed to create columnar index directory", e);
throw new RuntimeException(e);
}
break;
default:
break;
}
if (_columnIndexDirectory != null) {
return;
}
if (_starTreeIndexReader == null && CollectionUtils.isNotEmpty(_segmentMetadata.getStarTreeV2MetadataList())) {
switch (_segmentMetadata.getVersion()) {
case v1:
case v2:
_columnIndexDirectory = new FilePerIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
break;
case v3:
try {
_columnIndexDirectory = new SingleFileIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
} catch (ConfigurationException e) {
LOGGER.error("Failed to create columnar index directory", e);
throw new RuntimeException(e);
}
break;
default:
break;
}
if (CollectionUtils.isNotEmpty(_segmentMetadata.getStarTreeV2MetadataList())) {
_starTreeIndexReader = new StarTreeIndexReader(_segmentDirectory, _segmentMetadata, _readMode);
}
}
Expand Down Expand Up @@ -356,32 +356,28 @@ public boolean hasIndexFor(String column, ColumnIndexType type) {
return _columnIndexDirectory.hasIndexFor(column, type);
}

public boolean hasSegmentIndex(SegmentIndexType type) {
if (type != SegmentIndexType.STAR_TREE_INDEX) {
throw new IllegalArgumentException("Unknown SegmentIndexType: " + type);
}
@Override
public boolean hasStarTreeIndex() {
return _starTreeIndexReader != null;
}

public SegmentDirectory.Reader getSegmentIndexReaderFor(String indexName, SegmentIndexType type) {
if (type != SegmentIndexType.STAR_TREE_INDEX) {
throw new IllegalArgumentException("Unknown SegmentIndexType: " + type);
}
@Override
public SegmentDirectory.Reader getStarTreeIndexReader(int starTreeId) {
return new SegmentDirectory.Reader() {
@Override
public PinotDataBuffer getIndexFor(String column, ColumnIndexType type)
throws IOException {
return _starTreeIndexReader.getBuffer(indexName, column, type);
return _starTreeIndexReader.getBuffer(starTreeId, column, type);
}

@Override
public boolean hasIndexFor(String column, ColumnIndexType type) {
return _starTreeIndexReader.hasIndexFor(indexName, column, type);
return _starTreeIndexReader.hasIndexFor(starTreeId, column, type);
}

@Override
public String toString() {
return _starTreeIndexReader.toString() + " for " + indexName;
return _starTreeIndexReader.toString() + " for " + starTreeId;
}

@Override
Expand Down Expand Up @@ -446,6 +442,10 @@ public void close()
_columnIndexDirectory.close();
_columnIndexDirectory = null;
}
if (_starTreeIndexReader != null) {
_starTreeIndexReader.close();
_starTreeIndexReader = null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class StarTreeIndexReader implements Closeable {
private final int _numStarTrees;

// StarTree index can contain multiple index instances, identified by ids like 0, 1, etc.
private final Map<String, Map<IndexKey, StarTreeIndexEntry>> _indexColumnEntries;
private final Map<Integer, Map<IndexKey, StarTreeIndexEntry>> _indexColumnEntries;
private PinotDataBuffer _dataBuffer;

/**
Expand Down Expand Up @@ -102,12 +102,11 @@ private void load()

private void mapBufferEntries(int starTreeId,
Map<StarTreeIndexMapUtils.IndexKey, StarTreeIndexMapUtils.IndexValue> indexMap) {
String idxName = String.valueOf(starTreeId);
Map<IndexKey, StarTreeIndexEntry> columnEntries =
_indexColumnEntries.computeIfAbsent(idxName, k -> new HashMap<>());
_indexColumnEntries.computeIfAbsent(starTreeId, k -> new HashMap<>());
// Load star-tree index. The index tree doesn't have corresponding column name or column index type to create an
// IndexKey. As it's a kind of inverted index, we uniquely identify it with index id and inverted index type.
columnEntries.computeIfAbsent(new IndexKey(idxName, ColumnIndexType.INVERTED_INDEX),
columnEntries.computeIfAbsent(new IndexKey(String.valueOf(starTreeId), ColumnIndexType.INVERTED_INDEX),
k -> new StarTreeIndexEntry(indexMap.get(StarTreeIndexMapUtils.STAR_TREE_INDEX_KEY), _dataBuffer,
ByteOrder.LITTLE_ENDIAN));
List<StarTreeV2Metadata> starTreeMetadataList = _segmentMetadata.getStarTreeV2MetadataList();
Expand All @@ -129,24 +128,24 @@ private void mapBufferEntries(int starTreeId,
}
}

public PinotDataBuffer getBuffer(String indexName, String column, ColumnIndexType type)
public PinotDataBuffer getBuffer(int starTreeId, String column, ColumnIndexType type)
throws IOException {
Map<IndexKey, StarTreeIndexEntry> columnEntries = _indexColumnEntries.get(indexName);
Map<IndexKey, StarTreeIndexEntry> columnEntries = _indexColumnEntries.get(starTreeId);
if (columnEntries == null) {
throw new RuntimeException(
String.format("Could not find StarTree index: %s in segment: %s", indexName, _segmentDirectory.toString()));
String.format("Could not find StarTree index: %s in segment: %s", starTreeId, _segmentDirectory.toString()));
}
StarTreeIndexEntry entry = columnEntries.get(new IndexKey(column, type));
if (entry != null && entry._buffer != null) {
return entry._buffer;
}
throw new RuntimeException(
String.format("Could not find index for column: %s, type: %s in StarTree index: %s in segment: %s", column,
type, indexName, _segmentDirectory.toString()));
type, starTreeId, _segmentDirectory.toString()));
}

public boolean hasIndexFor(String indexName, String column, ColumnIndexType type) {
Map<IndexKey, StarTreeIndexEntry> columnEntries = _indexColumnEntries.get(indexName);
public boolean hasIndexFor(int starTreeId, String column, ColumnIndexType type) {
Map<IndexKey, StarTreeIndexEntry> columnEntries = _indexColumnEntries.get(starTreeId);
if (columnEntries == null) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentIndexType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.MetricFieldSpec;
Expand All @@ -60,11 +59,10 @@ public static List<StarTreeV2> loadStarTreeV2(SegmentDirectory.Reader segmentRea
int numStarTrees = starTreeMetadataList.size();
List<StarTreeV2> starTrees = new ArrayList<>(numStarTrees);
for (int i = 0; i < numStarTrees; i++) {
String indexName = String.valueOf(i);
SegmentDirectory.Reader indexReader =
segmentReader.getSegmentIndexReaderFor(indexName, SegmentIndexType.STAR_TREE_INDEX);
SegmentDirectory.Reader indexReader = segmentReader.getStarTreeIndexReader(i);
// Load star-tree index
StarTree starTree = new OffHeapStarTree(indexReader.getIndexFor(indexName, ColumnIndexType.INVERTED_INDEX));
StarTree starTree =
new OffHeapStarTree(indexReader.getIndexFor(String.valueOf(i), ColumnIndexType.INVERTED_INDEX));

StarTreeV2Metadata starTreeMetadata = starTreeMetadataList.get(i);
int numDocs = starTreeMetadata.getNumDocs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,44 +127,44 @@ public void testLoadStarTreeIndexBuffers()

try (StarTreeIndexReader reader = new StarTreeIndexReader(TEMP_DIR, _segmentMetadata, ReadMode.mmap)) {
// Check the bytes of the 1st ST index
assertTrue(reader.hasIndexFor("0", "0", ColumnIndexType.INVERTED_INDEX));
PinotDataBuffer buf = reader.getBuffer("0", "0", ColumnIndexType.INVERTED_INDEX);
assertTrue(reader.hasIndexFor(0, "0", ColumnIndexType.INVERTED_INDEX));
PinotDataBuffer buf = reader.getBuffer(0, "0", ColumnIndexType.INVERTED_INDEX);
assertEquals(buf.size(), 1);
assertEquals(buf.getByte(0), 0);

assertTrue(reader.hasIndexFor("0", "dim0", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer("0", "dim0", ColumnIndexType.FORWARD_INDEX);
assertTrue(reader.hasIndexFor(0, "dim0", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer(0, "dim0", ColumnIndexType.FORWARD_INDEX);
assertEquals(buf.size(), 1);
assertEquals(buf.getByte(0), 1);

assertTrue(reader.hasIndexFor("0", "dim1", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer("0", "dim1", ColumnIndexType.FORWARD_INDEX);
assertTrue(reader.hasIndexFor(0, "dim1", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer(0, "dim1", ColumnIndexType.FORWARD_INDEX);
assertEquals(buf.size(), 1);
assertEquals(buf.getByte(0), 2);

assertTrue(reader.hasIndexFor("0", "count__*", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer("0", "count__*", ColumnIndexType.FORWARD_INDEX);
assertTrue(reader.hasIndexFor(0, "count__*", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer(0, "count__*", ColumnIndexType.FORWARD_INDEX);
assertEquals(buf.size(), 1);
assertEquals(buf.getByte(0), 3);

// Check the bytes of the 2nd ST index
assertTrue(reader.hasIndexFor("1", "1", ColumnIndexType.INVERTED_INDEX));
buf = reader.getBuffer("1", "1", ColumnIndexType.INVERTED_INDEX);
assertTrue(reader.hasIndexFor(1, "1", ColumnIndexType.INVERTED_INDEX));
buf = reader.getBuffer(1, "1", ColumnIndexType.INVERTED_INDEX);
assertEquals(buf.size(), 3);
assertEquals(buf.getByte(2), 12);

assertTrue(reader.hasIndexFor("1", "dimX", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer("1", "dimX", ColumnIndexType.FORWARD_INDEX);
assertTrue(reader.hasIndexFor(1, "dimX", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer(1, "dimX", ColumnIndexType.FORWARD_INDEX);
assertEquals(buf.size(), 3);
assertEquals(buf.getByte(2), 15);

assertTrue(reader.hasIndexFor("1", "dimY", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer("1", "dimY", ColumnIndexType.FORWARD_INDEX);
assertTrue(reader.hasIndexFor(1, "dimY", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer(1, "dimY", ColumnIndexType.FORWARD_INDEX);
assertEquals(buf.size(), 3);
assertEquals(buf.getByte(2), 18);

assertTrue(reader.hasIndexFor("1", "sum__dimX", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer("1", "sum__dimX", ColumnIndexType.FORWARD_INDEX);
assertTrue(reader.hasIndexFor(1, "sum__dimX", ColumnIndexType.FORWARD_INDEX));
buf = reader.getBuffer(1, "sum__dimX", ColumnIndexType.FORWARD_INDEX);
assertEquals(buf.size(), 3);
assertEquals(buf.getByte(2), 21);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,20 +180,21 @@ public abstract PinotDataBuffer getIndexFor(String column, ColumnIndexType type)

public abstract boolean hasIndexFor(String column, ColumnIndexType type);

public boolean hasSegmentIndex(SegmentIndexType type) {
public boolean hasStarTreeIndex() {
return false;
}

/**
* Segment index types like StarTree index is modelled like those single-column indices kept in columns.psf and
* index_map file, so access to their index data can be abstracted with SegmentDirectory.Reader interface too.
* The StarTree index is a multi-column index but modelled like those single-column indices kept in columns.psf and
* index_map files, so access to its index data can be abstracted with SegmentDirectory.Reader interface too. In the
* future, if new kinds of multi-column index e.g. materialized views is added, this interface can be generalized to
* access them too.
*
* @param indexName to look for a specific index instance to access, e.g. the StarTree index can contain multiple
* index instances, each one of them has a tree and a set of fwd indices referred to by the tree.
* @param type like today's StarTree index or other segment index types e.g. materialized views.
* @return SegmentDirectory.Reader object to access the index buffers inside the specified segment index.
* @param starTreeId to look for a specific index instance. The StarTree index can contain multiple index instances,
* each one of them has a tree and a set of fwd indices referred to by the tree.
* @return SegmentDirectory.Reader object to access the index buffers inside the specific index instance.
*/
public Reader getSegmentIndexReaderFor(String indexName, SegmentIndexType type) {
public Reader getStarTreeIndexReader(int starTreeId) {
return null;
}

Expand Down

This file was deleted.

0 comments on commit 7d099d0

Please sign in to comment.