Core, Spark: Include content offset/size in PositionDeletesTable
nastra committed Dec 16, 2024
1 parent 897207a commit ffb3824
Showing 6 changed files with 350 additions and 79 deletions.
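With this change, the position_deletes metadata table gains two columns for format-version 3 tables that use deletion vectors (DVs): content_offset and content_size_in_bytes. As a rough sketch of what that enables (the catalog and table names are placeholders, and this assumes a v3 table with DVs), the new columns can be read through Spark's Java API the same way the test helper later in this diff does:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Hypothetical example: inspect DV placement via the position_deletes metadata table.
SparkSession spark = SparkSession.builder().getOrCreate();
Dataset<Row> dvInfo =
    spark
        .read()
        .format("iceberg")
        .load("catalog.db.tbl.position_deletes") // placeholder table identifier
        .select("file_path", "delete_file_path", "content_offset", "content_size_in_bytes");
dvInfo.show(false);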
14 changes: 14 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -92,6 +92,20 @@ private MetadataColumns() {}
Types.LongType.get(),
"Commit snapshot ID");

// the content offset and size in bytes of a DV
public static final NestedField CONTENT_OFFSET =
Types.NestedField.optional(
Integer.MAX_VALUE - 107,
"content_offset",
Types.LongType.get(),
"The offset in the DV where the content starts");
public static final NestedField CONTENT_SIZE_IN_BYTES =
Types.NestedField.optional(
Integer.MAX_VALUE - 108,
"content_size_in_bytes",
Types.LongType.get(),
"The length in bytes of referenced content stored in a DV");

private static final Map<String, NestedField> META_COLUMNS =
ImmutableMap.of(
FILE_PATH.name(), FILE_PATH,
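The two constants above use reserved field IDs near Integer.MAX_VALUE, following the existing metadata-column convention, and describe where a DV's content lives inside its Puffin file. As a minimal, hypothetical sketch of what the offset and length address (this is not the code path Iceberg uses to decode DVs, only an illustration of the two values), the raw blob bytes could be read like this:

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

// Sketch: read the serialized DV blob addressed by content_offset/content_size_in_bytes.
static byte[] readDvBlob(InputFile puffinFile, DeleteFile dv) throws java.io.IOException {
  long offset = dv.contentOffset();                 // where the blob content starts
  int length = dv.contentSizeInBytes().intValue();  // length in bytes of the blob content
  try (SeekableInputStream in = puffinFile.newStream()) {
    in.seek(offset);
    return in.readNBytes(length);                   // InputStream#readNBytes, Java 11+
  }
}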
@@ -145,6 +145,10 @@ private Schema calculateSchema() {
Types.StringType.get(),
MetadataColumns.FILE_PATH_COLUMN_DOC));

if (formatVersion >= 3) {
builder.add(MetadataColumns.CONTENT_OFFSET, MetadataColumns.CONTENT_SIZE_IN_BYTES);
}

List<Types.NestedField> columns = builder.build();

// Calculate used ids (for de-conflict)
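Since the builder only adds CONTENT_OFFSET and CONTENT_SIZE_IN_BYTES when formatVersion >= 3, consumers of the position_deletes schema should not assume the columns exist for older tables. A small defensive check, shown here as a hedged sketch (the helper name is made up):

import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;

// Sketch: detect whether a position_deletes schema exposes the DV columns (v3+ tables).
static boolean hasDvColumns(Schema positionDeletesSchema) {
  return positionDeletesSchema.findField(MetadataColumns.CONTENT_OFFSET.fieldId()) != null
      && positionDeletesSchema.findField(MetadataColumns.CONTENT_SIZE_IN_BYTES.fieldId()) != null;
}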
@@ -1706,7 +1706,7 @@ public void testPositionDeletesManyColumns() {
PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table);
int expectedIds =
formatVersion >= 3
-            ? 6 // partition col + 5 columns
+            ? 8 // partition col + 7 columns
: 2010; // partition col + 6 columns + 2003 ids inside the deleted row column
assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size())
.isEqualTo(expectedIds);
@@ -30,6 +30,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ScanTaskUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.unsafe.types.UTF8String;
@@ -79,6 +80,10 @@ public InternalRow next() {
rowValues.add(idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID));
} else if (fieldId == MetadataColumns.FILE_PATH_COLUMN_ID) {
rowValues.add(idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID));
} else if (fieldId == MetadataColumns.CONTENT_OFFSET.fieldId()) {
rowValues.add(deleteFile.contentOffset());
} else if (fieldId == MetadataColumns.CONTENT_SIZE_IN_BYTES.fieldId()) {
rowValues.add(ScanTaskUtil.contentSizeInBytes(deleteFile));
}
}

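The reader above copies content_offset directly from the DeleteFile but resolves the size through ScanTaskUtil.contentSizeInBytes, which is expected to fall back to the file size when a delete file is not a DV and therefore carries no content size. A rough sketch of that idea (not ScanTaskUtil's actual implementation):

import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;

// Sketch: a DV contributes only its blob's size; other delete files contribute the full file size.
static long contentSize(DeleteFile deleteFile) {
  boolean isDv = deleteFile.format() == FileFormat.PUFFIN;
  return isDv && deleteFile.contentSizeInBytes() != null
      ? deleteFile.contentSizeInBytes()
      : deleteFile.fileSizeInBytes();
}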
@@ -23,13 +23,15 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -71,6 +73,7 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalogConfig;
@@ -232,6 +235,89 @@ public void testUnpartitioned() throws Exception {
assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
}

@TestTemplate
public void testDVsAreRewritten() throws Exception {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Table table = createTableUnpartitioned(2, SCALE);
List<DataFile> dataFiles = TestHelpers.dataFiles(table);
writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
assertThat(dataFiles).hasSize(2);

List<DeleteFile> deleteFiles = deleteFiles(table);
assertThat(deleteFiles).hasSize(2);

List<Object[]> expectedRecords = records(table);
List<Object[]> expectedDeletes = deleteRecords(table);
assertThat(expectedRecords).hasSize(2000);
assertThat(expectedDeletes).hasSize(2000);
List<Row> dvsBeforeRewrite = dvRecords(table);

Result result =
SparkActions.get(spark)
.rewritePositionDeletes(table)
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.execute();
List<DeleteFile> newDeleteFiles = deleteFiles(table);
assertThat(newDeleteFiles).as("Expected 1 delete file per data file").hasSameSizeAs(dataFiles);
assertLocallySorted(newDeleteFiles);
assertNotContains(deleteFiles, newDeleteFiles);
checkResult(result, deleteFiles, newDeleteFiles, 1);
checkSequenceNumbers(table, deleteFiles, newDeleteFiles);

List<Object[]> actualRecords = records(table);
List<Object[]> actualDeletes = deleteRecords(table);
assertEquals("Rows must match", expectedRecords, actualRecords);
assertEquals("Position deletes must match", expectedDeletes, actualDeletes);

Set<String> pathsBeforeRewrite =
deleteFiles.stream().map(ContentFile::location).collect(Collectors.toSet());
    // each delete file points to its own DV file
assertThat(pathsBeforeRewrite).hasSameSizeAs(dataFiles);

Set<String> pathsAfterRewrite =
newDeleteFiles.stream().map(ContentFile::location).collect(Collectors.toSet());
// both delete files point to the same DV file
assertThat(pathsAfterRewrite).hasSize(1);
// all new deletes point to the same DV file
assertThat(newDeleteFiles)
.allMatch(file -> file.location().equals(Iterables.getOnlyElement(pathsAfterRewrite)));

List<Row> dvsAfterRewrite = dvRecords(table);
assertThat(dvsAfterRewrite).hasSameSizeAs(dvsBeforeRewrite);

for (Row beforeRewrite : dvsBeforeRewrite) {
Optional<Row> rewritten =
dvsAfterRewrite.stream()
// filter by data file path
.filter(row -> row.getString(0).equals(beforeRewrite.getString(0)))
.findFirst();
assertThat(rewritten).isPresent();
Row rewrittenRow = rewritten.get();

Optional<DeleteFile> delete =
newDeleteFiles.stream()
// filter by data file path
.filter(f -> f.referencedDataFile().equals(rewrittenRow.getString(0)))
.findFirst();
assertThat(delete).isPresent();
DeleteFile rewrittenDelete = delete.get();

// delete_file_path has been rewritten and is different from beforeRewrite's path
assertThat(rewrittenRow.get(1))
.isNotEqualTo(beforeRewrite.get(1))
.isEqualTo(rewrittenDelete.location());

// only compare content_offset after the rewrite with the rewritten delete file as
// it can be different from beforeRewrite's content offset
assertThat(rewrittenRow.get(2)).isEqualTo(rewrittenDelete.contentOffset());

// content_size_in_bytes must be the same before and after rewriting
assertThat(rewrittenRow.get(3))
.isEqualTo(beforeRewrite.get(3))
.isEqualTo(ScanTaskUtil.contentSizeInBytes(rewrittenDelete));
}
}

@TestTemplate
public void testRewriteAll() throws Exception {
Table table = createTablePartitioned(4, 2, SCALE);
@@ -951,6 +1037,16 @@ private List<Object[]> deleteRecords(Table table) {
.collectAsList());
}

private List<Row> dvRecords(Table table) {
return spark
.read()
.format("iceberg")
.load(name(table) + ".position_deletes")
.select("file_path", "delete_file_path", "content_offset", "content_size_in_bytes")
.distinct()
.collectAsList();
}

private void writePosDeletesForFiles(
Table table, int deleteFilesPerPartition, int deletesPerDataFile, List<DataFile> files)
throws IOException {