Commit 588e011

[HUDI-8371] Fix column stats index with MDT for few scenarios (apache…
nsivabalan authored Oct 28, 2024
1 parent f4a8e01 commit 588e011
Showing 29 changed files with 896 additions and 158 deletions.
@@ -239,13 +239,13 @@ public List<MetadataPartitionType> getEnabledPartitionTypes() {
protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
Option<String> inflightInstantTimestamp) throws IOException {
HoodieTimer timer = HoodieTimer.start();
List<MetadataPartitionType> partitionsToInit = new ArrayList<>(MetadataPartitionType.getValidValues().length);
List<MetadataPartitionType> metadataPartitionsToInit = new ArrayList<>(MetadataPartitionType.getValidValues().length);

try {
boolean exists = metadataTableExists(dataMetaClient);
if (!exists) {
// FILES partition is always required
partitionsToInit.add(FILES);
metadataPartitionsToInit.add(FILES);
}

// check if any of the enabled partition types needs to be initialized
@@ -255,10 +255,10 @@ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
LOG.info("Async metadata indexing disabled and following partitions already initialized: {}", completedPartitions);
this.enabledPartitionTypes.stream()
.filter(p -> !completedPartitions.contains(p.getPartitionPath()) && !FILES.equals(p))
.forEach(partitionsToInit::add);
.forEach(metadataPartitionsToInit::add);
}

if (partitionsToInit.isEmpty()) {
if (metadataPartitionsToInit.isEmpty()) {
// No partitions left to initialize, since all the metadata enabled partitions are either initialized before
// or currently in the process of initialization.
initMetadataReader();
@@ -268,13 +268,7 @@ protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime = dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);

// Initialize partitions for the first time using data from the files on the file system
if (!initializeFromFilesystem(initializationTime, partitionsToInit, inflightInstantTimestamp)) {
LOG.error("Failed to initialize MDT from filesystem");
return false;
}

initializeFromFilesystem(initializationTime, metadataPartitionsToInit, inflightInstantTimestamp);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
return true;
} catch (IOException e) {
@@ -344,7 +338,7 @@ private boolean isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant) {
* @param partitionsToInit - List of MDT partitions to initialize
* @param inflightInstantTimestamp - Current action instant responsible for this initialization
*/
private boolean initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit,
private void initializeFromFilesystem(String initializationTime, List<MetadataPartitionType> partitionsToInit,
Option<String> inflightInstantTimestamp) throws IOException {
Set<String> pendingDataInstants = getPendingDataInstants(dataMetaClient);

@@ -461,8 +455,7 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
}

if (LOG.isInfoEnabled()) {
LOG.info("Initializing {} index with {} mappings and {} file groups.", partitionTypeName, fileGroupCountAndRecordsPair.getKey(),
fileGroupCountAndRecordsPair.getValue().count());
LOG.info("Initializing {} index with {} mappings", partitionTypeName, fileGroupCountAndRecordsPair.getKey());
}
HoodieTimer partitionInitTimer = HoodieTimer.start();

@@ -482,8 +475,6 @@ private boolean initializeFromFilesystem(String initializationTime, List<Metadat
long totalInitTime = partitionInitTimer.endTimer();
LOG.info("Initializing {} index in metadata table took {} in ms", partitionTypeName, totalInitTime);
}

return true;
}

/**
@@ -520,9 +511,11 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(Li
}

private Pair<Integer, HoodieData<HoodieRecord>> initializeColumnStatsPartition(Map<String, Map<String, Long>> partitionToFilesMap) {
// during initialization, we need stats for base and log files.
HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToColumnStatsRecords(
engineContext, Collections.emptyMap(), partitionToFilesMap, dataMetaClient, dataWriteConfig.isMetadataColumnStatsIndexEnabled(),
dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex());
dataWriteConfig.getColumnStatsIndexParallelism(), dataWriteConfig.getColumnsEnabledForColumnStatsIndex(),
dataWriteConfig.getMetadataConfig().getMaxReaderBufferSize());

final int fileGroupCount = dataWriteConfig.getMetadataConfig().getColumnStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
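For context, the column stats partition that this method seeds is driven by the writer's metadata configuration. Below is a hedged sketch of how it might be enabled; the builder and getter names are my recollection of HoodieMetadataConfig (not part of this commit) and should be checked against the Hudi version in use, and the parallelism value is made up.

import org.apache.hudi.common.config.HoodieMetadataConfig;

public class ColumnStatsConfigSketch {
  public static void main(String[] args) {
    // Illustrative only: turn on the metadata table's column stats index and
    // bound the parallelism used when building it.
    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withMetadataIndexColumnStats(true)
        .withColumnStatsIndexParallelism(10)
        .build();

    System.out.println(metadataConfig.isColumnStatsIndexEnabled()); // expected: true
  }
}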
@@ -863,12 +856,13 @@ private List<DirectoryInfo> listAllPartitionsFromFilesystem(String initializatio
* @return List consisting of {@code DirectoryInfo} for each partition found.
*/
private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime, Set<String> pendingDataInstants) throws IOException {
List<String> allPartitionPaths = metadata.getAllPartitionPaths().stream()
List<String> allAbsolutePartitionPaths = metadata.getAllPartitionPaths().stream()
.map(partitionPath -> dataWriteConfig.getBasePath() + StoragePath.SEPARATOR_CHAR + partitionPath).collect(Collectors.toList());
Map<String, List<StoragePathInfo>> partitionFileMap = metadata.getAllFilesInPartitions(allPartitionPaths);
Map<String, List<StoragePathInfo>> partitionFileMap = metadata.getAllFilesInPartitions(allAbsolutePartitionPaths);
List<DirectoryInfo> dirinfoList = new ArrayList<>(partitionFileMap.size());
for (Map.Entry<String, List<StoragePathInfo>> entry : partitionFileMap.entrySet()) {
dirinfoList.add(new DirectoryInfo(entry.getKey(), entry.getValue(), initializationTime, pendingDataInstants));
String relativeDirPath = FSUtils.getRelativePartitionPath(new StoragePath(dataWriteConfig.getBasePath()), new StoragePath(entry.getKey()));
dirinfoList.add(new DirectoryInfo(relativeDirPath, entry.getValue(), initializationTime, pendingDataInstants, false));
}
return dirinfoList;
}
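The hunk above now derives a relative partition path from the absolute paths returned by the MDT listing before building each DirectoryInfo. A minimal, self-contained sketch of that conversion follows; the paths are invented, only the FSUtils.getRelativePartitionPath call and StoragePath type are taken from the change, and the import packages are the usual Hudi locations rather than something confirmed by this diff.

import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.storage.StoragePath;

public class RelativePartitionPathSketch {
  public static void main(String[] args) {
    // Hypothetical table base path and one absolute partition path from the MDT listing.
    StoragePath basePath = new StoragePath("s3://bucket/warehouse/trips");
    StoragePath absolutePartition = new StoragePath("s3://bucket/warehouse/trips/2024/10/28");

    // Same helper used in listAllPartitionsFromMDT above: strips the base path prefix,
    // leaving the relative partition path that DirectoryInfo expects.
    String relative = FSUtils.getRelativePartitionPath(basePath, absolutePartition);
    System.out.println(relative); // expected: 2024/10/28
  }
}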
@@ -154,6 +154,14 @@ public static String getFileId(String fullFileName) {
return fullFileName.split("_", 2)[0];
}

/**
* @param filePath path to a file; it may be an absolute path or just the partition path plus the file name.
* @return the file name extracted from the given path.
*/
public static String getFileNameFromPath(String filePath) {
return filePath.substring(filePath.lastIndexOf("/") + 1);
}

/**
* Gets all partition paths assuming date partitioning (year, month, day) three levels down.
* TODO: (Lin) Delete this function after we remove the assume.date.partitioning config completely.
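A small usage sketch for the new getFileNameFromPath helper added above. The inputs are invented, and the example assumes the helper lives in FSUtils, as the surrounding methods suggest; only the last path segment is returned, so absolute and partition-relative inputs behave the same.

import org.apache.hudi.common.fs.FSUtils;

public class FileNameFromPathSketch {
  public static void main(String[] args) {
    String absolute = "s3://bucket/tbl/2024/10/28/fg1_0-1-0_20241028.parquet";
    String relative = "2024/10/28/fg1_0-1-0_20241028.parquet";

    // Both calls print "fg1_0-1-0_20241028.parquet".
    System.out.println(FSUtils.getFileNameFromPath(absolute));
    System.out.println(FSUtils.getFileNameFromPath(relative));
  }
}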
@@ -83,7 +83,7 @@ protected <T> void processNextRecord(HoodieRecord<T> hoodieRecord) throws Except

@Override
protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
throw new IllegalStateException("Not expected to see delete records in this log-scan mode. Check Job Config");
// no-op
}

/**
@@ -840,12 +840,17 @@ private Map<String, String> reverseLookupSecondaryKeys(String partitionName, Lis
}

Set<String> keySet = new TreeSet<>(recordKeys);
Set<String> deletedRecordsFromLogs = new HashSet<>();
Map<String, HoodieRecord<HoodieMetadataPayload>> logRecordsMap = new HashMap<>();
logRecordScanner.getRecords().forEach(record -> {
HoodieMetadataPayload payload = record.getData();
String recordKey = payload.getRecordKeyFromSecondaryIndex();
if (keySet.contains(recordKey)) {
logRecordsMap.put(recordKey, record);
if (!payload.isDeleted()) { // process only valid records.
String recordKey = payload.getRecordKeyFromSecondaryIndex();
if (keySet.contains(recordKey)) {
logRecordsMap.put(recordKey, record);
}
} else {
deletedRecordsFromLogs.add(record.getRecordKey());
}
});
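The block above splits log records into valid entries and deletes, and the following hunk then drops base-file hits whose keys were deleted in the logs. A simplified, self-contained sketch of that two-pass merge is shown below; Hudi record and payload types are replaced with plain strings, and the keys and values are invented, so this is purely illustrative rather than the actual metadata reader code.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeleteAwareSecondaryLookupSketch {
  public static void main(String[] args) {
    // Base-file view of the index: record key -> secondary key.
    Map<String, String> baseFileRecords = new HashMap<>();
    baseFileRecords.put("rk1", "sec1");
    baseFileRecords.put("rk2", "sec2");

    // Log-file view: record key -> deleted flag (true means a delete was logged later).
    Map<String, Boolean> logRecords = new HashMap<>();
    logRecords.put("rk2", true);   // deleted in the logs
    logRecords.put("rk3", false);  // still valid, introduced by the logs

    Set<String> deletedRecordsFromLogs = new HashSet<>();
    Map<String, String> result = new HashMap<>();

    // Pass 1: keep only valid log records, remember the deleted keys.
    logRecords.forEach((recordKey, isDeleted) -> {
      if (!isDeleted) {
        result.put(recordKey, "sec-from-log");
      } else {
        deletedRecordsFromLogs.add(recordKey);
      }
    });

    // Pass 2: base-file records survive only if the logs did not delete them.
    baseFileRecords.forEach((recordKey, secondaryKey) -> {
      if (!deletedRecordsFromLogs.contains(recordKey)) {
        result.putIfAbsent(recordKey, secondaryKey);
      }
    });

    System.out.println(result); // rk1=sec1 and rk3=sec-from-log (order may vary)
  }
}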

@@ -856,7 +861,11 @@ private Map<String, String> reverseLookupSecondaryKeys(String partitionName, Lis
Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord = HoodieMetadataPayload.combineSecondaryIndexRecord(oldRecord, newRecord);
return mergedRecord.orElseGet(null);
}));
baseFileRecords.forEach((key, value) -> recordKeyMap.put(key, value.getRecordKey()));
baseFileRecords.forEach((key, value) -> {
if (!deletedRecordsFromLogs.contains(key)) {
recordKeyMap.put(key, value.getRecordKey());
}
});
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for " + recordKeys.size() + " key : ", ioe);
} finally {
@@ -931,17 +940,22 @@ private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> lookupSecondaryKe
List<String> sortedSecondaryKeys = new ArrayList<>(secondaryKeys);
secondaryKeySet.addAll(sortedSecondaryKeys);
Collections.sort(sortedSecondaryKeys);
Set<String> deletedRecordKeysFromLogs = new HashSet<>();

logRecordScanner.getRecords().forEach(record -> {
HoodieMetadataPayload payload = record.getData();
String secondaryKey = payload.key;
if (secondaryKeySet.contains(secondaryKey)) {
String recordKey = payload.getRecordKeyFromSecondaryIndex();
logRecordsMap.computeIfAbsent(secondaryKey, k -> new HashMap<>()).put(recordKey, record);
if (!payload.isDeleted()) {
String secondaryKey = payload.key;
if (secondaryKeySet.contains(secondaryKey)) {
String recordKey = payload.getRecordKeyFromSecondaryIndex();
logRecordsMap.computeIfAbsent(secondaryKey, k -> new HashMap<>()).put(recordKey, record);
}
} else {
deletedRecordKeysFromLogs.add(record.getRecordKey());
}
});

return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, sortedSecondaryKeys, logRecordsMap, timings, partitionName);
return readNonUniqueRecordsAndMergeWithLogRecords(baseFileReader, sortedSecondaryKeys, logRecordsMap, timings, partitionName, deletedRecordKeysFromLogs);
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for " + secondaryKeys.size() + " key : ", ioe);
} finally {
@@ -955,7 +969,8 @@ private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> readNonUniqueReco
List<String> sortedKeys,
Map<String, HashMap<String, HoodieRecord>> logRecordsMap,
List<Long> timings,
String partitionName) throws IOException {
String partitionName,
Set<String> deleteRecordKeysFromLogs) throws IOException {
HoodieTimer timer = HoodieTimer.start();

Map<String, List<HoodieRecord<HoodieMetadataPayload>>> resultMap = new HashMap<>();
@@ -978,9 +993,13 @@ private Map<String, List<HoodieRecord<HoodieMetadataPayload>>> readNonUniqueReco
if (logRecordsMap.isEmpty() && !baseFileRecordsMap.isEmpty()) {
// file slice has only base file
timings.add(timer.endTimer());
if (!deleteRecordKeysFromLogs.isEmpty()) { // remove deleted records from log from base file record list
deleteRecordKeysFromLogs.forEach(key -> baseFileRecordsMap.remove(key));
}
return baseFileRecordsMap;
}

// check why we are not considering records missing from logs, but only from base file.
logRecordsMap.forEach((secondaryKey, logRecords) -> {
if (!baseFileRecordsMap.containsKey(secondaryKey)) {
List<HoodieRecord<HoodieMetadataPayload>> recordList = logRecords
@@ -206,38 +206,40 @@ public HoodieMetadataPayload(Option<GenericRecord> recordOpt) {
}

protected HoodieMetadataPayload(String key, int type, Map<String, HoodieMetadataFileInfo> filesystemMetadata) {
this(key, type, filesystemMetadata, null, null, null, null);
this(key, type, filesystemMetadata, null, null, null, null, false);
}

protected HoodieMetadataPayload(String key, HoodieMetadataBloomFilter metadataBloomFilter) {
this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null, metadataBloomFilter, null, null, null);
this(key, MetadataPartitionType.BLOOM_FILTERS.getRecordType(), null, metadataBloomFilter, null, null, null, metadataBloomFilter.getIsDeleted());
}

protected HoodieMetadataPayload(String key, HoodieMetadataColumnStats columnStats, int recordType) {
this(key, recordType, null, null, columnStats, null, null);
this(key, recordType, null, null, columnStats, null, null, columnStats.getIsDeleted());
}

private HoodieMetadataPayload(String key, HoodieRecordIndexInfo recordIndexMetadata) {
this(key, MetadataPartitionType.RECORD_INDEX.getRecordType(), null, null, null, recordIndexMetadata, null);
this(key, MetadataPartitionType.RECORD_INDEX.getRecordType(), null, null, null, recordIndexMetadata, null, false);
}

private HoodieMetadataPayload(String key, HoodieSecondaryIndexInfo secondaryIndexMetadata) {
this(key, MetadataPartitionType.SECONDARY_INDEX.getRecordType(), null, null, null, null, secondaryIndexMetadata);
this(key, MetadataPartitionType.SECONDARY_INDEX.getRecordType(), null, null, null, null, secondaryIndexMetadata, secondaryIndexMetadata.getIsDeleted());
}

protected HoodieMetadataPayload(String key, int type,
Map<String, HoodieMetadataFileInfo> filesystemMetadata,
HoodieMetadataBloomFilter metadataBloomFilter,
HoodieMetadataColumnStats columnStats,
HoodieRecordIndexInfo recordIndexMetadata,
HoodieSecondaryIndexInfo secondaryIndexMetadata) {
HoodieSecondaryIndexInfo secondaryIndexMetadata,
boolean isDeletedRecord) {
this.key = key;
this.type = type;
this.filesystemMetadata = filesystemMetadata;
this.bloomFilterMetadata = metadataBloomFilter;
this.columnStatMetadata = columnStats;
this.recordIndexMetadata = recordIndexMetadata;
this.secondaryIndexMetadata = secondaryIndexMetadata;
this.isDeletedRecord = isDeletedRecord;
}
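The common constructor above now receives an explicit isDeletedRecord flag, taken from each index payload's getIsDeleted() (bloom filter, column stats, secondary index) or fixed to false where deletes do not apply (files, record index). Below is a stripped-down, standalone mirror of that threading with invented class names, not the actual Hudi payload, just to show how readers can later skip deleted entries.

public class DeleteFlagThreadingSketch {
  // Stand-in for an index-specific metadata object carrying a delete marker.
  static final class ColumnStatsStub {
    private final boolean isDeleted;
    ColumnStatsStub(boolean isDeleted) { this.isDeleted = isDeleted; }
    boolean getIsDeleted() { return isDeleted; }
  }

  // Stand-in for the payload: the delete flag is fixed at construction time.
  static final class PayloadStub {
    private final boolean isDeletedRecord;
    PayloadStub(ColumnStatsStub columnStats) { this.isDeletedRecord = columnStats.getIsDeleted(); }
    boolean isDeleted() { return isDeletedRecord; }
  }

  public static void main(String[] args) {
    PayloadStub live = new PayloadStub(new ColumnStatsStub(false));
    PayloadStub tombstone = new PayloadStub(new ColumnStatsStub(true));
    // Readers (as in the lookup changes above) keep live entries and skip tombstones.
    System.out.println(live.isDeleted() + " / " + tombstone.isDeleted()); // false / true
  }
}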
