Skip to content

Commit

Permalink
Core: Add ContentFile comparator
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Sep 18, 2024
1 parent 40ffcb9 commit fbf6a7d
Show file tree
Hide file tree
Showing 9 changed files with 43 additions and 17 deletions.
16 changes: 16 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Comparators.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.function.IntFunction;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.UnicodeUtil;
Expand Down Expand Up @@ -179,6 +180,10 @@ public static Comparator<CharSequence> filePath() {
return FilePathComparator.INSTANCE;
}

public static Comparator<ContentFile<?>> contentFile() {
return ContentFileComparator.INSTANCE;
}

private static class NullsFirst<T> implements Comparator<T> {
private static final NullsFirst<?> INSTANCE = new NullsFirst<>();

Expand Down Expand Up @@ -394,4 +399,15 @@ public int compare(CharSequence s1, CharSequence s2) {
return 0;
}
}

private static class ContentFileComparator implements Comparator<ContentFile<?>> {
private static final ContentFileComparator INSTANCE = new ContentFileComparator();

private ContentFileComparator() {}

@Override
public int compare(ContentFile<?> s1, ContentFile<?> s2) {
return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
implements OverwriteFiles {
private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
private final Set<DataFile> deletedDataFiles = Sets.newTreeSet(Comparators.contentFile());
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
private final Set<DataFile> replacedDataFiles = Sets.newTreeSet(Comparators.contentFile());
private Long startingSnapshotId = null;

BaseRewriteFiles(String tableName, TableOperations ops) {
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
Expand All @@ -44,7 +45,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
private final Set<DataFile> newFileSet = Sets.newTreeSet(Comparators.contentFile());
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
Expand Down Expand Up @@ -86,7 +87,7 @@ protected Map<String, String> summary() {
@Override
public FastAppend appendFile(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newFilePaths.add(file.path())) {
if (newFileSet.add(file)) {
this.hasNewFiles = true;
newFiles.add(file);
summaryBuilder.addedFile(spec, file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
Expand Down Expand Up @@ -81,8 +82,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
private final Set<DataFile> newDataFiles = Sets.newTreeSet(Comparators.contentFile());
private final Set<DeleteFile> newDeleteFiles = Sets.newTreeSet(Comparators.contentFile());
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
Expand Down Expand Up @@ -234,7 +235,7 @@ protected boolean addsDeleteFiles() {
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newDataFilePaths.add(file.path())) {
if (newDataFiles.add(file)) {
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkArgument(
fileSpec != null,
Expand Down Expand Up @@ -268,7 +269,7 @@ private void add(DeleteFileHolder fileHolder) {
List<DeleteFileHolder> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());

if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) {
if (newDeleteFiles.add(fileHolder.deleteFile())) {
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
hasNewDeleteFiles = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -73,8 +74,8 @@ public RewriteDataFilesCommitManager(
* @param fileGroups fileSets to commit
*/
public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
Set<DataFile> rewrittenDataFiles = Sets.newHashSet();
Set<DataFile> addedDataFiles = Sets.newHashSet();
Set<DataFile> rewrittenDataFiles = Sets.newTreeSet(Comparators.contentFile());
Set<DataFile> addedDataFiles = Sets.newTreeSet(Comparators.contentFile());
for (RewriteFileGroup group : fileGroups) {
rewrittenDataFiles.addAll(group.rewrittenFiles());
addedDataFiles.addAll(group.addedFiles());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.actions;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
Expand All @@ -29,6 +28,8 @@
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

/**
* Container class representing a set of files to be rewritten by a RewriteAction and the new files
Expand All @@ -38,7 +39,7 @@ public class RewriteFileGroup {
private final FileGroupInfo info;
private final List<FileScanTask> fileScanTasks;

private Set<DataFile> addedFiles = Collections.emptySet();
private Set<DataFile> addedFiles = Sets.newTreeSet(Comparators.contentFile());

public RewriteFileGroup(FileGroupInfo info, List<FileScanTask> fileScanTasks) {
this.info = info;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.actions;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
Expand All @@ -30,6 +29,8 @@
import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

/**
* Container class representing a set of position delete files to be rewritten by a {@link
Expand All @@ -40,7 +41,7 @@ public class RewritePositionDeletesGroup {
private final List<PositionDeletesScanTask> tasks;
private final long maxRewrittenDataSequenceNumber;

private Set<DeleteFile> addedDeleteFiles = Collections.emptySet();
private Set<DeleteFile> addedDeleteFiles = Sets.newTreeSet(Comparators.contentFile());

public RewritePositionDeletesGroup(FileGroupInfo info, List<PositionDeletesScanTask> tasks) {
Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty");
Expand All @@ -67,7 +68,9 @@ public long maxRewrittenDataSequenceNumber() {
}

public Set<DeleteFile> rewrittenDeleteFiles() {
return tasks().stream().map(PositionDeletesScanTask::file).collect(Collectors.toSet());
return tasks().stream()
.map(PositionDeletesScanTask::file)
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
}

public Set<DeleteFile> addedDeleteFiles() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.storage.serde2.io.DateWritable;
Expand Down Expand Up @@ -787,7 +788,7 @@ public static List<DataFile> dataFiles(Table table, String branch) {
}

public static Set<DeleteFile> deleteFiles(Table table) {
Set<DeleteFile> deleteFiles = Sets.newHashSet();
Set<DeleteFile> deleteFiles = Sets.newTreeSet(Comparators.contentFile());

for (FileScanTask task : table.newScan().planFiles()) {
deleteFiles.addAll(task.deletes());
Expand Down

0 comments on commit fbf6a7d

Please sign in to comment.