From ffb3824982ea884cbbdf5475de0bf0f4bd03fcdc Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Fri, 29 Nov 2024 08:29:02 +0100 Subject: [PATCH] Core, Spark: Include content offset/size in PositionDeletesTable --- .../org/apache/iceberg/MetadataColumns.java | 14 + .../apache/iceberg/PositionDeletesTable.java | 4 + .../iceberg/TestMetadataTableScans.java | 2 +- .../iceberg/spark/source/DVIterator.java | 5 + .../TestRewritePositionDeleteFilesAction.java | 96 ++++++ .../source/TestPositionDeletesTable.java | 308 +++++++++++++----- 6 files changed, 350 insertions(+), 79 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java index 060d27a018c0..5b1c72b4fdf7 100644 --- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java +++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java @@ -92,6 +92,20 @@ private MetadataColumns() {} Types.LongType.get(), "Commit snapshot ID"); + // the content size and offset of a DV + public static final NestedField CONTENT_OFFSET = + Types.NestedField.optional( + Integer.MAX_VALUE - 107, + "content_offset", + Types.LongType.get(), + "The offset in the DV where the content starts"); + public static final NestedField CONTENT_SIZE_IN_BYTES = + Types.NestedField.optional( + Integer.MAX_VALUE - 108, + "content_size_in_bytes", + Types.LongType.get(), + "The length in bytes of referenced content stored in a DV"); + private static final Map META_COLUMNS = ImmutableMap.of( FILE_PATH.name(), FILE_PATH, diff --git a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java index 1fb4a8476aaf..8fc07ffab45d 100644 --- a/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java +++ b/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java @@ -145,6 +145,10 @@ private Schema calculateSchema() { Types.StringType.get(), MetadataColumns.FILE_PATH_COLUMN_DOC)); + if (formatVersion >= 3) { + builder.add(MetadataColumns.CONTENT_OFFSET, MetadataColumns.CONTENT_SIZE_IN_BYTES); + } + List columns = builder.build(); // Calculate used ids (for de-conflict) diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java index a23563fff3d5..25392a9a1efb 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java +++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java @@ -1706,7 +1706,7 @@ public void testPositionDeletesManyColumns() { PositionDeletesTable positionDeletesTable = new PositionDeletesTable(table); int expectedIds = formatVersion >= 3 - ? 6 // partition col + 5 columns + ? 8 // partition col + 7 columns : 2010; // partition col + 6 columns + 2003 ids inside the deleted row column assertThat(TypeUtil.indexById(positionDeletesTable.schema().asStruct()).size()) .isEqualTo(expectedIds); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java index 7b08b86cbfd0..74bdbafd4e34 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java @@ -30,6 +30,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ScanTaskUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.unsafe.types.UTF8String; @@ -79,6 +80,10 @@ public InternalRow next() { rowValues.add(idToConstant.get(MetadataColumns.SPEC_ID_COLUMN_ID)); } else if (fieldId == MetadataColumns.FILE_PATH_COLUMN_ID) { rowValues.add(idToConstant.get(MetadataColumns.FILE_PATH_COLUMN_ID)); + } else if (fieldId == MetadataColumns.CONTENT_OFFSET.fieldId()) { + rowValues.add(deleteFile.contentOffset()); + } else if (fieldId == MetadataColumns.CONTENT_SIZE_IN_BYTES.fieldId()) { + rowValues.add(ScanTaskUtil.contentSizeInBytes(deleteFile)); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java index b4104a6a9bad..ea49fc85a94f 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -30,6 +31,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -71,6 +73,7 @@ import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.CatalogTestBase; import org.apache.iceberg.spark.SparkCatalogConfig; @@ -232,6 +235,89 @@ public void testUnpartitioned() throws Exception { assertEquals("Position deletes must match", expectedDeletes, actualDeletes); } + @TestTemplate + public void testDVsAreRewritten() throws Exception { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + Table table = createTableUnpartitioned(2, SCALE); + List dataFiles = TestHelpers.dataFiles(table); + writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles); + assertThat(dataFiles).hasSize(2); + + List deleteFiles = deleteFiles(table); + assertThat(deleteFiles).hasSize(2); + + List expectedRecords = records(table); + List expectedDeletes = deleteRecords(table); + assertThat(expectedRecords).hasSize(2000); + assertThat(expectedDeletes).hasSize(2000); + List dvsBeforeRewrite = dvRecords(table); + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + List newDeleteFiles = deleteFiles(table); + assertThat(newDeleteFiles).as("Expected 1 delete file per data file").hasSameSizeAs(dataFiles); + assertLocallySorted(newDeleteFiles); + assertNotContains(deleteFiles, newDeleteFiles); + checkResult(result, deleteFiles, newDeleteFiles, 1); + checkSequenceNumbers(table, deleteFiles, newDeleteFiles); + + List actualRecords = records(table); + List actualDeletes = deleteRecords(table); + assertEquals("Rows must match", expectedRecords, actualRecords); + assertEquals("Position deletes must match", expectedDeletes, actualDeletes); + + Set pathsBeforeRewrite = + deleteFiles.stream().map(ContentFile::location).collect(Collectors.toSet()); + // both delete files point to a separate DV file + assertThat(pathsBeforeRewrite).hasSameSizeAs(dataFiles); + + Set pathsAfterRewrite = + newDeleteFiles.stream().map(ContentFile::location).collect(Collectors.toSet()); + // both delete files point to the same DV file + assertThat(pathsAfterRewrite).hasSize(1); + // all new deletes point to the same DV file + assertThat(newDeleteFiles) + .allMatch(file -> file.location().equals(Iterables.getOnlyElement(pathsAfterRewrite))); + + List dvsAfterRewrite = dvRecords(table); + assertThat(dvsAfterRewrite).hasSameSizeAs(dvsBeforeRewrite); + + for (Row beforeRewrite : dvsBeforeRewrite) { + Optional rewritten = + dvsAfterRewrite.stream() + // filter by data file path + .filter(row -> row.getString(0).equals(beforeRewrite.getString(0))) + .findFirst(); + assertThat(rewritten).isPresent(); + Row rewrittenRow = rewritten.get(); + + Optional delete = + newDeleteFiles.stream() + // filter by data file path + .filter(f -> f.referencedDataFile().equals(rewrittenRow.getString(0))) + .findFirst(); + assertThat(delete).isPresent(); + DeleteFile rewrittenDelete = delete.get(); + + // delete_file_path has been rewritten and is different from beforeRewrite's path + assertThat(rewrittenRow.get(1)) + .isNotEqualTo(beforeRewrite.get(1)) + .isEqualTo(rewrittenDelete.location()); + + // only compare content_offset after the rewrite with the rewritten delete file as + // it can be different from beforeRewrite's content offset + assertThat(rewrittenRow.get(2)).isEqualTo(rewrittenDelete.contentOffset()); + + // content_size_in_bytes must be the same before and after rewriting + assertThat(rewrittenRow.get(3)) + .isEqualTo(beforeRewrite.get(3)) + .isEqualTo(ScanTaskUtil.contentSizeInBytes(rewrittenDelete)); + } + } + @TestTemplate public void testRewriteAll() throws Exception { Table table = createTablePartitioned(4, 2, SCALE); @@ -951,6 +1037,16 @@ private List deleteRecords(Table table) { .collectAsList()); } + private List dvRecords(Table table) { + return spark + .read() + .format("iceberg") + .load(name(table) + ".position_deletes") + .select("file_path", "delete_file_path", "content_offset", "content_size_in_bytes") + .distinct() + .collectAsList(); + } + private void writePosDeletesForFiles( Table table, int deleteFilesPerPartition, int deletesPerDataFile, List files) throws IOException { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 357cd0cdaa06..71ffa9ccf283 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -75,6 +76,7 @@ import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.DeleteFileSet; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.ScanTaskUtil; import org.apache.iceberg.util.StructLikeSet; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; @@ -98,30 +100,48 @@ public class TestPositionDeletesTable extends CatalogTestBase { "cache-enabled", "false"); private static final List NON_PATH_COLS = ImmutableList.of("file_path", "pos", "row", "partition", "spec_id"); + private static final List NON_PATH_V3_COLS = + ImmutableList.of( + "file_path", "pos", "partition", "spec_id", "content_offset", "content_size_in_bytes"); @Parameter(index = 3) private FileFormat format; - @Parameters(name = "catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}") + @Parameter(index = 4) + private int formatVersion; + + @Parameters( + name = + "catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}, formatVersion = {5}") public static Object[][] parameters() { return new Object[][] { { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), CATALOG_PROPS, - FileFormat.PARQUET + FileFormat.PARQUET, + 2 }, { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), CATALOG_PROPS, - FileFormat.AVRO + FileFormat.AVRO, + 2 }, { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), CATALOG_PROPS, - FileFormat.ORC + FileFormat.ORC, + 2 + }, + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + CATALOG_PROPS, + FileFormat.PARQUET, + 3 }, }; } @@ -142,7 +162,8 @@ public void testNullRows() throws IOException { tab, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), - deletes); + deletes, + formatVersion); tab.newRowDelta().addDeletes(posDeletes.first()).commit(); StructLikeSet actual = actual(tableName, tab); @@ -150,7 +171,7 @@ public void testNullRows() throws IOException { List> expectedDeletes = Lists.newArrayList( positionDelete(dFile.location(), 0L), positionDelete(dFile.location(), 1L)); - StructLikeSet expected = expected(tab, expectedDeletes, null, posDeletes.first().location()); + StructLikeSet expected = expected(tab, expectedDeletes, null, posDeletes.first()); assertThat(actual).as("Position Delete table should contain expected rows").isEqualTo(expected); dropTable(tableName); @@ -158,6 +179,7 @@ public void testNullRows() throws IOException { @TestTemplate public void testPartitionedTable() throws IOException { + assumeThat(formatVersion).as("DVs don't have row info in PositionDeletesTable").isEqualTo(2); // Create table with two partitions String tableName = "partitioned_table"; PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); @@ -178,8 +200,7 @@ public void testPartitionedTable() throws IOException { StructLikeSet actual = actual(tableName, tab, "row.data='b'"); GenericRecord partitionB = GenericRecord.create(tab.spec().partitionType()); partitionB.setField("data", "b"); - StructLikeSet expected = - expected(tab, deletesB.first(), partitionB, deletesB.second().location()); + StructLikeSet expected = expected(tab, deletesB.first(), partitionB, deletesB.second()); assertThat(actual).as("Position Delete table should contain expected rows").isEqualTo(expected); dropTable(tableName); @@ -187,6 +208,7 @@ public void testPartitionedTable() throws IOException { @TestTemplate public void testSelect() throws IOException { + assumeThat(formatVersion).as("DVs don't have row info in PositionDeletesTable").isEqualTo(2); // Create table with two partitions String tableName = "select"; PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); @@ -249,6 +271,73 @@ public void testSelect() throws IOException { dropTable(tableName); } + @TestTemplate + public void testSelectWithDVs() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + String tableName = "select_with_dvs"; + Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned()); + + DataFile dataFileA = dataFile(tab); + DataFile dataFileB = dataFile(tab); + + tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit(); + + Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB); + + tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); + + // Select certain columns + Dataset df = + spark + .read() + .format("iceberg") + .load("default." + tableName + ".position_deletes") + .select( + "pos", "file_path", "delete_file_path", "content_offset", "content_size_in_bytes"); + List actual = rowsToJava(df.collectAsList()); + + // Select cols from expected delete values + List expected = Lists.newArrayList(); + BiFunction, DeleteFile, Object[]> toRow = + (delete, file) -> + row( + delete.get(1, Long.class), + file.referencedDataFile(), + file.location(), + file.contentOffset(), + ScanTaskUtil.contentSizeInBytes(file)); + + expected.addAll( + deletesA.first().stream() + .map(d -> toRow.apply(d, deletesA.second())) + .collect(Collectors.toList())); + expected.addAll( + deletesB.first().stream() + .map(d -> toRow.apply(d, deletesB.second())) + .collect(Collectors.toList())); + + // Sort by pos and file_path + Comparator comp = + (o1, o2) -> { + int result = Long.compare((long) o1[0], (long) o2[0]); + if (result != 0) { + return result; + } else { + return ((String) o1[1]).compareTo((String) o2[1]); + } + }; + + actual.sort(comp); + expected.sort(comp); + + assertThat(actual) + .as("Position Delete table should contain expected rows") + .usingRecursiveComparison() + .isEqualTo(expected); + dropTable(tableName); + } + @TestTemplate public void testSplitTasks() throws IOException { String tableName = "big_table"; @@ -278,7 +367,8 @@ public void testSplitTasks() throws IOException { tab, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), - deletes); + deletes, + formatVersion); tab.newRowDelta().addDeletes(posDeletes).commit(); Table deleteTable = @@ -295,7 +385,7 @@ public void testSplitTasks() throws IOException { } StructLikeSet actual = actual(tableName, tab); - StructLikeSet expected = expected(tab, deletes, null, posDeletes.location()); + StructLikeSet expected = expected(tab, deletes, null, posDeletes); assertThat(actual).as("Position Delete table should contain expected rows").isEqualTo(expected); dropTable(tableName); @@ -324,10 +414,8 @@ public void testPartitionFilter() throws IOException { GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType()); Record partitionA = partitionRecordTemplate.copy("data", "a"); Record partitionB = partitionRecordTemplate.copy("data", "b"); - StructLikeSet expectedA = - expected(tab, deletesA.first(), partitionA, deletesA.second().location()); - StructLikeSet expectedB = - expected(tab, deletesB.first(), partitionB, deletesB.second().location()); + StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, deletesA.second()); + StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, deletesB.second()); StructLikeSet allExpected = StructLikeSet.create(deletesTab.schema().asStruct()); allExpected.addAll(expectedA); allExpected.addAll(expectedB); @@ -371,10 +459,8 @@ public void testPartitionTransformFilter() throws IOException { GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType()); Record partitionA = partitionRecordTemplate.copy("data_trunc", "a"); Record partitionB = partitionRecordTemplate.copy("data_trunc", "b"); - StructLikeSet expectedA = - expected(tab, deletesA.first(), partitionA, deletesA.second().location()); - StructLikeSet expectedB = - expected(tab, deletesB.first(), partitionB, deletesB.second().location()); + StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, deletesA.second()); + StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, deletesB.second()); StructLikeSet allExpected = StructLikeSet.create(deletesTable.schema().asStruct()); allExpected.addAll(expectedA); allExpected.addAll(expectedB); @@ -426,7 +512,7 @@ public void testPartitionEvolutionReplace() throws Exception { GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab)); Record partitionA = partitionRecordTemplate.copy("data", "a"); StructLikeSet expectedA = - expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second().location()); + expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second()); StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0"); assertThat(actualA) .as("Position Delete table should contain expected rows") @@ -435,12 +521,7 @@ public void testPartitionEvolutionReplace() throws Exception { // Query partition of new spec Record partition10 = partitionRecordTemplate.copy("id", 10); StructLikeSet expected10 = - expected( - tab, - deletes10.first(), - partition10, - tab.spec().specId(), - deletes10.second().location()); + expected(tab, deletes10.first(), partition10, tab.spec().specId(), deletes10.second()); StructLikeSet actual10 = actual(tableName, tab, "partition.id = 10 AND pos >= 0"); assertThat(actual10) @@ -480,7 +561,7 @@ public void testPartitionEvolutionAdd() throws Exception { GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab)); Record partitionA = partitionRecordTemplate.copy("data", "a"); StructLikeSet expectedA = - expected(tab, deletesA.first(), partitionA, specId1, deletesA.second().location()); + expected(tab, deletesA.first(), partitionA, specId1, deletesA.second()); StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0"); assertThat(actualA) .as("Position Delete table should contain expected rows") @@ -494,7 +575,7 @@ public void testPartitionEvolutionAdd() throws Exception { deletesUnpartitioned.first(), unpartitionedRecord, specId0, - deletesUnpartitioned.second().location()); + deletesUnpartitioned.second()); StructLikeSet actualUnpartitioned = actual(tableName, tab, "partition.data IS NULL and pos >= 0"); @@ -536,7 +617,7 @@ public void testPartitionEvolutionRemove() throws Exception { GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab)); Record partitionA = partitionRecordTemplate.copy("data", "a"); StructLikeSet expectedA = - expected(tab, deletesA.first(), partitionA, specId0, deletesA.second().location()); + expected(tab, deletesA.first(), partitionA, specId0, deletesA.second()); StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0"); assertThat(actualA) .as("Position Delete table should contain expected rows") @@ -550,7 +631,7 @@ public void testPartitionEvolutionRemove() throws Exception { deletesUnpartitioned.first(), unpartitionedRecord, specId1, - deletesUnpartitioned.second().location()); + deletesUnpartitioned.second()); StructLikeSet actualUnpartitioned = actual(tableName, tab, "partition.data IS NULL and pos >= 0"); @@ -594,7 +675,7 @@ public void testSpecIdFilter() throws Exception { deletesUnpartitioned.first(), partitionRecordTemplate, unpartitionedSpec, - deletesUnpartitioned.second().location()); + deletesUnpartitioned.second()); StructLikeSet actualUnpartitioned = actual(tableName, tab, String.format("spec_id = %d", unpartitionedSpec)); assertThat(actualUnpartitioned) @@ -605,9 +686,8 @@ public void testSpecIdFilter() throws Exception { StructLike partitionA = partitionRecordTemplate.copy("data", "a"); StructLike partitionB = partitionRecordTemplate.copy("data", "b"); StructLikeSet expected = - expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second().location()); - expected.addAll( - expected(tab, deletesB.first(), partitionB, dataSpec, deletesB.second().location())); + expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second()); + expected.addAll(expected(tab, deletesB.first(), partitionB, dataSpec, deletesB.second())); StructLikeSet actual = actual(tableName, tab, String.format("spec_id = %d", dataSpec)); assertThat(actual).as("Position Delete table should contain expected rows").isEqualTo(expected); @@ -660,8 +740,7 @@ public void testSchemaEvolutionAdd() throws Exception { padded.set(3, null); d.set(2, padded); }); - StructLikeSet expectedA = - expected(tab, expectedDeletesA, partitionA, deletesA.second().location()); + StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, deletesA.second()); StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0"); assertThat(actualA) .as("Position Delete table should contain expected rows") @@ -669,8 +748,7 @@ public void testSchemaEvolutionAdd() throws Exception { // Select deletes from new schema Record partitionC = partitionRecordTemplate.copy("data", "c"); - StructLikeSet expectedC = - expected(tab, deletesC.first(), partitionC, deletesC.second().location()); + StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, deletesC.second()); StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0"); assertThat(actualC) @@ -726,8 +804,7 @@ public void testSchemaEvolutionRemove() throws Exception { padded.set(1, nested.get(1)); d.set(2, padded); }); - StructLikeSet expectedA = - expected(tab, expectedDeletesA, partitionA, deletesA.second().location()); + StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, deletesA.second()); StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0"); assertThat(actualA) .as("Position Delete table should contain expected rows") @@ -735,8 +812,7 @@ public void testSchemaEvolutionRemove() throws Exception { // Select deletes from new schema Record partitionC = partitionRecordTemplate.copy("data", "c"); - StructLikeSet expectedC = - expected(tab, deletesC.first(), partitionC, deletesC.second().location()); + StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, deletesC.second()); StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0"); assertThat(actualC) @@ -791,8 +867,8 @@ public void testWrite() throws IOException, NoSuchTableException { GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType()); Record partitionA = partitionRecordTemplate.copy("data", "a"); Record partitionB = partitionRecordTemplate.copy("data", "b"); - StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, null); - StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, null); + StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, deletesA.second(), false); + StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, deletesB.second(), false); StructLikeSet allExpected = StructLikeSet.create( TypeUtil.selectNot( @@ -802,7 +878,8 @@ public void testWrite() throws IOException, NoSuchTableException { allExpected.addAll(expectedB); // Compare values without 'delete_file_path' as these have been rewritten - StructLikeSet actual = actual(tableName, tab, null, NON_PATH_COLS); + StructLikeSet actual = + actual(tableName, tab, null, formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actual) .as("Position Delete table should contain expected rows") .isEqualTo(allExpected); @@ -825,7 +902,8 @@ public void testWriteUnpartitionedNullRows() throws Exception { tab, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), - deletes); + deletes, + formatVersion); tab.newRowDelta().addDeletes(posDeletes.first()).commit(); Table posDeletesTable = @@ -851,14 +929,19 @@ public void testWriteUnpartitionedNullRows() throws Exception { commit(tab, posDeletesTable, fileSetID, 1); } + List columns = + formatVersion >= 3 + ? ImmutableList.of( + "file_path", "pos", "spec_id", "content_offset", "content_size_in_bytes") + : ImmutableList.of("file_path", "pos", "row", "spec_id"); + // Compare values without 'delete_file_path' as these have been rewritten - StructLikeSet actual = - actual(tableName, tab, null, ImmutableList.of("file_path", "pos", "row", "spec_id")); + StructLikeSet actual = actual(tableName, tab, null, columns); List> expectedDeletes = Lists.newArrayList( positionDelete(dFile.location(), 0L), positionDelete(dFile.location(), 1L)); - StructLikeSet expected = expected(tab, expectedDeletes, null, null); + StructLikeSet expected = expected(tab, expectedDeletes, null, posDeletes.first(), false); assertThat(actual).as("Position Delete table should contain expected rows").isEqualTo(expected); dropTable(tableName); @@ -883,7 +966,8 @@ public void testWriteMixedRows() throws Exception { tab, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of("a"), - deletes); + deletes, + formatVersion); Pair>, DeleteFile> deletesWithRow = deleteFile(tab, dataFileB, "b"); @@ -919,11 +1003,7 @@ public void testWriteMixedRows() throws Exception { // Compare values without 'delete_file_path' as these have been rewritten StructLikeSet actual = - actual( - tableName, - tab, - null, - ImmutableList.of("file_path", "pos", "row", "partition", "spec_id")); + actual(tableName, tab, null, formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); // Prepare expected values GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType()); @@ -940,8 +1020,10 @@ public void testWriteMixedRows() throws Exception { Lists.newArrayList( positionDelete(dataFileA.location(), 0L), positionDelete(dataFileA.location(), 1L)), partitionA, - null)); - allExpected.addAll(expected(tab, deletesWithRow.first(), partitionB, null)); + deletesWithoutRow.first(), + false)); + allExpected.addAll( + expected(tab, deletesWithRow.first(), partitionB, deletesWithRow.second(), false)); assertThat(actual) .as("Position Delete table should contain expected rows") @@ -1007,9 +1089,19 @@ public void testWritePartitionEvolutionAdd() throws Exception { GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab)); Record unpartitionedRecord = partitionRecordTemplate.copy("data", null); StructLikeSet expectedUnpartitioned = - expected(tab, deletesUnpartitioned.first(), unpartitionedRecord, specId0, null); + expected( + tab, + deletesUnpartitioned.first(), + unpartitionedRecord, + specId0, + deletesUnpartitioned.second(), + false); StructLikeSet actualUnpartitioned = - actual(tableName, tab, "partition.data IS NULL", NON_PATH_COLS); + actual( + tableName, + tab, + "partition.data IS NULL", + formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualUnpartitioned) .as("Position Delete table should contain expected rows") .isEqualTo(expectedUnpartitioned); @@ -1046,10 +1138,16 @@ public void testWritePartitionEvolutionAdd() throws Exception { TypeUtil.selectNot( posDeletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID)) .asStruct()); - expectedAll.addAll(expected(tab, deletesA.first(), partitionA, specId1, null)); - expectedAll.addAll(expected(tab, deletesB.first(), partitionB, specId1, null)); + expectedAll.addAll( + expected(tab, deletesA.first(), partitionA, specId1, deletesA.second(), false)); + expectedAll.addAll( + expected(tab, deletesB.first(), partitionB, specId1, deletesB.second(), false)); StructLikeSet actualAll = - actual(tableName, tab, "partition.data = 'a' OR partition.data = 'b'", NON_PATH_COLS); + actual( + tableName, + tab, + "partition.data = 'a' OR partition.data = 'b'", + formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualAll) .as("Position Delete table should contain expected rows") .isEqualTo(expectedAll); @@ -1181,8 +1279,13 @@ public void testWriteSchemaEvolutionAdd() throws Exception { padded.set(3, null); d.set(2, padded); }); - StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, null); - StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a'", NON_PATH_COLS); + StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, deletesA.second(), false); + StructLikeSet actualA = + actual( + tableName, + tab, + "partition.data = 'a'", + formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualA) .as("Position Delete table should contain expected rows") .isEqualTo(expectedA); @@ -1211,8 +1314,13 @@ public void testWriteSchemaEvolutionAdd() throws Exception { // Select deletes from new schema Record partitionC = partitionRecordTemplate.copy("data", "c"); - StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, null); - StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c'", NON_PATH_COLS); + StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, deletesC.second(), false); + StructLikeSet actualC = + actual( + tableName, + tab, + "partition.data = 'c'", + formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualC) .as("Position Delete table should contain expected rows") @@ -1294,16 +1402,26 @@ public void testWriteSchemaEvolutionRemove() throws Exception { padded.set(1, nested.get(1)); d.set(2, padded); }); - StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, null); - StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a'", NON_PATH_COLS); + StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, deletesA.second(), false); + StructLikeSet actualA = + actual( + tableName, + tab, + "partition.data = 'a'", + formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualA) .as("Position Delete table should contain expected rows") .isEqualTo(expectedA); // Select deletes from new schema Record partitionC = partitionRecordTemplate.copy("data", "c"); - StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, null); - StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c'", NON_PATH_COLS); + StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, deletesC.second(), false); + StructLikeSet actualC = + actual( + tableName, + tab, + "partition.data = 'c'", + formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualC) .as("Position Delete table should contain expected rows") @@ -1364,6 +1482,7 @@ private StructLikeSet actual(String tableName, Table table, String filter, List< .filter(f -> cols.contains(f.name())) .collect(Collectors.toList())); } + Types.StructType finalProjection = projection; StructLikeSet set = StructLikeSet.create(projection); df.collectAsList() @@ -1380,7 +1499,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { Map properties = ImmutableMap.of( TableProperties.FORMAT_VERSION, - "2", + String.valueOf(formatVersion), TableProperties.DEFAULT_FILE_FORMAT, format.toString()); return validationCatalog.createTable( @@ -1413,13 +1532,23 @@ private StructLikeSet expected( List> deletes, StructLike partitionStruct, int specId, - String deleteFilePath) { + DeleteFile deleteFile) { + return expected(testTable, deletes, partitionStruct, specId, deleteFile, true); + } + + private StructLikeSet expected( + Table testTable, + List> deletes, + StructLike partitionStruct, + int specId, + DeleteFile deleteFile, + boolean includeDeleteFilePath) { Table deletesTable = MetadataTableUtils.createMetadataTableInstance( testTable, MetadataTableType.POSITION_DELETES); Types.StructType posDeleteSchema = deletesTable.schema().asStruct(); // Do not compare file paths - if (deleteFilePath == null) { + if (!includeDeleteFilePath) { posDeleteSchema = TypeUtil.selectNot( deletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID)) @@ -1433,13 +1562,20 @@ private StructLikeSet expected( GenericRecord record = GenericRecord.create(finalSchema); record.setField("file_path", p.path()); record.setField("pos", p.pos()); - record.setField("row", p.row()); + if (formatVersion == 2) { + record.setField("row", p.row()); + } if (partitionStruct != null) { record.setField("partition", partitionStruct); } record.setField("spec_id", specId); - if (deleteFilePath != null) { - record.setField("delete_file_path", deleteFilePath); + if (includeDeleteFilePath) { + record.setField("delete_file_path", deleteFile.location()); + } + if (formatVersion >= 3) { + record.setField("content_offset", deleteFile.contentOffset()); + record.setField( + "content_size_in_bytes", ScanTaskUtil.contentSizeInBytes(deleteFile)); } return record; }) @@ -1451,8 +1587,23 @@ private StructLikeSet expected( Table testTable, List> deletes, StructLike partitionStruct, - String deleteFilePath) { - return expected(testTable, deletes, partitionStruct, testTable.spec().specId(), deleteFilePath); + DeleteFile deleteFile) { + return expected(testTable, deletes, partitionStruct, testTable.spec().specId(), deleteFile); + } + + private StructLikeSet expected( + Table testTable, + List> deletes, + StructLike partitionStruct, + DeleteFile deleteFile, + boolean includeDeleteFilePath) { + return expected( + testTable, + deletes, + partitionStruct, + testTable.spec().specId(), + deleteFile, + includeDeleteFilePath); } private DataFile dataFile(Table tab, Object... partValues) throws IOException { @@ -1559,7 +1710,8 @@ private Pair>, DeleteFile> deleteFile( tab, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), partitionInfo, - deletes); + deletes, + formatVersion); return Pair.of(deletes, deleteFile); }