Skip to content

Commit

Permalink
extend SegmentDirectory.Reader interface to abstract access to index buff…
Browse files Browse the repository at this point in the history
…ers from StarTree index
  • Loading branch information
klsince committed Jan 20, 2023
1 parent 4d7da0a commit 0fc2506
Show file tree
Hide file tree
Showing 12 changed files with 502 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.segment.spi.store.SegmentIndexType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
Expand Down Expand Up @@ -200,7 +201,7 @@ public static ImmutableSegment load(SegmentDirectory segmentDirectory, IndexLoad

// Load star-tree index if it exists
StarTreeIndexContainer starTreeIndexContainer = null;
if (segmentMetadata.getStarTreeV2MetadataList() != null && segmentReader.getStarTreeIndex() != null) {
if (segmentReader.hasSegmentIndex(SegmentIndexType.STARTREE_INDEX)) {
starTreeIndexContainer = new StarTreeIndexContainer(segmentReader, segmentMetadata, indexContainerMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -33,7 +31,6 @@
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
Expand All @@ -45,9 +42,7 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
private SegmentMetadataImpl _segmentMetadata;
private final ReadMode _readMode;
private final Map<IndexKey, PinotDataBuffer> _indexBuffers = new HashMap<>();
// Different from the other column-index entries, starTree index is multi-column index and has its own index map,
// thus manage it separately.
private PinotDataBuffer _starTreeIndexDataBuffer;

/**
* @param segmentDirectory File pointing to segment directory
* @param segmentMetadata segment metadata. Metadata must be fully initialized
Expand Down Expand Up @@ -80,29 +75,6 @@ public PinotDataBuffer getBuffer(String column, ColumnIndexType type)
return getReadBufferFor(key);
}

/**
 * Returns the star-tree index data buffer, creating it on the first call and reusing it afterwards.
 *
 * <p>The buffer covers the whole star-tree index file found in the segment directory and is read as
 * big-endian. When the read mode is {@code heap} the file is loaded via
 * {@code PinotDataBuffer.loadFile}; for any other read mode it is obtained via
 * {@code PinotDataBuffer.mapFile}.
 *
 * @return the cached star-tree index buffer
 * @throws IOException if the buffer cannot be created from the index file
 */
@Override
public PinotDataBuffer getStarTreeIndex()
    throws IOException {
  if (_starTreeIndexDataBuffer == null) {
    final File starTreeFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME);
    final long fileLength = starTreeFile.length();
    // The field is only assigned once the buffer was created successfully.
    _starTreeIndexDataBuffer = _readMode == ReadMode.heap
        ? PinotDataBuffer.loadFile(starTreeFile, 0, fileLength, ByteOrder.BIG_ENDIAN, "Star-tree V2 data buffer")
        : PinotDataBuffer.mapFile(starTreeFile, true, 0, fileLength, ByteOrder.BIG_ENDIAN,
            "Star-tree V2 data buffer");
  }
  return _starTreeIndexDataBuffer;
}

/**
 * Opens a new input stream over the star-tree index map file located in the segment directory.
 *
 * @return a freshly opened stream; every call creates a new {@link FileInputStream}
 * @throws IOException if the index map file cannot be opened
 */
@Override
public InputStream getStarTreeIndexMap()
    throws IOException {
  final File indexMapFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_MAP_FILE_NAME);
  return new FileInputStream(indexMapFile);
}

@Override
public PinotDataBuffer newBuffer(String column, ColumnIndexType type, long sizeBytes)
throws IOException {
Expand All @@ -122,9 +94,6 @@ public void close()
for (PinotDataBuffer dataBuffer : _indexBuffers.values()) {
dataBuffer.close();
}
if (_starTreeIndexDataBuffer != null) {
_starTreeIndexDataBuffer.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.util.Collections;
Expand All @@ -38,6 +37,7 @@
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
import org.apache.pinot.segment.spi.store.SegmentIndexType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
import org.slf4j.Logger;
Expand All @@ -64,6 +64,7 @@ public class SegmentLocalFSDirectory extends SegmentDirectory {
private final ReadMode _readMode;
private SegmentMetadataImpl _segmentMetadata;
private ColumnIndexDirectory _columnIndexDirectory;
private StarTreeIndexReader _starTreeIndexReader;
private String _tier;

// Create an empty SegmentLocalFSDirectory object mainly used to
Expand Down Expand Up @@ -248,25 +249,26 @@ protected void load()

private synchronized void loadData()
throws IOException {
if (_columnIndexDirectory != null) {
return;
if (_columnIndexDirectory == null) {
switch (_segmentMetadata.getVersion()) {
case v1:
case v2:
_columnIndexDirectory = new FilePerIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
break;
case v3:
try {
_columnIndexDirectory = new SingleFileIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
} catch (ConfigurationException e) {
LOGGER.error("Failed to create columnar index directory", e);
throw new RuntimeException(e);
}
break;
default:
break;
}
}

switch (_segmentMetadata.getVersion()) {
case v1:
case v2:
_columnIndexDirectory = new FilePerIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
break;
case v3:
try {
_columnIndexDirectory = new SingleFileIndexDirectory(_segmentDirectory, _segmentMetadata, _readMode);
} catch (ConfigurationException e) {
LOGGER.error("Failed to create columnar index directory", e);
throw new RuntimeException(e);
}
break;
default:
break;
if (_starTreeIndexReader == null && _segmentMetadata.getStarTreeV2MetadataList() != null) {
_starTreeIndexReader = new StarTreeIndexReader(_segmentDirectory, _segmentMetadata, _readMode);
}
}

Expand All @@ -279,6 +281,10 @@ public void close()
_columnIndexDirectory.close();
_columnIndexDirectory = null;
}
if (_starTreeIndexReader != null) {
_starTreeIndexReader.close();
}
_starTreeIndexReader = null;
}
}

Expand Down Expand Up @@ -349,6 +355,42 @@ public boolean hasIndexFor(String column, ColumnIndexType type) {
return _columnIndexDirectory.hasIndexFor(column, type);
}

/**
 * Tells whether the given segment-level index is available through this reader.
 *
 * <p>Only {@link SegmentIndexType#STARTREE_INDEX} is recognized; for it, the answer is whether a
 * star-tree index reader is currently held.
 *
 * @param type the segment-level index type to check
 * @return {@code true} if the star-tree index reader is present, {@code false} otherwise
 * @throws IllegalArgumentException if {@code type} is anything other than
 *     {@link SegmentIndexType#STARTREE_INDEX} (including {@code null})
 */
@Override
public boolean hasSegmentIndex(SegmentIndexType type) {
  if (type != SegmentIndexType.STARTREE_INDEX) {
    throw new IllegalArgumentException("Unknown SegmentIndexType: " + type);
  }
  return _starTreeIndexReader != null;
}

/**
 * Creates a reader view scoped to one named segment-level index.
 *
 * <p>Only {@link SegmentIndexType#STARTREE_INDEX} is supported. The returned reader forwards every
 * lookup to the star-tree index reader held by the enclosing directory, passing {@code indexName}
 * along with the requested column and column index type.
 *
 * @param indexName name of the segment-level index the returned reader is bound to
 * @param type the segment-level index type
 * @return a reader whose lookups are resolved within the named index
 * @throws IllegalArgumentException if {@code type} is not {@link SegmentIndexType#STARTREE_INDEX}
 */
public SegmentDirectory.Reader getSegmentIndexReaderFor(String indexName, SegmentIndexType type) {
  if (type == SegmentIndexType.STARTREE_INDEX) {
    return new SegmentDirectory.Reader() {
      @Override
      public PinotDataBuffer getIndexFor(String columnName, ColumnIndexType indexType)
          throws IOException {
        return _starTreeIndexReader.getBuffer(indexName, columnName, indexType);
      }

      @Override
      public boolean hasIndexFor(String columnName, ColumnIndexType indexType) {
        return _starTreeIndexReader.hasIndexFor(indexName, columnName, indexType);
      }

      @Override
      public String toString() {
        final String readerDescription = _starTreeIndexReader.toString();
        return readerDescription + " for " + indexName;
      }

      @Override
      public void close()
          throws IOException {
        // Intentionally empty: the top level Reader owns _starTreeIndexReader and closes it.
      }
    };
  }
  throw new IllegalArgumentException("Unknown SegmentIndexType: " + type);
}

@Override
public void close() {
// do nothing here
Expand All @@ -359,18 +401,6 @@ public void close() {
public String toString() {
return _segmentDirectory.toString();
}

/**
 * Fetches the star-tree index buffer by delegating to the column index directory.
 *
 * @return whatever buffer the column index directory reports for the star-tree index
 * @throws IOException if the column index directory fails to provide the buffer
 */
@Override
public PinotDataBuffer getStarTreeIndex()
    throws IOException {
  final PinotDataBuffer starTreeBuffer = _columnIndexDirectory.getStarTreeIndex();
  return starTreeBuffer;
}

/**
 * Fetches the star-tree index map stream by delegating to the column index directory.
 *
 * @return the stream handed back by the column index directory
 * @throws IOException if the column index directory fails to open the index map
 */
@Override
public InputStream getStarTreeIndexMap()
    throws IOException {
  final InputStream indexMapStream = _columnIndexDirectory.getStarTreeIndexMap();
  return indexMapStream;
}
}

/*************************** SegmentDirectory Writer *********************/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import com.google.common.base.Preconditions;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
Expand All @@ -43,7 +41,6 @@
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
Expand Down Expand Up @@ -87,9 +84,6 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
private final File _indexFile;
private final Map<IndexKey, IndexEntry> _columnEntries;
private final List<PinotDataBuffer> _allocBuffers;
// Different from the other column-index entries, starTree index is multi-column index and has its own index map,
// thus manage it separately.
private PinotDataBuffer _starTreeIndexDataBuffer;

// For V3 segment format, the index cleanup consists of two steps: mark and sweep.
// The removeIndex() method marks an index to be removed; and the index info is
Expand Down Expand Up @@ -213,21 +207,6 @@ private void load()
throws IOException, ConfigurationException {
loadMap();
mapBufferEntries();
if (_segmentMetadata.getStarTreeV2MetadataList() != null) {
loadStarTreeIndex();
}
}

/**
 * Creates the star-tree index data buffer from the star-tree index file in the segment directory
 * and stores it in {@code _starTreeIndexDataBuffer}.
 *
 * <p>The buffer spans the whole file and is read as big-endian. {@code heap} read mode uses
 * {@code PinotDataBuffer.loadFile}; every other read mode uses {@code PinotDataBuffer.mapFile}.
 *
 * @throws IOException if the buffer cannot be created from the index file
 */
private void loadStarTreeIndex()
    throws IOException {
  final File starTreeFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME);
  final long fileLength = starTreeFile.length();
  if (_readMode != ReadMode.heap) {
    _starTreeIndexDataBuffer = PinotDataBuffer.mapFile(starTreeFile, true, 0, fileLength, ByteOrder.BIG_ENDIAN,
        "Star-tree V2 data buffer");
    return;
  }
  _starTreeIndexDataBuffer =
      PinotDataBuffer.loadFile(starTreeFile, 0, fileLength, ByteOrder.BIG_ENDIAN, "Star-tree V2 data buffer");
}

private void loadMap()
Expand Down Expand Up @@ -367,9 +346,6 @@ public void close()
for (PinotDataBuffer buf : _allocBuffers) {
buf.close();
}
if (_starTreeIndexDataBuffer != null) {
_starTreeIndexDataBuffer.close();
}
// Cleanup removed indices after closing and flushing buffers, so
// that potential index updates can be persisted across cleanups.
if (_shouldCleanupRemovedIndices) {
Expand Down Expand Up @@ -413,21 +389,9 @@ public Set<String> getColumnsWithIndex(ColumnIndexType type) {
return columns;
}

/**
 * Returns the star-tree index data buffer currently held by this directory.
 *
 * <p>This accessor performs no loading itself; it only hands back the stored field.
 * NOTE(review): the field may still be {@code null} if it was never populated — confirm against
 * the loading path before dereferencing the result.
 *
 * @return the stored star-tree index buffer
 */
@Override
public PinotDataBuffer getStarTreeIndex()
throws IOException {
return _starTreeIndexDataBuffer;
}

/**
 * Opens a new input stream over the star-tree index map file located in the segment directory.
 *
 * @return a freshly opened stream; every call creates a new {@link FileInputStream}
 * @throws IOException if the index map file cannot be opened
 */
@Override
public InputStream getStarTreeIndexMap()
    throws IOException {
  final File indexMapFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_MAP_FILE_NAME);
  final InputStream indexMapStream = new FileInputStream(indexMapFile);
  return indexMapStream;
}

@Override
public String toString() {
return _segmentDirectory.toString() + "/" + _indexFile.toString();
return _indexFile.toString();
}

/**
Expand Down
Loading

0 comments on commit 0fc2506

Please sign in to comment.