ForwardIndexHandler: Change compressionType during segmentReload (apache#9454)

* ForwardIndexHandler: Allow changing compressionType for SV columns on reload

* Address review comments.

* Add more tests and address review comments
vvivekiyer authored and Yao Liu committed Oct 3, 2022
1 parent ce078f1 commit 0926bfe
Showing 10 changed files with 969 additions and 39 deletions.
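This change makes segment reload honor compressionType overrides from the table config: for a RAW-encoded column whose fieldConfigList entry specifies a compressionCodec, e.g. {"name": "myRawColumn", "encodingType": "RAW", "compressionCodec": "ZSTANDARD"} (the column name is hypothetical), the forward index is rewritten with the new codec on the next reload whenever it differs from the codec found on disk. The entry point is the new ForwardIndexHandler below.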
ForwardIndexHandler.java (new file)
@@ -0,0 +1,273 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.segment.index.loader;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.IndexCreationContext;
import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Helper class used by {@link SegmentPreProcessor} to make changes to forward index and dictionary configs. Note
 * that this handler only works for segment versions >= 3.0. Support for segment versions < 3.0 is not added because
 * the majority of the use cases are on versions >= 3.0, and this avoids adding tech debt. The currently supported
* operations are:
* 1. Change compression on raw SV columns.
*
* TODO: Add support for the following:
* 1. Change compression for raw MV columns
* 2. Enable dictionary
* 3. Disable dictionary
*/
public class ForwardIndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ForwardIndexHandler.class);

private final SegmentMetadata _segmentMetadata;
  private final IndexLoadingConfig _indexLoadingConfig;

protected enum Operation {
// TODO: Add other operations like ENABLE_DICTIONARY, DISABLE_DICTIONARY.
CHANGE_RAW_INDEX_COMPRESSION_TYPE,
}

public ForwardIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
_segmentMetadata = segmentMetadata;
_indexLoadingConfig = indexLoadingConfig;
}

@Override
public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader)
throws Exception {
Map<String, Operation> columnOperationMap = computeOperation(segmentReader);
return !columnOperationMap.isEmpty();
}

@Override
public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider indexCreatorProvider)
throws Exception {
Map<String, Operation> columnOperationMap = computeOperation(segmentWriter);
if (columnOperationMap.isEmpty()) {
return;
}

for (Map.Entry<String, Operation> entry : columnOperationMap.entrySet()) {
String column = entry.getKey();
Operation operation = entry.getValue();

switch (operation) {
case CHANGE_RAW_INDEX_COMPRESSION_TYPE:
rewriteRawForwardIndex(column, segmentWriter, indexCreatorProvider);
break;
// TODO: Add other operations here.
default:
throw new IllegalStateException("Unsupported operation for column " + column);
}
}
}

@VisibleForTesting
Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
throws Exception {
Map<String, Operation> columnOperationMap = new HashMap<>();

// Does not work for segment versions < V3
if (_segmentMetadata.getVersion().compareTo(SegmentVersion.v3) < 0) {
return columnOperationMap;
}

// From existing column config.
Set<String> existingAllColumns = _segmentMetadata.getAllColumns();
Set<String> existingDictColumns =
segmentReader.toSegmentDirectory().getColumnsWithIndex(ColumnIndexType.DICTIONARY);
Set<String> existingNoDictColumns = new HashSet<>();
for (String column : existingAllColumns) {
if (!existingDictColumns.contains(column)) {
existingNoDictColumns.add(column);
}
}

// From new column config.
Set<String> newNoDictColumns = _indexLoadingConfig.getNoDictionaryColumns();

for (String column : existingAllColumns) {
if (existingNoDictColumns.contains(column) && newNoDictColumns.contains(column)) {
        // Both the existing and new column configs are RAW forward index encoded. Check if the compression type
        // needs to be changed.
if (shouldChangeCompressionType(column, segmentReader)) {
columnOperationMap.put(column, Operation.CHANGE_RAW_INDEX_COMPRESSION_TYPE);
}
}
}

return columnOperationMap;
}

private boolean shouldChangeCompressionType(String column, SegmentDirectory.Reader segmentReader) throws Exception {
ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);

// TODO: Remove this MV column limitation.
if (!existingColMetadata.isSingleValue()) {
return false;
}

// The compression type for an existing segment can only be determined by reading the forward index header.
try (ForwardIndexReader fwdIndexReader = LoaderUtils.getForwardIndexReader(segmentReader, existingColMetadata)) {
ChunkCompressionType existingCompressionType = fwdIndexReader.getCompressionType();
Preconditions.checkState(existingCompressionType != null,
"Existing compressionType cannot be null for raw forward index column=" + column);

      // Get the new compression type; null means no explicit override was provided in the table config.
      ChunkCompressionType newCompressionType = _indexLoadingConfig.getCompressionConfigs().get(column);

      // Note that the default compression type (PASS_THROUGH for metric and LZ4 for dimension columns) is not
      // considered if the compressionType is not explicitly provided in the tableConfig. This avoids incorrectly
      // rewriting all the forward indexes during segmentReload whenever the default compressionType changes.
      return newCompressionType != null && existingCompressionType != newCompressionType;
    }
}

private void rewriteRawForwardIndex(String column, SegmentDirectory.Writer segmentWriter,
IndexCreatorProvider indexCreatorProvider)
throws Exception {
Preconditions.checkState(_segmentMetadata.getVersion() == SegmentVersion.v3);

ColumnMetadata existingColMetadata = _segmentMetadata.getColumnMetadataFor(column);
File indexDir = _segmentMetadata.getIndexDir();
String segmentName = _segmentMetadata.getName();
File inProgress = new File(indexDir, column + ".fwd.inprogress");
File fwdIndexFile = new File(indexDir, column + V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);

if (!inProgress.exists()) {
// Marker file does not exist, which means last run ended normally.
// Create a marker file.
FileUtils.touch(inProgress);
} else {
// Marker file exists, which means last run was interrupted.
// Remove forward index if exists.
FileUtils.deleteQuietly(fwdIndexFile);
}

LOGGER.info("Creating new forward index for segment={} and column={}", segmentName, column);

Map<String, ChunkCompressionType> compressionConfigs = _indexLoadingConfig.getCompressionConfigs();
    // At this point compressionConfigs is guaranteed to contain the column: without an entry in the map, the
    // CHANGE_RAW_INDEX_COMPRESSION_TYPE operation would not have been computed for this column, since compressionType
    // changes are processed only when a valid compressionType is specified in the fieldConfig.
    Preconditions.checkState(compressionConfigs.containsKey(column));
    ChunkCompressionType newCompressionType = compressionConfigs.get(column);

int numDocs = existingColMetadata.getTotalDocs();

try (ForwardIndexReader reader = LoaderUtils.getForwardIndexReader(segmentWriter, existingColMetadata)) {
int lengthOfLongestEntry = reader.getLengthOfLongestEntry();
      Preconditions.checkState(lengthOfLongestEntry >= 0,
          "lengthOfLongestEntry cannot be negative. segment=" + segmentName + " column=" + column);

IndexCreationContext.Forward context =
IndexCreationContext.builder().withIndexDir(indexDir).withColumnMetadata(existingColMetadata)
.withLengthOfLongestEntry(lengthOfLongestEntry).build()
.forForwardIndex(newCompressionType, _indexLoadingConfig.getColumnProperties());

try (ForwardIndexCreator creator = indexCreatorProvider.newForwardIndexCreator(context)) {
        // If the creator's stored type and the reader's stored type do not match, fail: changing the data type of
        // a column is not supported by this handler.
        if (!reader.getStoredType().equals(creator.getValueType())) {
          String failureMsg = "Unsupported operation to change datatype for column=" + column + " from "
              + reader.getStoredType() + " to " + creator.getValueType();
          throw new UnsupportedOperationException(failureMsg);
        }

PinotSegmentColumnReader columnReader =
new PinotSegmentColumnReader(reader, null, null, existingColMetadata.getMaxNumberOfMultiValues());

for (int i = 0; i < numDocs; i++) {
Object val = columnReader.getValue(i);

// JSON fields are either stored as string or bytes. No special handling is needed because we make this
// decision based on the storedType of the reader.
switch (reader.getStoredType()) {
case INT:
creator.putInt((int) val);
break;
case LONG:
creator.putLong((long) val);
break;
case FLOAT:
creator.putFloat((float) val);
break;
case DOUBLE:
creator.putDouble((double) val);
break;
case STRING:
creator.putString((String) val);
break;
case BYTES:
creator.putBytes((byte[]) val);
break;
case BIG_DECIMAL:
creator.putBigDecimal((BigDecimal) val);
break;
            default:
              throw new IllegalStateException("Unsupported stored type: " + reader.getStoredType());
}
}
}
}

// We used the existing forward index to generate a new forward index. The existing forward index will be in V3
// format and the new forward index will be in V1 format. Remove the existing forward index as it is not needed
// anymore. Note that removeIndex() will only mark an index for removal and remove the in-memory state. The
// actual cleanup from columns.psf file will happen when singleFileIndexDirectory.cleanupRemovedIndices() is
// called during segmentWriter.close().
segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
LoaderUtils.writeIndexToV3Format(segmentWriter, column, fwdIndexFile, ColumnIndexType.FORWARD_INDEX);

// Delete the marker file.
FileUtils.deleteQuietly(inProgress);

LOGGER.info("Created forward index for segment: {}, column: {}", segmentName, column);
}
}
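For illustration, a minimal sketch of how this handler can be driven (e.g. from a test), using the test-only setters added to IndexLoadingConfig in this commit. The no-arg IndexLoadingConfig constructor, the column name "myRawColumn", and the in-scope segmentMetadata / segmentReader / segmentWriter objects are assumptions for the sketch, not part of this diff:

IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
indexLoadingConfig.setNoDictionaryColumns(Collections.singleton("myRawColumn"));
indexLoadingConfig.setCompressionConfigs(
    Collections.singletonMap("myRawColumn", ChunkCompressionType.ZSTANDARD));

ForwardIndexHandler handler = new ForwardIndexHandler(segmentMetadata, indexLoadingConfig);
if (handler.needUpdateIndices(segmentReader)) {
  // Rewrites the raw forward index of "myRawColumn" with the new compression type.
  handler.updateIndices(segmentWriter, IndexingOverrides.getIndexCreatorProvider());
}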
IndexHandler.java
@@ -37,5 +37,6 @@ void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorProvider i
* Check if there is a need to add new indices or removes obsolete indices.
* @return true if there is a need to update.
*/
boolean needUpdateIndices(SegmentDirectory.Reader segmentReader);
boolean needUpdateIndices(SegmentDirectory.Reader segmentReader)
throws Exception;
}
IndexHandlerFactory.java
@@ -63,6 +63,8 @@ public static IndexHandler getIndexHandler(ColumnIndexType type, SegmentMetadata
return new H3IndexHandler(segmentMetadata, indexLoadingConfig);
case BLOOM_FILTER:
return new BloomFilterHandler(segmentMetadata, indexLoadingConfig);
case FORWARD_INDEX:
return new ForwardIndexHandler(segmentMetadata, indexLoadingConfig);
default:
return NO_OP_HANDLER;
}
IndexLoadingConfig.java
@@ -31,6 +31,7 @@
import org.apache.pinot.segment.local.segment.index.column.PhysicalColumnIndexContainer;
import org.apache.pinot.segment.local.segment.index.loader.columnminmaxvalue.ColumnMinMaxValueGeneratorMode;
import org.apache.pinot.segment.local.segment.store.TextIndexUtils;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.H3IndexConfig;
@@ -75,6 +76,7 @@ public class IndexLoadingConfig {
private boolean _enableDynamicStarTreeCreation;
private List<StarTreeIndexConfig> _starTreeIndexConfigs;
private boolean _enableDefaultStarTree;
private Map<String, ChunkCompressionType> _compressionConfigs = new HashMap<>();

private SegmentVersion _segmentVersion;
private ColumnMinMaxValueGeneratorMode _columnMinMaxValueGeneratorMode = ColumnMinMaxValueGeneratorMode.DEFAULT_MODE;
@@ -153,6 +155,7 @@ private void extractFromTableConfig(TableConfig tableConfig) {
}
}

extractCompressionConfigs(tableConfig);
extractTextIndexColumnsFromTableConfig(tableConfig);
extractFSTIndexColumnsFromTableConfig(tableConfig);
extractH3IndexConfigsFromTableConfig(tableConfig);
@@ -215,6 +218,28 @@ private void extractFromTableConfig(TableConfig tableConfig) {
}
}

  /**
   * Extracts the compressionType for each column and populates a map from column name to compression type. The map
   * contains only the explicit compressionType overrides; it does not reflect the default compressionType (derived
   * using SegmentColumnarIndexCreator.getColumnCompressionType()) used for a column. Note that only RAW forward
   * index columns will be populated in this map.
   * @param tableConfig table config
   */
private void extractCompressionConfigs(TableConfig tableConfig) {
List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList();
if (fieldConfigList == null) {
return;
}

for (FieldConfig fieldConfig : fieldConfigList) {
String column = fieldConfig.getName();
if (fieldConfig.getCompressionCodec() != null) {
ChunkCompressionType compressionType = ChunkCompressionType.valueOf(fieldConfig.getCompressionCodec().name());
_compressionConfigs.put(column, compressionType);
}
}
}
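The valueOf(...) call above bridges two distinct enums, FieldConfig.CompressionCodec and ChunkCompressionType, which share value names (PASS_THROUGH, SNAPPY, ZSTANDARD, LZ4). A minimal sketch of the round-trip:

// The enums share value names, so valueOf(name()) converts between them.
FieldConfig.CompressionCodec codec = FieldConfig.CompressionCodec.ZSTANDARD;
ChunkCompressionType chunkType = ChunkCompressionType.valueOf(codec.name());
// chunkType == ChunkCompressionType.ZSTANDARD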

/**
* Text index creation info for each column is specified
* using {@link FieldConfig} model of indicating per column
@@ -371,6 +396,24 @@ public void setInvertedIndexColumns(Set<String> invertedIndexColumns) {
_invertedIndexColumns = invertedIndexColumns;
}

/**
* For tests only.
   * Used by SegmentPreProcessorTest to set raw (no-dictionary) columns.
*/
@VisibleForTesting
public void setNoDictionaryColumns(Set<String> noDictionaryColumns) {
_noDictionaryColumns = noDictionaryColumns;
}

/**
* For tests only.
   * Used by SegmentPreProcessorTest to set compression configs.
*/
@VisibleForTesting
public void setCompressionConfigs(Map<String, ChunkCompressionType> compressionConfigs) {
_compressionConfigs = compressionConfigs;
}

/**
* For tests only.
*/
@@ -424,6 +467,18 @@ public Set<String> getNoDictionaryColumns() {
return _noDictionaryColumns;
}

  /**
   * Returns a map from column name to compressionType. The map contains only the explicit compressionType
   * overrides; it does not reflect the default compressionType (derived using
   * SegmentColumnarIndexCreator.getColumnCompressionType()) used for a column. Note that only RAW forward index
   * columns are present in this map.
   *
   * @return a map containing column name as key and compressionType as value.
   */
public Map<String, ChunkCompressionType> getCompressionConfigs() {
return _compressionConfigs;
}

public Map<String, String> getnoDictionaryConfig() {
return _noDictionaryConfig;
}
SegmentPreProcessor.java
@@ -104,8 +104,8 @@ public void process()
      // Update single-column indices, like inverted index, JSON index, etc.
IndexCreatorProvider indexCreatorProvider = IndexingOverrides.getIndexCreatorProvider();
for (ColumnIndexType type : ColumnIndexType.values()) {
IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig)
.updateIndices(segmentWriter, indexCreatorProvider);
IndexHandler handler = IndexHandlerFactory.getIndexHandler(type, _segmentMetadata, _indexLoadingConfig);
handler.updateIndices(segmentWriter, indexCreatorProvider);
}
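With the new FORWARD_INDEX case in IndexHandlerFactory, this loop now also picks up forward-index rewrites. A condensed sketch of the dispatch for that single type, reusing the names from the snippet above (updateIndices() is a no-op when no column needs a rewrite):

IndexHandler handler =
    IndexHandlerFactory.getIndexHandler(ColumnIndexType.FORWARD_INDEX, _segmentMetadata, _indexLoadingConfig);
handler.updateIndices(segmentWriter, indexCreatorProvider);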

// Create/modify/remove star-trees if required.