Skip to content

Commit

Permalink
Default column handling of noForwardIndex and regeneration of forward…
Browse files Browse the repository at this point in the history
… index on reload path (#9810)

* Add code to handle default column and add a utility to construct forward index from inverted index and dictionary

* Empty-Commit

* Fix compilation error

* Address review comments

* Empty-Commit

* Empty-Commit

* Empty-Commit

* Address review comments

* Empty-Commit

* Address latest review comments
  • Loading branch information
somandal authored Dec 12, 2022
1 parent 9c64672 commit 86fe730
Show file tree
Hide file tree
Showing 20 changed files with 2,608 additions and 235 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.segment.index.loader;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.creator.IndexCreatorProvider;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Base class for all of the {@link IndexHandler} classes. This class provides a mechanism to rebuild the forward
* index if the forward index does not exist and is required to rebuild the index of interest. It also handles cleaning
* up the forward index if temporarily built once all handlers have completed via overriding the
* postUpdateIndicesCleanup() method. For {@link IndexHandler} classes which do not utilize the forward index or do not
* need this behavior, the postUpdateIndicesCleanup() method can be overridden to be a no-op.
*/
public abstract class BaseIndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseIndexHandler.class);

protected final SegmentMetadata _segmentMetadata;
protected final IndexLoadingConfig _indexLoadingConfig;
protected final Set<String> _tmpForwardIndexColumns;

public BaseIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig) {
_segmentMetadata = segmentMetadata;
_indexLoadingConfig = indexLoadingConfig;
_tmpForwardIndexColumns = new HashSet<>();
}

@Override
public void postUpdateIndicesCleanup(SegmentDirectory.Writer segmentWriter)
throws Exception {
// Delete the forward index for columns which have it disabled. Perform this as a post-processing step after all
// IndexHandlers have updated their indexes as some of them need to temporarily create a forward index to
// generate other indexes off of.
for (String column : _tmpForwardIndexColumns) {
segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
}
}

