Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extend SegmentDirectory.Reader iface to abstract access to index buffers from StarTree index #10158

Merged
merged 4 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public static ImmutableSegment load(SegmentDirectory segmentDirectory, IndexLoad

// Load star-tree index if it exists
StarTreeIndexContainer starTreeIndexContainer = null;
if (segmentMetadata.getStarTreeV2MetadataList() != null && segmentReader.getStarTreeIndex() != null) {
if (segmentReader.hasStarTreeIndex()) {
starTreeIndexContainer = new StarTreeIndexContainer(segmentReader, segmentMetadata, indexContainerMap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -33,7 +31,6 @@
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
Expand All @@ -45,9 +42,7 @@ class FilePerIndexDirectory extends ColumnIndexDirectory {
private SegmentMetadataImpl _segmentMetadata;
private final ReadMode _readMode;
private final Map<IndexKey, PinotDataBuffer> _indexBuffers = new HashMap<>();
// Different from the other column-index entries, starTree index is multi-column index and has its own index map,
// thus manage it separately.
private PinotDataBuffer _starTreeIndexDataBuffer;

/**
* @param segmentDirectory File pointing to segment directory
* @param segmentMetadata segment metadata. Metadata must be fully initialized
Expand Down Expand Up @@ -80,29 +75,6 @@ public PinotDataBuffer getBuffer(String column, ColumnIndexType type)
return getReadBufferFor(key);
}

@Override
public PinotDataBuffer getStarTreeIndex()
throws IOException {
if (_starTreeIndexDataBuffer != null) {
return _starTreeIndexDataBuffer;
}
File indexFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME);
if (_readMode == ReadMode.heap) {
_starTreeIndexDataBuffer =
PinotDataBuffer.loadFile(indexFile, 0, indexFile.length(), ByteOrder.BIG_ENDIAN, "Star-tree V2 data buffer");
} else {
_starTreeIndexDataBuffer = PinotDataBuffer.mapFile(indexFile, true, 0, indexFile.length(), ByteOrder.BIG_ENDIAN,
"Star-tree V2 data buffer");
}
return _starTreeIndexDataBuffer;
}

@Override
public InputStream getStarTreeIndexMap()
throws IOException {
return new FileInputStream(new File(_segmentDirectory, StarTreeV2Constants.INDEX_MAP_FILE_NAME));
}

@Override
public PinotDataBuffer newBuffer(String column, ColumnIndexType type, long sizeBytes)
throws IOException {
Expand All @@ -122,9 +94,6 @@ public void close()
for (PinotDataBuffer dataBuffer : _indexBuffers.values()) {
dataBuffer.close();
}
if (_starTreeIndexDataBuffer != null) {
_starTreeIndexDataBuffer.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
Expand Down Expand Up @@ -64,6 +64,7 @@ public class SegmentLocalFSDirectory extends SegmentDirectory {
private final ReadMode _readMode;
private SegmentMetadataImpl _segmentMetadata;
private ColumnIndexDirectory _columnIndexDirectory;
private StarTreeIndexReader _starTreeIndexReader;
private String _tier;

// Create an empty SegmentLocalFSDirectory object mainly used to
Expand Down Expand Up @@ -251,7 +252,6 @@ private synchronized void loadData()
if (_columnIndexDirectory != null) {
return;
}

switch (_segmentMetadata.getVersion()) {
case v1:
case v2:
Expand All @@ -268,6 +268,9 @@ private synchronized void loadData()
default:
break;
}
if (CollectionUtils.isNotEmpty(_segmentMetadata.getStarTreeV2MetadataList())) {
_starTreeIndexReader = new StarTreeIndexReader(_segmentDirectory, _segmentMetadata, _readMode);
}
}

@Override
Expand All @@ -279,6 +282,10 @@ public void close()
_columnIndexDirectory.close();
_columnIndexDirectory = null;
}
if (_starTreeIndexReader != null) {
_starTreeIndexReader.close();
_starTreeIndexReader = null;
}
}
}

Expand Down Expand Up @@ -350,26 +357,46 @@ public boolean hasIndexFor(String column, ColumnIndexType type) {
}

@Override
public void close() {
// do nothing here
_segmentLock.unlock();
public boolean hasStarTreeIndex() {
return _starTreeIndexReader != null;
}

@Override
public String toString() {
return _segmentDirectory.toString();
public SegmentDirectory.Reader getStarTreeIndexReader(int starTreeId) {
return new SegmentDirectory.Reader() {
@Override
public PinotDataBuffer getIndexFor(String column, ColumnIndexType type)
throws IOException {
return _starTreeIndexReader.getBuffer(starTreeId, column, type);
}

@Override
public boolean hasIndexFor(String column, ColumnIndexType type) {
return _starTreeIndexReader.hasIndexFor(starTreeId, column, type);
}

@Override
public String toString() {
return _starTreeIndexReader.toString() + " for " + starTreeId;
}

@Override
public void close()
throws IOException {
// Noop as _starTreeIndexReader is owned by the top level Reader
}
};
}

@Override
public PinotDataBuffer getStarTreeIndex()
throws IOException {
return _columnIndexDirectory.getStarTreeIndex();
public void close() {
// do nothing here
_segmentLock.unlock();
}

@Override
public InputStream getStarTreeIndexMap()
throws IOException {
return _columnIndexDirectory.getStarTreeIndexMap();
public String toString() {
return _segmentDirectory.toString();
}
}

Expand Down Expand Up @@ -413,8 +440,12 @@ public void close()
_segmentLock.unlock();
if (_columnIndexDirectory != null) {
_columnIndexDirectory.close();
_columnIndexDirectory = null;
}
if (_starTreeIndexReader != null) {
_starTreeIndexReader.close();
_starTreeIndexReader = null;
}
_columnIndexDirectory = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import com.google.common.base.Preconditions;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.nio.ByteOrder;
Expand All @@ -43,7 +41,6 @@
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2Constants;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
Expand Down Expand Up @@ -87,9 +84,6 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory {
private final File _indexFile;
private final Map<IndexKey, IndexEntry> _columnEntries;
private final List<PinotDataBuffer> _allocBuffers;
// Different from the other column-index entries, starTree index is multi-column index and has its own index map,
// thus manage it separately.
private PinotDataBuffer _starTreeIndexDataBuffer;

// For V3 segment format, the index cleanup consists of two steps: mark and sweep.
// The removeIndex() method marks an index to be removed; and the index info is
Expand Down Expand Up @@ -213,21 +207,6 @@ private void load()
throws IOException, ConfigurationException {
loadMap();
mapBufferEntries();
if (_segmentMetadata.getStarTreeV2MetadataList() != null) {
loadStarTreeIndex();
}
}

private void loadStarTreeIndex()
throws IOException {
File indexFile = new File(_segmentDirectory, StarTreeV2Constants.INDEX_FILE_NAME);
if (_readMode == ReadMode.heap) {
_starTreeIndexDataBuffer =
PinotDataBuffer.loadFile(indexFile, 0, indexFile.length(), ByteOrder.BIG_ENDIAN, "Star-tree V2 data buffer");
} else {
_starTreeIndexDataBuffer = PinotDataBuffer.mapFile(indexFile, true, 0, indexFile.length(), ByteOrder.BIG_ENDIAN,
"Star-tree V2 data buffer");
}
}

private void loadMap()
Expand Down Expand Up @@ -367,9 +346,6 @@ public void close()
for (PinotDataBuffer buf : _allocBuffers) {
buf.close();
}
if (_starTreeIndexDataBuffer != null) {
_starTreeIndexDataBuffer.close();
}
// Cleanup removed indices after closing and flushing buffers, so
// that potential index updates can be persisted across cleanups.
if (_shouldCleanupRemovedIndices) {
Expand Down Expand Up @@ -413,21 +389,9 @@ public Set<String> getColumnsWithIndex(ColumnIndexType type) {
return columns;
}

@Override
public PinotDataBuffer getStarTreeIndex()
throws IOException {
return _starTreeIndexDataBuffer;
}

@Override
public InputStream getStarTreeIndexMap()
throws IOException {
return new FileInputStream(new File(_segmentDirectory, StarTreeV2Constants.INDEX_MAP_FILE_NAME));
}

@Override
public String toString() {
return _segmentDirectory.toString() + "/" + _indexFile.toString();
return _indexFile.toString();
}

/**
Expand Down
Loading