Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Oct 1, 2024
1 parent 08ccda9 commit 52c5899
Showing 1 changed file with 61 additions and 62 deletions.
123 changes: 61 additions & 62 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ public String partition() {
private final PartitionSet deleteFilePartitions;
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private final DataFileSet dataFilesToDelete = DataFileSet.create();
private final DeleteFileSet deleteFilesToDelete = DeleteFileSet.create();
private final FilesToDeleteHolder filesToDelete = new FilesToDeleteHolder();
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean hasPathOnlyDeletes = false;
Expand Down Expand Up @@ -156,12 +155,7 @@ void caseSensitive(boolean newCaseSensitive) {
/**
 * Registers a single file to be deleted.
 *
 * <p>The file is null-checked, {@code invalidateFilteredCache()} is called, the file is recorded
 * for deletion, and its spec ID and partition are added to {@code deleteFilePartitions}.
 *
 * @param file the file to delete; a null value is rejected by {@code Preconditions.checkNotNull}
 */
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
// NOTE(review): this is a diff view, so the old and the new implementation both appear below.
// Old implementation (removed by the commit): pick the target set from the file's runtime type.
if (file instanceof DataFile) {
dataFilesToDelete.add((DataFile) file);
} else {
deleteFilesToDelete.add((DeleteFile) file);
}

// New implementation (added by the commit): the holder performs the same type dispatch.
filesToDelete.delete(file);
// remember the spec ID and partition of the file that was registered
deleteFilePartitions.add(file.specId(), file.partition());
}

Expand All @@ -175,8 +169,7 @@ void delete(CharSequence path) {

/**
 * Returns whether anything has been registered for deletion.
 *
 * <p>The result is true when at least one of these holds: a path was registered in {@code
 * deletePaths}, a file was registered for deletion, {@code deleteExpression} is no longer the
 * {@code Expressions.alwaysFalse()} value it is initialized with, or {@code dropPartitions} is
 * not empty.
 *
 * @return true if any kind of delete has been registered
 */
boolean containsDeletes() {
return !deletePaths.isEmpty()
// NOTE(review): diff view. The next two lines are the old check (removed by the commit).
|| !dataFilesToDelete.isEmpty()
|| !deleteFilesToDelete.isEmpty()
// New check (added by the commit): the holder tests both of its sets.
|| filesToDelete.containsDeletes()
// reference comparison; presumably alwaysFalse() returns a shared instance, confirm
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
}
Expand Down Expand Up @@ -241,42 +234,23 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
*/
private void validateRequiredDeletes(ManifestFile... manifests) {
// validation only runs when failMissingDeletePaths is set
if (failMissingDeletePaths) {
// NOTE(review): this is a diff view, so the old and the new implementation both appear below.
// Old implementation (removed by the commit): gather the files deleted from the given
// manifests, then check data files and delete files separately against the requested sets.
FilesToDeleteHolder deletedFiles = deletedFiles(manifests);

ValidationException.check(
deletedFiles.dataFiles.containsAll(dataFilesToDelete),
"Missing required files to delete: %s",
COMMA.join(
dataFilesToDelete.stream()
.filter(f -> !deletedFiles.dataFiles.contains(f))
.map(ContentFile::location)
.collect(Collectors.toList())));

ValidationException.check(
deletedFiles.deleteFiles.containsAll(deleteFilesToDelete),
"Missing required files to delete: %s",
COMMA.join(
deleteFilesToDelete.stream()
.filter(f -> !deletedFiles.deleteFiles.contains(f))
.map(ContentFile::location)
.collect(Collectors.toList())));
// New implementation (added by the commit): the same two checks are performed by
// FilesToDeleteHolder#validateRequiredDeletes, invoked on the holder of deleted files with
// the requested files as the argument.
deletedFiles(manifests).validateRequiredDeletes(filesToDelete);
}
}

/**
 * Merges the deleted files recorded for the given manifests into one new holder.
 *
 * <p>Each manifest is looked up in {@code filteredManifestToDeletedFiles}; manifests without an
 * entry are skipped. A null array yields an empty holder.
 *
 * @param manifests the manifests to look up, may be null
 * @return a new holder containing the data files and delete files of all entries found
 */
private FilesToDeleteHolder deletedFiles(ManifestFile[] manifests) {
// NOTE(review): diff view. The commit renames the local from "result" to "deletedFiles" and
// switches to the no-arg constructor, so each changed statement appears twice (old, then new).
FilesToDeleteHolder result =
new FilesToDeleteHolder(DataFileSet.create(), DeleteFileSet.create());
FilesToDeleteHolder deletedFiles = new FilesToDeleteHolder();
if (manifests != null) {
for (ManifestFile manifest : manifests) {
FilesToDeleteHolder manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
// a manifest with no recorded deletes has no entry in the map
if (manifestDeletes != null) {
result.dataFiles.addAll(manifestDeletes.dataFiles);
result.deleteFiles.addAll(manifestDeletes.deleteFiles);
deletedFiles.dataFiles.addAll(manifestDeletes.dataFiles);
deletedFiles.deleteFiles.addAll(manifestDeletes.deleteFiles);
}
}
}

return result;
return deletedFiles;
}

/**
Expand Down Expand Up @@ -368,7 +342,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainDroppedFiles;
if (hasPathOnlyDeletes) {
canContainDroppedFiles = true;
} else if (!dataFilesToDelete.isEmpty() || !deleteFilesToDelete.isEmpty()) {
} else if (filesToDelete.containsDeletes()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles =
ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
Expand Down Expand Up @@ -396,16 +370,11 @@ private boolean manifestHasDeletedFiles(

// add path-based delete to set of files to be deleted
if (deletePaths.contains(CharSequenceWrapper.wrap(file.path()))) {
if (file instanceof DataFile) {
dataFilesToDelete.add((DataFile) file);
} else {
deleteFilesToDelete.add((DeleteFile) file);
}
filesToDelete.delete(file);
}

boolean markedForDelete =
dataFilesToDelete.contains(file)
|| deleteFilesToDelete.contains(file)
filesToDelete.markedForDelete(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand Down Expand Up @@ -441,8 +410,7 @@ private ManifestFile filterManifestWithDeletedFiles(
boolean isDelete = reader.isDeleteManifestReader();
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
DataFileSet deletedDataFiles = DataFileSet.create();
DeleteFileSet deletedDeleteFiles = DeleteFileSet.create();
FilesToDeleteHolder deletedFiles = new FilesToDeleteHolder();

try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
Expand All @@ -453,8 +421,7 @@ private ManifestFile filterManifestWithDeletedFiles(
entry -> {
F file = entry.file();
boolean markedForDelete =
dataFilesToDelete.contains(file)
|| deleteFilesToDelete.contains(file)
filesToDelete.markedForDelete(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -469,25 +436,21 @@ private ManifestFile filterManifestWithDeletedFiles(
// the expression
"Cannot delete file where some, but not all, rows match filter %s: %s",
this.deleteExpression,
file.path());
file.location());

if (allRowsMatch) {
writer.delete(entry);

if (deletedDataFiles.contains(file) || deletedDeleteFiles.contains(file)) {
if (deletedFiles.markedForDelete(file)) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
file.path());
file.location());
duplicateDeleteCount += 1;
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
if (file instanceof DataFile) {
deletedDataFiles.add((DataFile) file.copyWithoutStats());
} else {
deletedDeleteFiles.add((DeleteFile) file.copyWithoutStats());
}
deletedFiles.delete(file.copyWithoutStats());
}
} else {
writer.existing(entry);
Expand All @@ -507,8 +470,7 @@ private ManifestFile filterManifestWithDeletedFiles(

// update caches
filteredManifests.put(manifest, filtered);
filteredManifestToDeletedFiles.put(
filtered, new FilesToDeleteHolder(deletedDataFiles, deletedDeleteFiles));
filteredManifestToDeletedFiles.put(filtered, deletedFiles);

return filtered;

Expand Down Expand Up @@ -570,13 +532,50 @@ private Pair<InclusiveMetricsEvaluator, StrictMetricsEvaluator> metricsEvaluator
}
}

private static class FilesToDeleteHolder {
private final DataFileSet dataFiles;
private final DeleteFileSet deleteFiles;
private class FilesToDeleteHolder {
private final DataFileSet dataFiles = DataFileSet.create();
private final DeleteFileSet deleteFiles = DeleteFileSet.create();

private FilesToDeleteHolder() {}

/**
 * Records the given file in this holder.
 *
 * <p>A {@code DataFile} goes into {@code dataFiles}; anything else is cast to {@code DeleteFile}
 * and goes into {@code deleteFiles}.
 *
 * @param file the file to record
 */
private void delete(F file) {
  // everything that is not a data file is handled as a delete file
  if (!(file instanceof DataFile)) {
    deleteFiles.add((DeleteFile) file);
    return;
  }

  dataFiles.add((DataFile) file);
}

/**
 * Reports whether this holder has recorded any file.
 *
 * @return true if {@code dataFiles} or {@code deleteFiles} has at least one element
 */
private boolean containsDeletes() {
  boolean nothingRecorded = dataFiles.isEmpty() && deleteFiles.isEmpty();
  return !nothingRecorded;
}

/**
 * Reports whether the given file has been recorded in this holder.
 *
 * <p>A {@code DataFile} is looked up in {@code dataFiles}; anything else is cast to {@code
 * DeleteFile} and looked up in {@code deleteFiles}.
 *
 * @param file the file to look up
 * @return true if the set matching the file's type contains it
 */
private boolean markedForDelete(F file) {
  return file instanceof DataFile
      ? dataFiles.contains((DataFile) file)
      : deleteFiles.contains((DeleteFile) file);
}

/**
 * Checks that this holder contains every file of the given holder.
 *
 * <p>Data files and delete files are checked separately through {@code
 * ValidationException.check}; the message argument lists the locations of the files of {@code
 * filesToBeDeleted} that this holder does not contain.
 *
 * @param filesToBeDeleted the files that are required to be present in this holder
 */
private void validateRequiredDeletes(FilesToDeleteHolder filesToBeDeleted) {
// NOTE(review): the joined list of missing locations is an ordinary method argument, so it is
// built on every call, including when containsAll(...) is true.
ValidationException.check(
dataFiles.containsAll(filesToBeDeleted.dataFiles),
"Missing required files to delete: %s",
COMMA.join(
filesToBeDeleted.dataFiles.stream()
.filter(f -> !dataFiles.contains(f))
.map(ContentFile::location)
.collect(Collectors.toList())));

// NOTE(review): diff view. The next three lines are the old two-argument constructor (removed
// by the commit); they are not part of this method.
private FilesToDeleteHolder(DataFileSet dataFiles, DeleteFileSet deleteFiles) {
this.dataFiles = dataFiles;
this.deleteFiles = deleteFiles;
// same check as above, for delete files
ValidationException.check(
deleteFiles.containsAll(filesToBeDeleted.deleteFiles),
"Missing required files to delete: %s",
COMMA.join(
filesToBeDeleted.deleteFiles.stream()
.filter(f -> !deleteFiles.contains(f))
.map(ContentFile::location)
.collect(Collectors.toList())));
}
}
}

0 comments on commit 52c5899

Please sign in to comment.