protected void createForwardIndexIfNeeded(SegmentDirectory.Writer segmentWriter, ColumnMetadata columnMetadata,
IndexCreatorProvider indexCreatorProvider, boolean isTemporaryForwardIndex)
throws IOException {
String columnName = columnMetadata.getColumnName();
if (segmentWriter.hasIndexFor(columnName, ColumnIndexType.FORWARD_INDEX)) {
LOGGER.info("Forward index already exists for column: {}, skip trying to create it", columnName);
return;
}

// If forward index is disabled it means that it has to be dictionary based and the inverted index must exist.
Preconditions.checkState(segmentWriter.hasIndexFor(columnName, ColumnIndexType.DICTIONARY),
String.format("Forward index disabled column %s must have a dictionary", columnName));
Preconditions.checkState(segmentWriter.hasIndexFor(columnName, ColumnIndexType.INVERTED_INDEX),
String.format("Forward index disabled column %s must have an inverted index", columnName));

LOGGER.info("Rebuilding the forward index for column: {}, is temporary: {}", columnName, isTemporaryForwardIndex);
InvertedIndexAndDictionaryBasedForwardIndexCreator invertedIndexAndDictionaryBasedForwardIndexCreator =
new InvertedIndexAndDictionaryBasedForwardIndexCreator(columnName, _segmentMetadata, _indexLoadingConfig,
segmentWriter, indexCreatorProvider, isTemporaryForwardIndex);
invertedIndexAndDictionaryBasedForwardIndexCreator.regenerateForwardIndex();

// Validate that the forward index is created.
if (!segmentWriter.hasIndexFor(columnName, ColumnIndexType.FORWARD_INDEX)) {
throw new IOException(String.format("Forward index was not created for column: %s, is temporary: %s", columnName,
isTemporaryForwardIndex ? "true" : "false"));
}

if (isTemporaryForwardIndex) {
_tmpForwardIndexColumns.add(columnName);
}

LOGGER.info("Rebuilt the forward index for column: {}, is temporary: {}", columnName, isTemporaryForwardIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -79,33 +82,33 @@
* 2. Enable dictionary
* 3. Disable dictionary
* 4. Disable forward index
* 5. Rebuild the forward index for a forwardIndexDisabled column
*
* TODO: Add support for the following:
* 1. Segment versions < V3
* 2. Enable forward index on a forward index disabled column
*/
public class ForwardIndexHandler implements IndexHandler {
public class ForwardIndexHandler extends BaseIndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ForwardIndexHandler.class);

private final SegmentMetadata _segmentMetadata;
private final IndexLoadingConfig _indexLoadingConfig;
// This should contain a list of all indexes that need to be rewritten if the dictionary is enabled or disabled
private static final List<ColumnIndexType> DICTIONARY_BASED_INDEXES_TO_REWRITE =
Arrays.asList(ColumnIndexType.RANGE_INDEX, ColumnIndexType.FST_INDEX, ColumnIndexType.INVERTED_INDEX);

private final Schema _schema;
private final Set<String> _forwardIndexDisabledColumnsToCleanup;

protected enum Operation {
// TODO: Add other operations like ADD_FORWARD_INDEX_FOR_DICT_COLUMN, ADD_FORWARD_INDEX_FOR_RAW_COLUMN
DISABLE_FORWARD_INDEX_FOR_DICT_COLUMN,
DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN,
ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN,
ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN,
ENABLE_DICTIONARY,
DISABLE_DICTIONARY,
CHANGE_RAW_INDEX_COMPRESSION_TYPE,
}

public ForwardIndexHandler(SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig, Schema schema) {
_segmentMetadata = segmentMetadata;
_indexLoadingConfig = indexLoadingConfig;
super(segmentMetadata, indexLoadingConfig);
_schema = schema;
_forwardIndexDisabledColumnsToCleanup = new HashSet<>();
}

@Override
Expand All @@ -132,7 +135,7 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorPro
// Deletion of the forward index will be handled outside the index handler to ensure that other index
// handlers that need the forward index to construct their own indexes will have it available.
// The existing forward index must be in dictionary format for this to be a no-op.
_forwardIndexDisabledColumnsToCleanup.add(column);
_tmpForwardIndexColumns.add(column);
break;
}
case DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN: {
Expand All @@ -142,9 +145,29 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorPro
// forward index here which is dictionary based and allow the post deletion step handle the actual deletion
// of the forward index.
createDictBasedForwardIndex(column, segmentWriter, indexCreatorProvider);
Preconditions.checkState(segmentWriter.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX),
String.format("Temporary forward index was not created for column: %s", column));
_forwardIndexDisabledColumnsToCleanup.add(column);
if (!segmentWriter.hasIndexFor(column, ColumnIndexType.FORWARD_INDEX)) {
throw new IOException(String.format("Temporary forward index was not created for column: %s", column));
}
_tmpForwardIndexColumns.add(column);
break;
}
case ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN: {
createForwardIndexIfNeeded(segmentWriter, _segmentMetadata.getColumnMetadataFor(column), indexCreatorProvider,
false);
if (!segmentWriter.hasIndexFor(column, ColumnIndexType.DICTIONARY)) {
throw new IOException(
String.format("Dictionary should still exist after rebuilding forward index for dictionary column: %s",
column));
}
break;
}
case ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN: {
createForwardIndexIfNeeded(segmentWriter, _segmentMetadata.getColumnMetadataFor(column), indexCreatorProvider,
false);
if (segmentWriter.hasIndexFor(column, ColumnIndexType.DICTIONARY)) {
throw new IOException(
String.format("Dictionary should not exist after rebuilding forward index for raw column: %s", column));
}
break;
}
case ENABLE_DICTIONARY: {
Expand All @@ -165,17 +188,6 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorPro
}
}

@Override
public void postUpdateIndicesCleanup(SegmentDirectory.Writer segmentWriter)
throws Exception {
// Delete the forward index for columns which have it disabled. Perform this as a post-processing step after all
// IndexHandlers have updated their indexes as some of them need to temporarily create a forward index to
// generate other indexes off of.
for (String column : _forwardIndexDisabledColumnsToCleanup) {
segmentWriter.removeIndex(column, ColumnIndexType.FORWARD_INDEX);
}
}

