Core, Spark: Include content offset/size in PositionDeletesTable
nastra committed Dec 2, 2024
1 parent f4dbc39 commit a52f450
Showing 7 changed files with 385 additions and 99 deletions.
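
For format-version-3 tables, the position_deletes metadata table now exposes two extra columns, content_offset and content_size_in_bytes, describing where the referenced deletion vector (DV) content starts and how many bytes it spans; version-2 tables keep the existing schema with the row column instead. Below is a minimal sketch of querying the new columns through Spark, mirroring the dvRecords helper added in the test file further down. The SparkSession setup and the table name db.tbl are illustrative assumptions, not part of this commit.

// Minimal sketch (not part of this commit): read the new DV columns from the
// position_deletes metadata table. "db.tbl" and the SparkSession are placeholders.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadDvColumns {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("read-dv-columns").getOrCreate();

    Dataset<Row> dvs =
        spark
            .read()
            .format("iceberg")
            .load("db.tbl.position_deletes")
            // content_offset and content_size_in_bytes are only present on format v3 tables
            .select("file_path", "delete_file_path", "content_offset", "content_size_in_bytes")
            .distinct();

    dvs.show(false);
  }
}

On a version-2 table the same projection would not resolve content_offset and content_size_in_bytes, since calculateSchema only adds them when the format version is at least 3.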
14 changes: 14 additions & 0 deletions core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -92,6 +92,20 @@ private MetadataColumns() {}
Types.LongType.get(),
"Commit snapshot ID");

// the content size and offset of a DV
public static final NestedField CONTENT_OFFSET =
Types.NestedField.optional(
Integer.MAX_VALUE - 107,
"content_offset",
Types.LongType.get(),
"The offset in the DV where the content starts");
public static final NestedField CONTENT_SIZE_IN_BYTES =
Types.NestedField.optional(
Integer.MAX_VALUE - 108,
"content_size_in_bytes",
Types.LongType.get(),
"The length in bytes of referenced content stored in a DV");

private static final Map<String, NestedField> META_COLUMNS =
ImmutableMap.of(
FILE_PATH.name(), FILE_PATH,
37 changes: 26 additions & 11 deletions core/src/main/java/org/apache/iceberg/PositionDeletesTable.java
@@ -110,32 +110,47 @@ public Map<String, String> properties() {
}

private Schema calculateSchema() {
int formatVersion = TableUtil.formatVersion(table());
Types.StructType partitionType = Partitioning.partitionType(table());
List<Types.NestedField> columns =
ImmutableList.of(
MetadataColumns.DELETE_FILE_PATH,
MetadataColumns.DELETE_FILE_POS,
Types.NestedField.optional(
MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
MetadataColumns.DELETE_FILE_ROW_FIELD_NAME,
table().schema().asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC),
ImmutableList.Builder<Types.NestedField> builder =
ImmutableList.<Types.NestedField>builder()
.add(MetadataColumns.DELETE_FILE_PATH)
.add(MetadataColumns.DELETE_FILE_POS);
if (formatVersion == 2) {
builder.add(
Types.NestedField.optional(
MetadataColumns.DELETE_FILE_ROW_FIELD_ID,
MetadataColumns.DELETE_FILE_ROW_FIELD_NAME,
table().schema().asStruct(),
MetadataColumns.DELETE_FILE_ROW_DOC));
}

builder
.add(
Types.NestedField.required(
MetadataColumns.PARTITION_COLUMN_ID,
PARTITION,
partitionType,
"Partition that position delete row belongs to"),
"Partition that position delete row belongs to"))
.add(
Types.NestedField.required(
MetadataColumns.SPEC_ID_COLUMN_ID,
SPEC_ID,
Types.IntegerType.get(),
MetadataColumns.SPEC_ID_COLUMN_DOC),
MetadataColumns.SPEC_ID_COLUMN_DOC))
.add(
Types.NestedField.required(
MetadataColumns.FILE_PATH_COLUMN_ID,
DELETE_FILE_PATH,
Types.StringType.get(),
MetadataColumns.FILE_PATH_COLUMN_DOC));

if (formatVersion >= 3) {
builder.add(MetadataColumns.CONTENT_OFFSET, MetadataColumns.CONTENT_SIZE_IN_BYTES);
}

List<Types.NestedField> columns = builder.build();

// Calculate used ids (for de-conflict)
Set<Integer> currentlyUsedIds =
Collections.unmodifiableSet(TypeUtil.indexById(Types.StructType.of(columns)).keySet());
@@ -1704,7 +1704,12 @@ public void testPositionDeletesManyColumns() {
table.newRowDelta().addDeletes(delete1).addDeletes(delete2).commit();

PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table);
assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size()).isEqualTo(2010);
int expectedIds =
formatVersion >= 3
? 8 // partition col + 7 columns
: 2010; // partition col + 6 columns + 2003 ids inside the deleted row column
assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size())
.isEqualTo(expectedIds);

BatchScan scan = positionDeletesTable.newBatchScan();
assertThat(scan).isInstanceOf(PositionDeletesTable.PositionDeletesBatchScan.class);
@@ -43,6 +43,7 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.ScanTaskUtil;
import org.apache.iceberg.util.StructLikeUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
@@ -121,11 +122,6 @@ public InternalRow next() {
deletedPositionIndex = rowValues.size() - 1;
}

if (null != projection.findField(MetadataColumns.DELETE_FILE_ROW_FIELD_ID)) {
// there's no info about deleted rows with DVs, so always return null
rowValues.add(null);
}

Types.NestedField partition = projection.findField(MetadataColumns.PARTITION_COLUMN_ID);
if (null != partition) {
Types.StructType type = partition.type().asStructType();
@@ -143,6 +139,14 @@
rowValues.add(UTF8String.fromString(deleteFile.location()));
}

if (null != projection.findField(MetadataColumns.CONTENT_OFFSET.fieldId())) {
rowValues.add(deleteFile.contentOffset());
}

if (null != projection.findField(MetadataColumns.CONTENT_SIZE_IN_BYTES.fieldId())) {
rowValues.add(ScanTaskUtil.contentSizeInBytes(deleteFile));
}

this.row = new GenericInternalRow(rowValues.toArray());
} else if (null != deletedPositionIndex) {
// only update the deleted position if necessary, everything else stays the same
@@ -228,6 +228,10 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
.suffix("deletes")
.build();

if (TableUtil.formatVersion(underlyingTable(table)) >= 3) {
return new DVWriter(table, deleteFileFactory, dsSchema, specId, partition);
}

Schema positionDeleteRowSchema = positionDeleteRowSchema();
StructType deleteSparkType = deleteSparkType();
StructType deleteSparkTypeWithoutRow = deleteSparkTypeWithoutRow();
@@ -246,10 +250,6 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
.writeProperties(writeProperties)
.build();

if (TableUtil.formatVersion(underlyingTable(table)) >= 3) {
return new DVWriter(table, deleteFileFactory, dsSchema, specId, partition);
}

return new DeleteWriter(
table,
writerFactoryWithRow,
@@ -22,13 +22,15 @@
import static org.apache.spark.sql.functions.expr;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -70,6 +72,7 @@
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalogConfig;
@@ -230,6 +233,89 @@ public void testUnpartitioned() throws Exception {
assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
}

@TestTemplate
public void testDVsAreRewritten() throws Exception {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Table table = createTableUnpartitioned(2, SCALE);
List<DataFile> dataFiles = TestHelpers.dataFiles(table);
writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles);
assertThat(dataFiles).hasSize(2);

List<DeleteFile> deleteFiles = deleteFiles(table);
assertThat(deleteFiles).hasSize(2);

List<Object[]> expectedRecords = records(table);
List<Object[]> expectedDeletes = deleteRecords(table);
assertThat(expectedRecords).hasSize(2000);
assertThat(expectedDeletes).hasSize(2000);
List<Row> dvsBeforeRewrite = dvRecords(table);

Result result =
SparkActions.get(spark)
.rewritePositionDeletes(table)
.option(SizeBasedFileRewriter.REWRITE_ALL, "true")
.execute();
List<DeleteFile> newDeleteFiles = deleteFiles(table);
assertThat(newDeleteFiles).as("Expected 1 delete file per data file").hasSameSizeAs(dataFiles);
assertLocallySorted(newDeleteFiles);
assertNotContains(deleteFiles, newDeleteFiles);
checkResult(result, deleteFiles, newDeleteFiles, 1);
checkSequenceNumbers(table, deleteFiles, newDeleteFiles);

List<Object[]> actualRecords = records(table);
List<Object[]> actualDeletes = deleteRecords(table);
assertEquals("Rows must match", expectedRecords, actualRecords);
assertEquals("Position deletes must match", expectedDeletes, actualDeletes);

Set<String> pathsBeforeRewrite =
deleteFiles.stream().map(ContentFile::location).collect(Collectors.toSet());
// both delete files point to a separate DV file
assertThat(pathsBeforeRewrite).hasSameSizeAs(dataFiles);

Set<String> pathsAfterRewrite =
newDeleteFiles.stream().map(ContentFile::location).collect(Collectors.toSet());
// both delete files point to the same DV file
assertThat(pathsAfterRewrite).hasSize(1);
// all new deletes point to the same DV file
assertThat(newDeleteFiles)
.allMatch(file -> file.location().equals(Iterables.getOnlyElement(pathsAfterRewrite)));

List<Row> dvsAfterRewrite = dvRecords(table);
assertThat(dvsAfterRewrite).hasSameSizeAs(dvsBeforeRewrite);

for (Row beforeRewrite : dvsBeforeRewrite) {
Optional<Row> rewritten =
dvsAfterRewrite.stream()
// filter by data file path
.filter(row -> row.getString(0).equals(beforeRewrite.getString(0)))
.findFirst();
assertThat(rewritten).isPresent();
Row rewrittenRow = rewritten.get();

Optional<DeleteFile> delete =
newDeleteFiles.stream()
// filter by data file path
.filter(f -> f.referencedDataFile().equals(rewrittenRow.getString(0)))
.findFirst();
assertThat(delete).isPresent();
DeleteFile rewrittenDelete = delete.get();

// delete_file_path has been rewritten and is different from beforeRewrite's path
assertThat(rewrittenRow.get(1))
.isNotEqualTo(beforeRewrite.get(1))
.isEqualTo(rewrittenDelete.location());

// only compare content_offset after the rewrite with the rewritten delete file as
// it can be different from beforeRewrite's content offset
assertThat(rewrittenRow.get(2)).isEqualTo(rewrittenDelete.contentOffset());

// content_size_in_bytes must be the same before and after rewriting
assertThat(rewrittenRow.get(3))
.isEqualTo(beforeRewrite.get(3))
.isEqualTo(ScanTaskUtil.contentSizeInBytes(rewrittenDelete));
}
}

@TestTemplate
public void testRewriteAll() throws Exception {
Table table = createTablePartitioned(4, 2, SCALE);
@@ -935,6 +1021,16 @@ private List<Object[]> deleteRecords(Table table) {
.collectAsList());
}

private List<Row> dvRecords(Table table) {
return spark
.read()
.format("iceberg")
.load(name(table) + ".position_deletes")
.select("file_path", "delete_file_path", "content_offset", "content_size_in_bytes")
.distinct()
.collectAsList();
}

private void writePosDeletesForFiles(
Table table, int deleteFilesPerPartition, int deletesPerDataFile, List<DataFile> files)
throws IOException {