Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vvivekiyer committed Oct 14, 2022
1 parent a0e7a34 commit e763bb6
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,8 @@ public MultiValueVarByteRawIndexCreator(File baseIndexDir, ChunkCompressionType
int totalDocs, DataType valueType, int writerVersion, int maxRowLengthInBytes, int maxNumberOfElements)
throws IOException {
//we will prepend the actual content with numElements and length array containing length of each element
int maxLengthPrefixes = Integer.BYTES * maxNumberOfElements;
int totalMaxLength = Integer.BYTES + maxRowLengthInBytes + maxLengthPrefixes;
Preconditions.checkArgument((maxLengthPrefixes | maxRowLengthInBytes | totalMaxLength | maxNumberOfElements) > 0,
"integer overflow detected");
int totalMaxLength = getTotalRowStorageBytes(maxNumberOfElements, maxRowLengthInBytes);

File file = new File(baseIndexDir,
column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
int numDocsPerChunk = Math.max(
Expand Down Expand Up @@ -119,4 +117,55 @@ public void close()
throws IOException {
_indexWriter.close();
}

/**
 * Returns the number of bytes taken by the per-element length prefix of an MV row.
 *
 * <p>The actual content of an MV row is stored together with two prefixes: one holding the length of each array
 * element, and one holding the number of elements in the array. This method sizes the former, reserving
 * {@code Integer.BYTES} for every element.
 *
 * @param maxNumberOfElements number of elements a length entry must be reserved for
 * @return {@code Integer.BYTES * maxNumberOfElements}, computed with plain {@code int} arithmetic (no overflow check)
 */
public static int getElementLengthStoragePrefix(int maxNumberOfElements) {
  final int bytesPerLengthEntry = Integer.BYTES;
  return maxNumberOfElements * bytesPerLengthEntry;
}

/**
 * Returns the number of bytes taken by the element-count prefix of an MV row.
 *
 * <p>The actual content of an MV row is stored together with two prefixes: one holding the length of each array
 * element, and one holding the number of elements in the array. This method sizes the latter, which is a single
 * {@code int}.
 *
 * @return {@code Integer.BYTES}
 */
public static int getNumberOfElementsStoragePrefix() {
  // The element count is written as one int, independent of how many elements the row holds.
  final int elementCountPrefixBytes = Integer.BYTES;
  return elementCountPrefixBytes;
}

/**
 * Returns the total number of bytes needed to store one MV row.
 *
 * <p>The actual content of an MV row is stored together with two prefixes:
 * <ol>
 *   <li>elementLengthStoragePrefix - {@code Integer.BYTES} per element, holding the length of each array element</li>
 *   <li>numberOfElementsStoragePrefix - {@code Integer.BYTES}, holding the number of elements in the array</li>
 * </ol>
 * The result is the sum of (1), (2) and the actual content length.
 *
 * @param maxNumberOfElements maximum number of elements in a row; must not be negative
 * @param maxRowDataLengthInBytes maximum number of content bytes in a row; must not be negative
 * @return the combined size of both prefixes and the content, in bytes
 * @throws IllegalArgumentException if an argument is negative or the total does not fit in an {@code int}
 */
public static int getTotalRowStorageBytes(int maxNumberOfElements, int maxRowDataLengthInBytes) {
  // Widen to long before multiplying: in int arithmetic Integer.BYTES * maxNumberOfElements can wrap around to a
  // non-negative value (e.g. 2^30 elements wraps to 0), which a sign-bit check on the int result cannot detect.
  long elementLengthStoragePrefix = (long) Integer.BYTES * maxNumberOfElements;
  long numberOfElementsStoragePrefix = Integer.BYTES;
  long totalMaxLength = elementLengthStoragePrefix + numberOfElementsStoragePrefix + maxRowDataLengthInBytes;

  // Same exception type and message as before, so callers observe the same failure mode.
  if (maxNumberOfElements < 0 || maxRowDataLengthInBytes < 0 || totalMaxLength > Integer.MAX_VALUE) {
    throw new IllegalArgumentException("integer overflow detected");
  }

  return (int) totalMaxLength;
}

/**
 * Returns the number of bytes available for the actual content of an MV row, given its total storage size.
 *
 * <p>The actual content of an MV row is stored together with two prefixes: one holding the length of each array
 * element, and one holding the number of elements in the array. This method subtracts both prefixes from the
 * total.
 *
 * @param totalMaxLength total bytes of a row, i.e. both prefixes plus the actual content
 * @param maxNumberOfElements number of elements used to size the per-element length prefix
 * @return {@code totalMaxLength} minus the two prefix sizes; no range check is applied to the result
 */
public static int getMaxRowDataLengthInBytes(int totalMaxLength, int maxNumberOfElements) {
  int elementCountPrefixBytes = getNumberOfElementsStoragePrefix();
  int elementLengthPrefixBytes = getElementLengthStoragePrefix(maxNumberOfElements);
  int contentBytes = totalMaxLength - elementCountPrefixBytes;
  contentBytes -= elementLengthPrefixBytes;
  return contentBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.segment.creator.impl.fwd.MultiValueVarByteRawIndexCreator;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
Expand All @@ -37,6 +37,7 @@
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.slf4j.Logger;
Expand All @@ -53,6 +54,7 @@
* TODO: Add support for the following:
* 1. Enable dictionary
* 2. Disable dictionary
* 3. Segment versions < V3
*/
public class ForwardIndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ForwardIndexHandler.class);
Expand Down Expand Up @@ -106,7 +108,6 @@ Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
Map<String, Operation> columnOperationMap = new HashMap<>();

// Does not work for segment versions < V3.
// TODO: Remove this limitation.
if (_segmentMetadata.getVersion().compareTo(SegmentVersion.v3) < 0) {
return columnOperationMap;
}
Expand Down Expand Up @@ -198,19 +199,28 @@ private void rewriteRawMVForwardIndex(String column, SegmentDirectory.Writer seg
}

LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column);
Map<String, ChunkCompressionType> compressionConfigs = _indexLoadingConfig.getCompressionConfigs();
Preconditions.checkState(compressionConfigs.containsKey(column));
Map<String, ChunkCompressionType> compressionConfigs = _indexLoadingConfig.getCompressionConfigs();
// At this point, compressionConfigs is guaranteed to contain the column.
Preconditions.checkState(compressionConfigs.containsKey(column));
ChunkCompressionType newCompressionType = compressionConfigs.get(column);

int numDocs = existingColMetadata.getTotalDocs();

try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
// For VarByte MV columns like String and Bytes, the storage representation of each row contains the following
// components:
// 1. bytes required to store the actual elements of the MV row (A)
// 2. bytes required to store the number of elements in the MV row (B)
// 3. bytes required to store the length of each MV element (C)
//
// lengthOfLongestEntry = A + B + C
// maxRowLengthInBytes = A
int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
Preconditions.checkState(lengthOfLongestEntry >= 0,
"lengthOfLongestEntry cannot be negative. segment=" + segmentName + " column={}" + column);
"lengthOfLongestEntry cannot be negative. segment=" + segmentName + " column=" + column);
int maxNumberOfMVEntries = existingColMetadata.getMaxNumberOfMultiValues();
int maxRowLengthInBytes = lengthOfLongestEntry - (Integer.BYTES * maxNumberOfMVEntries) - Integer.BYTES;
int maxRowLengthInBytes =
MultiValueVarByteRawIndexCreator.getMaxRowDataLengthInBytes(lengthOfLongestEntry, maxNumberOfMVEntries);

IndexCreationContext.Forward context =
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
Expand All @@ -226,58 +236,73 @@ private void rewriteRawMVForwardIndex(String column, SegmentDirectory.Writer seg
throw new UnsupportedOperationException(failureMsg);
}

PinotSegmentColumnReader columnReader = new PinotSegmentColumnReader(reader, null, null, maxNumberOfMVEntries);
ForwardIndexReaderContext readerContext = reader.createContext();

for (int i = 0; i < numDocs; i++) {
Object[] values = (Object[]) columnReader.getValue(i);
int length = values.length;

// JSON fields are either stored as string or bytes. No special handling is needed.
switch (reader.getStoredType()) {
case INT: {
int[] buffer = new int[maxNumberOfMVEntries];
int length = reader.getIntMV(i, buffer, readerContext);

int[] ints = new int[length];
for (int j = 0; j < length; j++) {
ints[j] = (Integer) values[j];
ints[j] = buffer[j];
}
creator.putIntMV(ints);
break;
}
case LONG: {
long[] buffer = new long[maxNumberOfMVEntries];
int length = reader.getLongMV(i, buffer, readerContext);

long[] longs = new long[length];
for (int j = 0; j < length; j++) {
longs[j] = (Long) values[j];
longs[j] = buffer[j];
}
creator.putLongMV(longs);
break;
}
case FLOAT: {
float[] buffer = new float[maxNumberOfMVEntries];
int length = reader.getFloatMV(i, buffer, readerContext);

float[] floats = new float[length];
for (int j = 0; j < length; j++) {
floats[j] = (Float) values[j];
floats[j] = buffer[j];
}
creator.putFloatMV(floats);
break;
}
case DOUBLE: {
double[] buffer = new double[maxNumberOfMVEntries];
int length = reader.getDoubleMV(i, buffer, readerContext);

double[] doubles = new double[length];
for (int j = 0; j < length; j++) {
doubles[j] = (Double) values[j];
doubles[j] = buffer[j];
}
creator.putDoubleMV(doubles);
break;
}
case STRING: {
String[] buffer = new String[maxNumberOfMVEntries];
int length = reader.getStringMV(i, buffer, readerContext);

String[] strings = new String[length];
for (int j = 0; j < length; j++) {
strings[j] = (String) values[j];
strings[j] = buffer[j];
}
creator.putStringMV(strings);
break;
}
case BYTES: {
byte[][] buffer = new byte[maxNumberOfMVEntries][];
int length = reader.getBytesMV(i, buffer, readerContext);

byte[][] bytesArray = new byte[length][];
for (int j = 0; j < length; j++) {
bytesArray[j] = (byte[]) values[j];
bytesArray[j] = buffer[j];
}
creator.putBytesMV(bytesArray);
break;
Expand Down Expand Up @@ -335,7 +360,7 @@ private void rewriteRawSVForwardIndex(String column, SegmentDirectory.Writer seg
try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
Preconditions.checkState(lengthOfLongestEntry >= 0,
"lengthOfLongestEntry cannot be negative. segment=" + segmentName + " column={}" + column);
"lengthOfLongestEntry cannot be negative. segment=" + segmentName + " column=" + column);

IndexCreationContext.Forward context =
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
Expand All @@ -351,36 +376,47 @@ private void rewriteRawSVForwardIndex(String column, SegmentDirectory.Writer seg
throw new UnsupportedOperationException(failureMsg);
}

PinotSegmentColumnReader columnReader =
new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues());
ForwardIndexReaderContext readerContext = reader.createContext();

for (int i = 0; i < numDocs; i++) {
Object val = columnReader.getValue(i);

// JSON fields are either stored as string or bytes. No special handling is needed because we make this
// decision based on the storedType of the reader.
switch (reader.getStoredType()) {
case INT:
creator.putInt((int) val);
case INT: {
int val = reader.getInt(i, readerContext);
creator.putInt(val);
break;
case LONG:
creator.putLong((long) val);
}
case LONG: {
long val = reader.getLong(i, readerContext);
creator.putLong(val);
break;
case FLOAT:
creator.putFloat((float) val);
}
case FLOAT: {
float val = reader.getFloat(i, readerContext);
creator.putFloat(val);
break;
case DOUBLE:
creator.putDouble((double) val);
}
case DOUBLE: {
double val = reader.getDouble(i, readerContext);
creator.putDouble(val);
break;
case STRING:
creator.putString((String) val);
}
case STRING: {
String val = reader.getString(i, readerContext);
creator.putString(val);
break;
case BYTES:
creator.putBytes((byte[]) val);
}
case BYTES: {
byte[] val = reader.getBytes(i, readerContext);
creator.putBytes(val);
break;
case BIG_DECIMAL:
creator.putBigDecimal((BigDecimal) val);
}
case BIG_DECIMAL: {
BigDecimal val = reader.getBigDecimal(i, readerContext);
creator.putBigDecimal(val);
break;
}
default:
throw new IllegalStateException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
_maxNumValuesPerMVEntry = -1;
} else {
_maxNumValuesPerMVEntry = dataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry();
Preconditions.checkState(_maxNumValuesPerMVEntry >= 0, "maxNumValuesPerMVEntry is negative for an MV column.");
_dictIdBuffer = new int[_maxNumValuesPerMVEntry];
}
}
Expand Down Expand Up @@ -119,8 +120,6 @@ public Object getValue(int docId) {
throw new IllegalStateException();
}
} else {
Preconditions.checkState(_maxNumValuesPerMVEntry >= 0, "maxNumValuesPerMVEntry is negative for an MV column.");

switch (_forwardIndexReader.getStoredType()) {
case INT: {
int[] buffer = new int[_maxNumValuesPerMVEntry];
Expand Down
Loading

0 comments on commit e763bb6

Please sign in to comment.