diff --git a/api/src/main/java/org/apache/iceberg/types/Comparators.java b/api/src/main/java/org/apache/iceberg/types/Comparators.java index bfbffc64b673..6d968d2055e2 100644 --- a/api/src/main/java/org/apache/iceberg/types/Comparators.java +++ b/api/src/main/java/org/apache/iceberg/types/Comparators.java @@ -22,6 +22,7 @@ import java.util.Comparator; import java.util.List; import java.util.function.IntFunction; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.StructLike; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.util.UnicodeUtil; @@ -179,6 +180,10 @@ public static Comparator filePath() { return FilePathComparator.INSTANCE; } + public static Comparator> contentFile() { + return ContentFileComparator.INSTANCE; + } + private static class NullsFirst implements Comparator { private static final NullsFirst INSTANCE = new NullsFirst<>(); @@ -394,4 +399,15 @@ public int compare(CharSequence s1, CharSequence s2) { return 0; } } + + private static class ContentFileComparator implements Comparator> { + private static final ContentFileComparator INSTANCE = new ContentFileComparator(); + + private ContentFileComparator() {} + + @Override + public int compare(ContentFile s1, ContentFile s2) { + return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path()); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java index d929bc068ec2..18b8a177eb66 100644 --- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java @@ -27,10 +27,11 @@ import org.apache.iceberg.expressions.StrictMetricsEvaluator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; public class BaseOverwriteFiles extends MergingSnapshotProducer implements OverwriteFiles { - private final Set deletedDataFiles = Sets.newHashSet(); + private final Set deletedDataFiles = Sets.newTreeSet(Comparators.contentFile()); private boolean validateAddedFilesMatchOverwriteFilter = false; private Long startingSnapshotId = null; private Expression conflictDetectionFilter = null; diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index d231536d0642..510295e91271 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -22,9 +22,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; class BaseRewriteFiles extends MergingSnapshotProducer implements RewriteFiles { - private final Set replacedDataFiles = Sets.newHashSet(); + private final Set replacedDataFiles = Sets.newTreeSet(Comparators.contentFile()); private Long startingSnapshotId = null; BaseRewriteFiles(String tableName, TableOperations ops) { diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 1bae2e2fc5a0..616e009439da 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -30,7 +30,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; /** * {@link AppendFiles Append} implementation that adds a new manifest file for the write. @@ -44,7 +45,7 @@ class FastAppend extends SnapshotProducer implements AppendFiles { private final PartitionSpec spec; private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); private final List newFiles = Lists.newArrayList(); - private final CharSequenceSet newFilePaths = CharSequenceSet.empty(); + private final Set newFileSet = Sets.newTreeSet(Comparators.contentFile()); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private List newManifests = null; @@ -86,7 +87,7 @@ protected Map summary() { @Override public FastAppend appendFile(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); - if (newFilePaths.add(file.path())) { + if (newFileSet.add(file)) { this.hasNewFiles = true; newFiles.add(file); summaryBuilder.addedFile(spec, file); diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 6a4da2abc9b6..ffeaa221522d 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PartitionSet; @@ -81,8 +82,8 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { // update data private final Map> newDataFilesBySpec = Maps.newHashMap(); - private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty(); - private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty(); + private final Set newDataFiles = Sets.newTreeSet(Comparators.contentFile()); + private final Set newDeleteFiles = Sets.newTreeSet(Comparators.contentFile()); private Long newDataFilesDataSequenceNumber; private final Map> newDeleteFilesBySpec = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); @@ -234,7 +235,7 @@ protected boolean addsDeleteFiles() { /** Add a data file to the new snapshot. */ protected void add(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); - if (newDataFilePaths.add(file.path())) { + if (newDataFiles.add(file)) { PartitionSpec fileSpec = ops.current().spec(file.specId()); Preconditions.checkArgument( fileSpec != null, @@ -268,7 +269,7 @@ private void add(DeleteFileHolder fileHolder) { List deleteFiles = newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList()); - if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) { + if (newDeleteFiles.add(fileHolder.deleteFile())) { deleteFiles.add(fileHolder); addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile()); hasNewDeleteFiles = true; diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java index 45b4bcf0a4d9..435a5e4b118e 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java @@ -28,6 +28,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,8 +74,8 @@ public RewriteDataFilesCommitManager( * @param fileGroups fileSets to commit */ public void commitFileGroups(Set fileGroups) { - Set rewrittenDataFiles = Sets.newHashSet(); - Set addedDataFiles = Sets.newHashSet(); + Set rewrittenDataFiles = Sets.newTreeSet(Comparators.contentFile()); + Set addedDataFiles = Sets.newTreeSet(Comparators.contentFile()); for (RewriteFileGroup group : fileGroups) { rewrittenDataFiles.addAll(group.rewrittenFiles()); addedDataFiles.addAll(group.addedFiles()); diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java index dd1358f2ed40..63bb8b26352e 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.actions; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -29,6 +28,8 @@ import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; /** * Container class representing a set of files to be rewritten by a RewriteAction and the new files @@ -38,7 +39,7 @@ public class RewriteFileGroup { private final FileGroupInfo info; private final List fileScanTasks; - private Set addedFiles = Collections.emptySet(); + private Set addedFiles = Sets.newTreeSet(Comparators.contentFile()); public RewriteFileGroup(FileGroupInfo info, List fileScanTasks) { this.info = info; diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java index 2be7145bcd34..8a24eafb412f 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.actions; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -30,6 +29,8 @@ import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; /** * Container class representing a set of position delete files to be rewritten by a {@link @@ -40,7 +41,7 @@ public class RewritePositionDeletesGroup { private final List tasks; private final long maxRewrittenDataSequenceNumber; - private Set addedDeleteFiles = Collections.emptySet(); + private Set addedDeleteFiles = Sets.newTreeSet(Comparators.contentFile()); public RewritePositionDeletesGroup(FileGroupInfo info, List tasks) { Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty"); @@ -67,7 +68,9 @@ public long maxRewrittenDataSequenceNumber() { } public Set rewrittenDeleteFiles() { - return tasks().stream().map(PositionDeletesScanTask::file).collect(Collectors.toSet()); + return tasks().stream() + .map(PositionDeletesScanTask::file) + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile()))); } public Set addedDeleteFiles() { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java index c73ef630ac48..6fdde47905bd 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -57,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.storage.serde2.io.DateWritable; @@ -787,7 +788,7 @@ public static List dataFiles(Table table, String branch) { } public static Set deleteFiles(Table table) { - Set deleteFiles = Sets.newHashSet(); + Set deleteFiles = Sets.newTreeSet(Comparators.contentFile()); for (FileScanTask task : table.newScan().planFiles()) { deleteFiles.addAll(task.deletes());