@VisibleForTesting
Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
throws Exception {
Expand Down Expand Up @@ -232,14 +244,27 @@ Map<String, Operation> computeOperation(SegmentDirectory.Reader segmentReader)
} else {
columnOperationMap.put(column, Operation.DISABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
}
} else if (existingForwardIndexDisabledColumns.contains(column) && !newForwardIndexDisabledColumns.contains(
column)) {
// TODO: Add support: existing column has its forward index disabled. New column config enables the forward
// index
throw new UnsupportedOperationException(String.format("Recreating forward index for column: %s is not yet "
+ "supported. Please backfill or refresh the data for now.", column));
} else if (existingForwardIndexDisabledColumns.contains(column) && newForwardIndexDisabledColumns.contains(
column)) {
} else if (existingForwardIndexDisabledColumns.contains(column)
&& !newForwardIndexDisabledColumns.contains(column)) {
// Existing column does not have a forward index. New column config enables the forward index
ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
if (columnMetadata != null && columnMetadata.isSorted()) {
// Check if the column is sorted. If sorted, disabling forward index should be a no-op and forward index
// should already exist. Do not return an operation for this column related to enabling forward index.
LOGGER.warn("Trying to enable the forward index for a sorted column {}, ignoring", column);
continue;
}

if (newNoDictColumns.contains(column)) {
Preconditions.checkState(!_indexLoadingConfig.getInvertedIndexColumns().contains(column),
String.format("Must disable inverted index to enable the forward index as noDictionary for column: %s",
column));
columnOperationMap.put(column, Operation.ENABLE_FORWARD_INDEX_FOR_RAW_COLUMN);
} else {
columnOperationMap.put(column, Operation.ENABLE_FORWARD_INDEX_FOR_DICT_COLUMN);
}
} else if (existingForwardIndexDisabledColumns.contains(column)
&& newForwardIndexDisabledColumns.contains(column)) {
// Forward index is disabled for the existing column and should remain disabled based on the latest config
Preconditions.checkState(existingDictColumns.contains(column) && !newNoDictColumns.contains(column),
String.format("Not allowed to disable the dictionary for a column: %s without forward index", column));
Expand Down Expand Up @@ -317,11 +342,7 @@ private boolean shouldChangeCompressionType(String column, SegmentDirectory.Read
// Note that default compression type (PASS_THROUGH for metric and LZ4 for dimension) is not considered if the
// compressionType is not explicitly provided in tableConfig. This is to avoid incorrectly rewriting all the
// forward indexes during segmentReload when the default compressionType changes.
if (newCompressionType == null || existingCompressionType == newCompressionType) {
return false;
}

return true;
return newCompressionType != null && existingCompressionType != newCompressionType;
}
}

Expand Down Expand Up @@ -803,17 +824,15 @@ private void writeDictEnabledForwardIndex(String column, ColumnMetadata existing
}
}

private void removeDictRelatedIndexes(String column, SegmentDirectory.Writer segmentWriter) {
static void removeDictRelatedIndexes(String column, SegmentDirectory.Writer segmentWriter) {
// TODO: Move this logic as a static function in each index creator.

// Remove all dictionary related indexes. They will be recreated if necessary by the respective handlers. Note that
// the remove index call will be a no-op if the index doesn't exist.
segmentWriter.removeIndex(column, ColumnIndexType.RANGE_INDEX);
segmentWriter.removeIndex(column, ColumnIndexType.FST_INDEX);
segmentWriter.removeIndex(column, ColumnIndexType.INVERTED_INDEX);
DICTIONARY_BASED_INDEXES_TO_REWRITE.forEach((index) -> segmentWriter.removeIndex(column, index));
}

private void updateMetadataProperties(File indexDir, Map<String, String> metadataProperties)
static void updateMetadataProperties(File indexDir, Map<String, String> metadataProperties)
throws Exception {
File v3Dir = SegmentDirectoryPaths.segmentDirectoryFor(indexDir, SegmentVersion.v3);
File metadataFile = new File(v3Dir, V1Constants.MetadataKeys.METADATA_FILE_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ boolean needUpdateIndices(SegmentDirectory.Reader segmentReader)
* Performs any cleanup actions required after the indexes have been updated.
* Should be called only after all IndexHandlers have run.
*/
default void postUpdateIndicesCleanup(SegmentDirectory.Writer segmentWriter)
throws Exception {
}
void postUpdateIndicesCleanup(SegmentDirectory.Writer segmentWriter)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public void updateIndices(SegmentDirectory.Writer segmentWriter, IndexCreatorPro
public boolean needUpdateIndices(SegmentDirectory.Reader segmentReader) {
return false;
}

@Override
public void postUpdateIndicesCleanup(SegmentDirectory.Writer segmentWriter) {
}
};

public static IndexHandler getIndexHandler(ColumnIndexType type, SegmentMetadataImpl segmentMetadata,
Expand Down
Loading

0 comments on commit 86fe730

Please sign in to comment.