diff --git a/core/src/main/java/org/apache/iceberg/SerializableTable.java b/core/src/main/java/org/apache/iceberg/SerializableTable.java index b7e2f6da09a4..dcf87deb9d60 100644 --- a/core/src/main/java/org/apache/iceberg/SerializableTable.java +++ b/core/src/main/java/org/apache/iceberg/SerializableTable.java @@ -161,6 +161,10 @@ public Map properties() { return properties; } + public Table underlyingTable() { + return lazyTable(); + } + public int formatVersion() { if (formatVersion == UNKNOWN_FORMAT_VERSION) { throw new UnsupportedOperationException( diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java index d1c688417a64..108b6d049eff 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java @@ -30,6 +30,7 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.util.DeleteFileSet; +import org.apache.iceberg.util.ScanTaskUtil; /** * Container class representing a set of position delete files to be rewritten by a {@link @@ -109,7 +110,7 @@ public long rewrittenBytes() { } public long addedBytes() { - return addedDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum(); + return addedDeleteFiles.stream().mapToLong(ScanTaskUtil::contentSizeInBytes).sum(); } public int numRewrittenDeleteFiles() { diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFiles.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFiles.java index f3be0a870972..a70aef9e89e7 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFiles.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFiles.java @@ -43,6 +43,7 @@ import org.apache.iceberg.Files; import org.apache.iceberg.MetadataTableType; import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.Parameter; import org.apache.iceberg.Parameters; import org.apache.iceberg.PositionDeletesScanTask; import org.apache.iceberg.RowDelta; @@ -54,6 +55,8 @@ import org.apache.iceberg.actions.SizeBasedFileRewriter; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; +import org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.deletes.PositionDeleteWriter; import org.apache.iceberg.encryption.EncryptedFiles; @@ -62,12 +65,14 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppenderFactory; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkCatalogConfig; import org.apache.iceberg.spark.actions.SparkActions; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.ScanTaskUtil; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.types.StructType; @@ -88,13 
+93,23 @@ public class TestRewritePositionDeleteFiles extends ExtensionsTestBase { private static final int DELETE_FILES_PER_PARTITION = 2; private static final int DELETE_FILE_SIZE = 10; - @Parameters(name = "formatVersion = {0}, catalogName = {1}, implementation = {2}, config = {3}") + @Parameter(index = 3) + private int formatVersion; + + @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, formatVersion = {3}") public static Object[][] parameters() { return new Object[][] { { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), - CATALOG_PROPS + CATALOG_PROPS, + 2 + }, + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + CATALOG_PROPS, + 3 } }; } @@ -223,7 +238,11 @@ private void testDanglingDelete(String partitionCol, int numDataFiles) throws Ex // write dangling delete files for 'old data files' writePosDeletesForFiles(table, dataFiles); List deleteFiles = deleteFiles(table); - assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION); + if (formatVersion >= 3) { + assertThat(deleteFiles).hasSize(numDataFiles); + } else { + assertThat(deleteFiles).hasSize(numDataFiles * DELETE_FILES_PER_PARTITION); + } List expectedRecords = records(tableName, partitionCol); @@ -250,8 +269,8 @@ private void createTable(String partitionType, String partitionCol, String parti "CREATE TABLE %s (id long, %s %s, c1 string, c2 string) " + "USING iceberg " + "PARTITIONED BY (%s) " - + "TBLPROPERTIES('format-version'='2')", - tableName, partitionCol, partitionType, partitionTransform); + + "TBLPROPERTIES('format-version'='%s')", + tableName, partitionCol, partitionType, partitionTransform, formatVersion); } private void insertData(Function partitionValueFunction) throws Exception { @@ -303,17 +322,24 @@ private void writePosDeletesForFiles(Table table, List files) throws I int counter = 0; List> deletes = Lists.newArrayList(); - for (DataFile partitionFile : partitionFiles) { - for (int deletePos = 0; deletePos < DELETE_FILE_SIZE; deletePos++) { - deletes.add(Pair.of(partitionFile.location(), (long) deletePos)); - counter++; - if (counter == deleteFileSize) { - // Dump to file and reset variables - OutputFile output = - Files.localOutput(temp.resolve(UUID.randomUUID().toString()).toFile()); - deleteFiles.add(writeDeleteFile(table, output, partition, deletes)); - counter = 0; - deletes.clear(); + if (formatVersion >= 3) { + for (DataFile partitionFile : partitionFiles) { + deleteFiles.addAll( + writeDV(table, partition, partitionFile.location(), deletesForPartition)); + } + } else { + for (DataFile partitionFile : partitionFiles) { + for (int deletePos = 0; deletePos < DELETE_FILE_SIZE; deletePos++) { + deletes.add(Pair.of(partitionFile.location(), (long) deletePos)); + counter++; + if (counter == deleteFileSize) { + // Dump to file and reset variables + OutputFile output = + Files.localOutput(temp.resolve(UUID.randomUUID().toString()).toFile()); + deleteFiles.add(writeDeleteFile(table, output, partition, deletes)); + counter = 0; + deletes.clear(); + } } } } @@ -324,6 +350,20 @@ private void writePosDeletesForFiles(Table table, List files) throws I rowDelta.commit(); } + private List writeDV( + Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException { + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = 
writer) { + for (int row = 0; row < numPositionsToDelete; row++) { + closeableWriter.delete(path, row, table.spec(), partition); + } + } + + return writer.result().deleteFiles(); + } + private DeleteFile writeDeleteFile( Table table, OutputFile out, StructLike partition, List> deletes) throws IOException { @@ -357,7 +397,7 @@ private List records(String table, String partitionCol) { } private long size(List deleteFiles) { - return deleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum(); + return deleteFiles.stream().mapToLong(ScanTaskUtil::contentSizeInBytes).sum(); } private List dataFiles(Table table) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java index 546cf63c2e0d..2562c74eafcc 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewritePositionDeleteFilesSparkAction.java @@ -39,7 +39,6 @@ import org.apache.iceberg.RewriteJobOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.TableUtil; import org.apache.iceberg.actions.ImmutableRewritePositionDeleteFiles; import org.apache.iceberg.actions.RewritePositionDeleteFiles; import org.apache.iceberg.actions.RewritePositionDeletesCommitManager; @@ -403,9 +402,6 @@ private void validateAndInitOptions() { PARTIAL_PROGRESS_MAX_COMMITS, maxCommits, PARTIAL_PROGRESS_ENABLED); - - Preconditions.checkArgument( - TableUtil.formatVersion(table) <= 2, "Cannot rewrite position deletes for V3 table"); } private String jobDesc(RewritePositionDeletesGroup group, RewriteExecutionContext ctx) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java index 0c319e2bd41a..0beaba04bf7d 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/DVIterator.java @@ -84,6 +84,8 @@ public InternalRow next() { rowValues.add(deleteFile.contentOffset()); } else if (fieldId == MetadataColumns.CONTENT_SIZE_IN_BYTES_COLUMN_ID) { rowValues.add(ScanTaskUtil.contentSizeInBytes(deleteFile)); + } else if (fieldId == MetadataColumns.DELETE_FILE_ROW_FIELD_ID) { + rowValues.add(null); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java index 329bcf085569..a00eb5605089 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java @@ -73,6 +73,7 @@ protected Stream> referencedFiles(PositionDeletesScanTask task) { return Stream.of(task.file()); } + @SuppressWarnings("resource") // handled by BaseReader @Override protected CloseableIterator open(PositionDeletesScanTask task) { String filePath = task.file().location(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 73e6ab01563c..cbb87d99d116 100644 --- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -22,19 +22,23 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.iceberg.BaseMetadataTable; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PositionDeletesTable; import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableUtil; import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.deletes.PositionDelete; import org.apache.iceberg.io.ClusteredPositionDeleteWriter; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitioningDVWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; @@ -224,34 +228,49 @@ public DataWriter createWriter(int partitionId, long taskId) { .suffix("deletes") .build(); - Schema positionDeleteRowSchema = positionDeleteRowSchema(); - StructType deleteSparkType = deleteSparkType(); - StructType deleteSparkTypeWithoutRow = deleteSparkTypeWithoutRow(); + if (TableUtil.formatVersion(underlyingTable(table)) >= 3) { + return new DVWriter(table, deleteFileFactory, dsSchema, specId, partition); + } else { + Schema positionDeleteRowSchema = positionDeleteRowSchema(); + StructType deleteSparkType = deleteSparkType(); + StructType deleteSparkTypeWithoutRow = deleteSparkTypeWithoutRow(); + + SparkFileWriterFactory writerFactoryWithRow = + SparkFileWriterFactory.builderFor(table) + .deleteFileFormat(format) + .positionDeleteRowSchema(positionDeleteRowSchema) + .positionDeleteSparkType(deleteSparkType) + .writeProperties(writeProperties) + .build(); + SparkFileWriterFactory writerFactoryWithoutRow = + SparkFileWriterFactory.builderFor(table) + .deleteFileFormat(format) + .positionDeleteSparkType(deleteSparkTypeWithoutRow) + .writeProperties(writeProperties) + .build(); + + return new DeleteWriter( + table, + writerFactoryWithRow, + writerFactoryWithoutRow, + deleteFileFactory, + targetFileSize, + deleteGranularity, + dsSchema, + specId, + partition); + } + } - SparkFileWriterFactory writerFactoryWithRow = - SparkFileWriterFactory.builderFor(table) - .deleteFileFormat(format) - .positionDeleteRowSchema(positionDeleteRowSchema) - .positionDeleteSparkType(deleteSparkType) - .writeProperties(writeProperties) - .build(); - SparkFileWriterFactory writerFactoryWithoutRow = - SparkFileWriterFactory.builderFor(table) - .deleteFileFormat(format) - .positionDeleteSparkType(deleteSparkTypeWithoutRow) - .writeProperties(writeProperties) - .build(); + private Table underlyingTable(Table table) { + if (table instanceof SerializableTable.SerializableMetadataTable) { + return underlyingTable( + ((SerializableTable.SerializableMetadataTable) table).underlyingTable()); + } else if (table instanceof BaseMetadataTable) { + return ((BaseMetadataTable) table).table(); + } - return new DeleteWriter( - table, - writerFactoryWithRow, - writerFactoryWithoutRow, - deleteFileFactory, - targetFileSize, - deleteGranularity, - dsSchema, - specId, - partition); + return 
table; } private Schema positionDeleteRowSchema() { @@ -356,7 +375,7 @@ private static class DeleteWriter implements DataWriter { } @Override - public void write(InternalRow record) throws IOException { + public void write(InternalRow record) { String file = record.getString(fileOrdinal); long position = record.getLong(positionOrdinal); InternalRow row = record.getStruct(rowOrdinal, rowSize); @@ -424,6 +443,83 @@ private List allDeleteFiles() { } } + /** + * DV Writer for position deletes metadata table. + * + *

<p>This writer is meant to be used for an action to rewrite delete files when the table + * supports DVs. + */ + private static class DVWriter implements DataWriter<InternalRow> { + private final PositionDelete<InternalRow> positionDelete; + private final FileIO io; + private final PartitionSpec spec; + private final int fileOrdinal; + private final int positionOrdinal; + private final StructLike partition; + private final PartitioningDVWriter<InternalRow> dvWriter; + private boolean closed = false; + + /** + * Constructs a {@link DVWriter}. + * + * @param table position deletes metadata table + * @param deleteFileFactory delete file factory + * @param dsSchema schema of incoming dataset of position deletes + * @param specId partition spec id of incoming position deletes. All incoming position deletes + * are required to have the same spec id. + * @param partition partition value of incoming position delete. All incoming position deletes + * are required to have the same partition. + */ + DVWriter( + Table table, + OutputFileFactory deleteFileFactory, + StructType dsSchema, + int specId, + StructLike partition) { + this.positionDelete = PositionDelete.create(); + this.io = table.io(); + this.spec = table.specs().get(specId); + this.partition = partition; + this.fileOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_PATH.name()); + this.positionOrdinal = dsSchema.fieldIndex(MetadataColumns.DELETE_FILE_POS.name()); + this.dvWriter = new PartitioningDVWriter<>(deleteFileFactory, p -> null); + } + + @Override + public void write(InternalRow record) { + String file = record.getString(fileOrdinal); + long position = record.getLong(positionOrdinal); + positionDelete.set(file, position, null); + dvWriter.write(positionDelete, spec, partition); + } + + @Override + public WriterCommitMessage commit() throws IOException { + close(); + return new DeleteTaskCommit(allDeleteFiles()); + } + + @Override + public void abort() throws IOException { + close(); + SparkCleanupUtil.deleteTaskFiles(io, allDeleteFiles()); + } + + @Override + public void close() throws IOException { + if (!closed) { + if (null != dvWriter) { + dvWriter.close(); + } + this.closed = true; + } + } + + private List<DeleteFile> allDeleteFiles() { + return dvWriter.result().deleteFiles(); + } + } + public static class DeleteTaskCommit implements WriterCommitMessage { private final DeleteFile[] taskFiles; diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java index b71ec52b7129..e5842e7a8827 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewritePositionDeleteFilesAction.java @@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -30,6 +31,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -60,14 +62,18 @@ import org.apache.iceberg.actions.SizeBasedFileRewriter; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.FileHelpers; +import
org.apache.iceberg.deletes.BaseDVFileWriter; +import org.apache.iceberg.deletes.DVFileWriter; import org.apache.iceberg.deletes.DeleteGranularity; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.CatalogTestBase; import org.apache.iceberg.spark.SparkCatalogConfig; @@ -76,6 +82,7 @@ import org.apache.iceberg.spark.source.ThreeColumnRecord; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.ScanTaskUtil; import org.apache.iceberg.util.StructLikeMap; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -103,14 +110,24 @@ public class TestRewritePositionDeleteFilesAction extends CatalogTestBase { private static final int SCALE = 4000; private static final int DELETES_SCALE = 1000; - @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, fileFormat = {3}") + @Parameters( + name = + "catalogName = {0}, implementation = {1}, config = {2}, fileFormat = {3}, formatVersion = {4}") public static Object[][] parameters() { return new Object[][] { { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), CATALOG_PROPS, - FileFormat.PARQUET + FileFormat.PARQUET, + 2 + }, + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + CATALOG_PROPS, + FileFormat.PARQUET, + 3 } }; } @@ -120,6 +137,9 @@ public static Object[][] parameters() { @Parameter(index = 3) private FileFormat format; + @Parameter(index = 4) + private int formatVersion; + @AfterEach public void cleanup() { validationCatalog.dropTable(TableIdentifier.of("default", TABLE_NAME)); @@ -170,6 +190,9 @@ private void checkDeleteGranularity(DeleteGranularity deleteGranularity) throws .execute(); int expectedDeleteFilesCount = deleteGranularity == DeleteGranularity.FILE ? 
2 : 1; + if (formatVersion >= 3) { + expectedDeleteFilesCount = 2; + } assertThat(result.addedDeleteFilesCount()).isEqualTo(expectedDeleteFilesCount); } @@ -194,7 +217,13 @@ public void testUnpartitioned() throws Exception { .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .execute(); List newDeleteFiles = deleteFiles(table); - assertThat(newDeleteFiles).as("New delete files").hasSize(1); + if (formatVersion >= 3) { + assertThat(newDeleteFiles) + .as("Expected 1 delete file per data file") + .hasSameSizeAs(dataFiles); + } else { + assertThat(newDeleteFiles).as("Expected 1 new delete file").hasSize(1); + } assertLocallySorted(newDeleteFiles); assertNotContains(deleteFiles, newDeleteFiles); checkResult(result, deleteFiles, newDeleteFiles, 1); @@ -206,6 +235,89 @@ public void testUnpartitioned() throws Exception { assertEquals("Position deletes", expectedDeletes, actualDeletes); } + @TestTemplate + public void testDVsAreRewritten() throws Exception { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + Table table = createTableUnpartitioned(2, SCALE); + List dataFiles = TestHelpers.dataFiles(table); + writePosDeletesForFiles(table, 2, DELETES_SCALE, dataFiles); + assertThat(dataFiles).hasSize(2); + + List deleteFiles = deleteFiles(table); + assertThat(deleteFiles).hasSize(2); + + List expectedRecords = records(table); + List expectedDeletes = deleteRecords(table); + assertThat(expectedRecords).hasSize(2000); + assertThat(expectedDeletes).hasSize(2000); + List dvsBeforeRewrite = dvRecords(table); + + Result result = + SparkActions.get(spark) + .rewritePositionDeletes(table) + .option(SizeBasedFileRewriter.REWRITE_ALL, "true") + .execute(); + List newDeleteFiles = deleteFiles(table); + assertThat(newDeleteFiles).as("Expected 1 delete file per data file").hasSameSizeAs(dataFiles); + assertLocallySorted(newDeleteFiles); + assertNotContains(deleteFiles, newDeleteFiles); + checkResult(result, deleteFiles, newDeleteFiles, 1); + checkSequenceNumbers(table, deleteFiles, newDeleteFiles); + + List actualRecords = records(table); + List actualDeletes = deleteRecords(table); + assertEquals("Rows must match", expectedRecords, actualRecords); + assertEquals("Position deletes must match", expectedDeletes, actualDeletes); + + Set pathsBeforeRewrite = + deleteFiles.stream().map(ContentFile::location).collect(Collectors.toSet()); + // both delete files point to a separate DV file + assertThat(pathsBeforeRewrite).hasSameSizeAs(dataFiles); + + Set pathsAfterRewrite = + newDeleteFiles.stream().map(ContentFile::location).collect(Collectors.toSet()); + // both delete files point to the same DV file + assertThat(pathsAfterRewrite).hasSize(1); + // all new deletes point to the same DV file + assertThat(newDeleteFiles) + .allMatch(file -> file.location().equals(Iterables.getOnlyElement(pathsAfterRewrite))); + + List dvsAfterRewrite = dvRecords(table); + assertThat(dvsAfterRewrite).hasSameSizeAs(dvsBeforeRewrite); + + for (Row beforeRewrite : dvsBeforeRewrite) { + Optional rewritten = + dvsAfterRewrite.stream() + // filter by data file path + .filter(row -> row.getString(0).equals(beforeRewrite.getString(0))) + .findFirst(); + assertThat(rewritten).isPresent(); + Row rewrittenRow = rewritten.get(); + + Optional delete = + newDeleteFiles.stream() + // filter by data file path + .filter(f -> f.referencedDataFile().equals(rewrittenRow.getString(0))) + .findFirst(); + assertThat(delete).isPresent(); + DeleteFile rewrittenDelete = delete.get(); + + // delete_file_path has been rewritten and is different from 
beforeRewrite's path + assertThat(rewrittenRow.get(1)) + .isNotEqualTo(beforeRewrite.get(1)) + .isEqualTo(rewrittenDelete.location()); + + // only compare content_offset after the rewrite with the rewritten delete file as + // it can be different from beforeRewrite's content offset + assertThat(rewrittenRow.get(2)).isEqualTo(rewrittenDelete.contentOffset()); + + // content_size_in_bytes must be the same before and after rewriting + assertThat(rewrittenRow.get(3)) + .isEqualTo(beforeRewrite.get(3)) + .isEqualTo(ScanTaskUtil.contentSizeInBytes(rewrittenDelete)); + } + } + @TestTemplate public void testRewriteAll() throws Exception { Table table = createTablePartitioned(4, 2, SCALE); @@ -215,7 +327,11 @@ public void testRewriteAll() throws Exception { assertThat(dataFiles).hasSize(4); List deleteFiles = deleteFiles(table); - assertThat(deleteFiles).hasSize(8); + if (formatVersion >= 3) { + assertThat(deleteFiles).hasSameSizeAs(dataFiles); + } else { + assertThat(deleteFiles).hasSize(8); + } List expectedRecords = records(table); List expectedDeletes = deleteRecords(table); @@ -252,7 +368,11 @@ public void testRewriteFilter() throws Exception { assertThat(dataFiles).hasSize(4); List deleteFiles = deleteFiles(table); - assertThat(deleteFiles).hasSize(8); + if (formatVersion >= 3) { + assertThat(deleteFiles).hasSameSizeAs(dataFiles); + } else { + assertThat(deleteFiles).hasSize(8); + } table.refresh(); List expectedRecords = records(table); @@ -315,7 +435,11 @@ public void testRewriteToSmallerTarget() throws Exception { assertThat(expectedDeletes).hasSize(4000); List deleteFiles = deleteFiles(table); - assertThat(deleteFiles).hasSize(8); + if (formatVersion >= 3) { + assertThat(deleteFiles).hasSameSizeAs(dataFiles); + } else { + assertThat(deleteFiles).hasSize(8); + } long avgSize = size(deleteFiles) / deleteFiles.size(); @@ -326,7 +450,13 @@ public void testRewriteToSmallerTarget() throws Exception { .option(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, String.valueOf(avgSize / 2)) .execute(); List newDeleteFiles = deleteFiles(table); - assertThat(newDeleteFiles).as("New delete files").hasSize(8); + if (formatVersion >= 3) { + assertThat(deleteFiles) + .as("Should have 1 delete file per data file") + .hasSameSizeAs(dataFiles); + } else { + assertThat(newDeleteFiles).as("Should have 8 new delete files").hasSize(8); + } assertNotContains(deleteFiles, newDeleteFiles); assertLocallySorted(newDeleteFiles); checkResult(result, deleteFiles, newDeleteFiles, 4); @@ -353,7 +483,11 @@ public void testRemoveDanglingDeletes() throws Exception { assertThat(dataFiles).hasSize(4); List deleteFiles = deleteFiles(table); - assertThat(deleteFiles).hasSize(8); + if (formatVersion >= 3) { + assertThat(deleteFiles).hasSameSizeAs(dataFiles); + } else { + assertThat(deleteFiles).hasSize(8); + } List expectedRecords = records(table); List expectedDeletes = deleteRecords(table); @@ -392,7 +526,11 @@ public void testSomePartitionsDanglingDeletes() throws Exception { assertThat(dataFiles).hasSize(4); List deleteFiles = deleteFiles(table); - assertThat(deleteFiles).hasSize(8); + if (formatVersion >= 3) { + assertThat(deleteFiles).hasSameSizeAs(dataFiles); + } else { + assertThat(deleteFiles).hasSize(8); + } List expectedRecords = records(table); List expectedDeletes = deleteRecords(table); @@ -425,7 +563,7 @@ public void testSomePartitionsDanglingDeletes() throws Exception { expectedDeletes.stream() .filter( r -> { - Object[] partition = (Object[]) r[3]; + Object[] partition = formatVersion >= 3 ? 
(Object[]) r[2] : (Object[]) r[3]; return partition[0] == (Integer) 2 || partition[0] == (Integer) 3; }) .collect(Collectors.toList()); @@ -446,7 +584,11 @@ public void testRewriteFilterRemoveDangling() throws Exception { assertThat(dataFiles).hasSize(4); List deleteFiles = deleteFiles(table); - assertThat(deleteFiles).hasSize(8); + if (formatVersion >= 3) { + assertThat(deleteFiles).hasSameSizeAs(dataFiles); + } else { + assertThat(deleteFiles).hasSize(8); + } table.refresh(); List expectedRecords = records(table); @@ -469,7 +611,7 @@ public void testRewriteFilterRemoveDangling() throws Exception { .execute(); List newDeleteFiles = except(deleteFiles(table), deleteFiles); - assertThat(newDeleteFiles).as("New delete files").isEmpty(); + assertThat(newDeleteFiles).as("Should have 0 new delete files").hasSize(0); List expectedRewrittenFiles = filterFiles(table, deleteFiles, ImmutableList.of(0), ImmutableList.of(1)); @@ -507,7 +649,11 @@ public void testPartitionEvolutionAdd() throws Exception { assertThat(partitionedDataFiles).hasSize(2); List partitionedDeleteFiles = except(deleteFiles(table), unpartitionedDeleteFiles); - assertThat(partitionedDeleteFiles).hasSize(4); + if (formatVersion >= 3) { + assertThat(partitionedDeleteFiles).hasSameSizeAs(partitionedDataFiles); + } else { + assertThat(partitionedDeleteFiles).hasSize(4); + } List expectedDeletes = deleteRecords(table); List expectedRecords = records(table); @@ -524,7 +670,13 @@ public void testPartitionEvolutionAdd() throws Exception { Stream.concat(unpartitionedDeleteFiles.stream(), partitionedDeleteFiles.stream()) .collect(Collectors.toList()); List newDeleteFiles = deleteFiles(table); - assertThat(newDeleteFiles).as("New delete files").hasSize(3); + if (formatVersion >= 3) { + assertThat(newDeleteFiles) + .as("Should have 1 delete file per data file") + .hasSize(partitionedDataFiles.size() + unpartitionedDataFiles.size()); + } else { + assertThat(newDeleteFiles).as("Should have 3 new delete files").hasSize(3); + } assertNotContains(rewrittenDeleteFiles, newDeleteFiles); assertLocallySorted(newDeleteFiles); checkResult(result, rewrittenDeleteFiles, newDeleteFiles, 3); @@ -544,7 +696,11 @@ public void testPartitionEvolutionRemove() throws Exception { assertThat(dataFilesUnpartitioned).hasSize(2); List deleteFilesUnpartitioned = deleteFiles(table); - assertThat(deleteFilesUnpartitioned).hasSize(4); + if (formatVersion >= 3) { + assertThat(deleteFilesUnpartitioned).hasSameSizeAs(dataFilesUnpartitioned); + } else { + assertThat(deleteFilesUnpartitioned).hasSize(4); + } table.updateSpec().removeField("c1").commit(); @@ -563,7 +719,8 @@ public void testPartitionEvolutionRemove() throws Exception { assertThat(expectedRecords).hasSize(8000); List expectedRewritten = deleteFiles(table); - assertThat(expectedRewritten).hasSize(6); + assertThat(expectedRewritten) + .hasSize(deleteFilesPartitioned.size() + deleteFilesUnpartitioned.size()); Result result = SparkActions.get(spark) @@ -571,7 +728,13 @@ public void testPartitionEvolutionRemove() throws Exception { .option(SizeBasedFileRewriter.REWRITE_ALL, "true") .execute(); List newDeleteFiles = deleteFiles(table); - assertThat(newDeleteFiles).as("New delete files").hasSize(3); + if (formatVersion >= 3) { + assertThat(newDeleteFiles) + .as("Should have 1 delete file per data file") + .hasSize(dataFilesUnpartitioned.size() + deleteFilesPartitioned.size()); + } else { + assertThat(newDeleteFiles).as("Should have 3 new delete files").hasSize(3); + } assertNotContains(expectedRewritten, 
newDeleteFiles); assertLocallySorted(newDeleteFiles); checkResult(result, expectedRewritten, newDeleteFiles, 3); @@ -591,7 +754,11 @@ public void testSchemaEvolution() throws Exception { assertThat(dataFiles).hasSize(2); List deleteFiles = deleteFiles(table); - assertThat(deleteFiles).hasSize(4); + if (formatVersion >= 3) { + assertThat(deleteFiles).hasSameSizeAs(dataFiles); + } else { + assertThat(deleteFiles).hasSize(4); + } table.updateSchema().addColumn("c4", Types.StringType.get()).commit(); writeNewSchemaRecords(table, 2, SCALE, 2, 2); @@ -604,7 +771,11 @@ public void testSchemaEvolution() throws Exception { writePosDeletesForFiles(table, 2, DELETES_SCALE, newSchemaDataFiles); List newSchemaDeleteFiles = except(deleteFiles(table), deleteFiles); - assertThat(newSchemaDeleteFiles).hasSize(4); + if (formatVersion >= 3) { + assertThat(newSchemaDeleteFiles).hasSameSizeAs(newSchemaDataFiles); + } else { + assertThat(newSchemaDeleteFiles).hasSize(4); + } table.refresh(); List expectedDeletes = deleteRecords(table); @@ -728,22 +899,6 @@ public void testRewriteManyColumns() throws Exception { assertEquals("Position deletes", expectedDeletes, actualDeletes); } - @TestTemplate - public void testRewritePositionDeletesForV3TableFails() { - Table table = - validationCatalog.createTable( - TableIdentifier.of("default", TABLE_NAME), - SCHEMA, - PartitionSpec.unpartitioned(), - tableProperties(3)); - - writeRecords(table, 2, SCALE); - - assertThatThrownBy(() -> SparkActions.get(spark).rewritePositionDeletes(table).execute()) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Cannot rewrite position deletes for V3 table"); - } - private Table createTablePartitioned(int partitions, int files, int numRecords) { PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build(); Table table = @@ -767,10 +922,6 @@ private Table createTableUnpartitioned(int files, int numRecords) { } private Map tableProperties() { - return tableProperties(2); - } - - private Map tableProperties(int formatVersion) { return ImmutableMap.of( TableProperties.DEFAULT_WRITE_METRICS_MODE, "full", @@ -867,10 +1018,16 @@ private List deleteRecords(Table table) { String[] additionalFields; // do not select delete_file_path for comparison // as delete files have been rewritten - if (table.spec().isUnpartitioned()) { - additionalFields = new String[] {"pos", "row"}; + if (formatVersion >= 3) { + additionalFields = + table.spec().isUnpartitioned() + ? new String[] {"pos"} + : new String[] {"pos", "partition", "spec_id"}; } else { - additionalFields = new String[] {"pos", "row", "partition", "spec_id"}; + additionalFields = + table.spec().isUnpartitioned() + ? 
new String[] {"pos", "row"} + : new String[] {"pos", "row", "partition", "spec_id"}; } return rowsToJava( spark @@ -882,6 +1039,16 @@ private List deleteRecords(Table table) { .collectAsList()); } + private List dvRecords(Table table) { + return spark + .read() + .format("iceberg") + .load(name(table) + ".position_deletes") + .select("file_path", "delete_file_path", "content_offset", "content_size_in_bytes") + .distinct() + .collectAsList(); + } + private void writePosDeletesForFiles( Table table, int deleteFilesPerPartition, int deletesPerDataFile, List files) throws IOException { @@ -918,17 +1085,25 @@ private void writePosDeletesForFiles( int deleteFileSize = deletesForPartition / deleteFilesPerPartition; int counter = 0; List> deletes = Lists.newArrayList(); - for (DataFile partitionFile : partitionFiles) { - for (int deletePos = 0; deletePos < deletesPerDataFile; deletePos++) { - deletes.add(Pair.of(partitionFile.location(), (long) deletePos)); - counter++; - if (counter == deleteFileSize) { - // Dump to file and reset variables - OutputFile output = - Files.localOutput(File.createTempFile("junit", suffix, temp.toFile())); - deleteFiles.add(FileHelpers.writeDeleteFile(table, output, partition, deletes).first()); - counter = 0; - deletes.clear(); + if (formatVersion >= 3) { + for (DataFile partitionFile : partitionFiles) { + deleteFiles.addAll( + writeDV(table, partition, partitionFile.location(), deletesPerDataFile)); + } + } else { + for (DataFile partitionFile : partitionFiles) { + for (int deletePos = 0; deletePos < deletesPerDataFile; deletePos++) { + deletes.add(Pair.of(partitionFile.path(), (long) deletePos)); + counter++; + if (counter == deleteFileSize) { + // Dump to file and reset variables + OutputFile output = + Files.localOutput(File.createTempFile("junit", suffix, temp.toFile())); + deleteFiles.add( + FileHelpers.writeDeleteFile(table, output, partition, deletes).first()); + counter = 0; + deletes.clear(); + } } } } @@ -948,6 +1123,20 @@ private void writePosDeletesForFiles( } } + private List writeDV( + Table table, StructLike partition, String path, int numPositionsToDelete) throws IOException { + OutputFileFactory fileFactory = + OutputFileFactory.builderFor(table, 1, 1).format(FileFormat.PUFFIN).build(); + DVFileWriter writer = new BaseDVFileWriter(fileFactory, p -> null); + try (DVFileWriter closeableWriter = writer) { + for (int row = 0; row < numPositionsToDelete; row++) { + closeableWriter.delete(path, row, table.spec(), partition); + } + } + + return writer.result().deleteFiles(); + } + private List deleteFiles(Table table) { Table deletesTable = MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES); @@ -1002,7 +1191,7 @@ private String name(Table table) { } private long size(List deleteFiles) { - return deleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum(); + return deleteFiles.stream().mapToLong(ScanTaskUtil::contentSizeInBytes).sum(); } private List filterDeletes(List deletes, List... partitionValues) { @@ -1010,7 +1199,7 @@ private List filterDeletes(List deletes, List... partitio deletes.stream() .filter( r -> { - Object[] partition = (Object[]) r[3]; + Object[] partition = formatVersion >= 3 ? 
(Object[]) r[2] : (Object[]) r[3]; return Arrays.stream(partitionValues) .map(partitionValue -> match(partition, partitionValue)) .reduce((a, b) -> a || b) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java index 357cd0cdaa06..71ffa9ccf283 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPositionDeletesTable.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assumptions.assumeThat; import java.io.File; import java.io.IOException; @@ -75,6 +76,7 @@ import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.DeleteFileSet; import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.ScanTaskUtil; import org.apache.iceberg.util.StructLikeSet; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; @@ -98,30 +100,48 @@ public class TestPositionDeletesTable extends CatalogTestBase { "cache-enabled", "false"); private static final List NON_PATH_COLS = ImmutableList.of("file_path", "pos", "row", "partition", "spec_id"); + private static final List NON_PATH_V3_COLS = + ImmutableList.of( + "file_path", "pos", "partition", "spec_id", "content_offset", "content_size_in_bytes"); @Parameter(index = 3) private FileFormat format; - @Parameters(name = "catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}") + @Parameter(index = 4) + private int formatVersion; + + @Parameters( + name = + "catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}, formatVersion = {5}") public static Object[][] parameters() { return new Object[][] { { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), CATALOG_PROPS, - FileFormat.PARQUET + FileFormat.PARQUET, + 2 }, { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), CATALOG_PROPS, - FileFormat.AVRO + FileFormat.AVRO, + 2 }, { SparkCatalogConfig.HIVE.catalogName(), SparkCatalogConfig.HIVE.implementation(), CATALOG_PROPS, - FileFormat.ORC + FileFormat.ORC, + 2 + }, + { + SparkCatalogConfig.HIVE.catalogName(), + SparkCatalogConfig.HIVE.implementation(), + CATALOG_PROPS, + FileFormat.PARQUET, + 3 }, }; } @@ -142,7 +162,8 @@ public void testNullRows() throws IOException { tab, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), - deletes); + deletes, + formatVersion); tab.newRowDelta().addDeletes(posDeletes.first()).commit(); StructLikeSet actual = actual(tableName, tab); @@ -150,7 +171,7 @@ public void testNullRows() throws IOException { List> expectedDeletes = Lists.newArrayList( positionDelete(dFile.location(), 0L), positionDelete(dFile.location(), 1L)); - StructLikeSet expected = expected(tab, expectedDeletes, null, posDeletes.first().location()); + StructLikeSet expected = expected(tab, expectedDeletes, null, posDeletes.first()); assertThat(actual).as("Position Delete table should contain expected rows").isEqualTo(expected); dropTable(tableName); @@ -158,6 +179,7 @@ public void testNullRows() throws IOException { @TestTemplate public void testPartitionedTable() throws IOException { + assumeThat(formatVersion).as("DVs don't have row info in PositionDeletesTable").isEqualTo(2); // Create 
table with two partitions String tableName = "partitioned_table"; PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); @@ -178,8 +200,7 @@ public void testPartitionedTable() throws IOException { StructLikeSet actual = actual(tableName, tab, "row.data='b'"); GenericRecord partitionB = GenericRecord.create(tab.spec().partitionType()); partitionB.setField("data", "b"); - StructLikeSet expected = - expected(tab, deletesB.first(), partitionB, deletesB.second().location()); + StructLikeSet expected = expected(tab, deletesB.first(), partitionB, deletesB.second()); assertThat(actual).as("Position Delete table should contain expected rows").isEqualTo(expected); dropTable(tableName); @@ -187,6 +208,7 @@ public void testPartitionedTable() throws IOException { @TestTemplate public void testSelect() throws IOException { + assumeThat(formatVersion).as("DVs don't have row info in PositionDeletesTable").isEqualTo(2); // Create table with two partitions String tableName = "select"; PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build(); @@ -249,6 +271,73 @@ public void testSelect() throws IOException { dropTable(tableName); } + @TestTemplate + public void testSelectWithDVs() throws IOException { + assumeThat(formatVersion).isGreaterThanOrEqualTo(3); + String tableName = "select_with_dvs"; + Table tab = createTable(tableName, SCHEMA, PartitionSpec.unpartitioned()); + + DataFile dataFileA = dataFile(tab); + DataFile dataFileB = dataFile(tab); + + tab.newAppend().appendFile(dataFileA).appendFile(dataFileB).commit(); + + Pair>, DeleteFile> deletesA = deleteFile(tab, dataFileA); + Pair>, DeleteFile> deletesB = deleteFile(tab, dataFileB); + + tab.newRowDelta().addDeletes(deletesA.second()).addDeletes(deletesB.second()).commit(); + + // Select certain columns + Dataset df = + spark + .read() + .format("iceberg") + .load("default." 
+ tableName + ".position_deletes") + .select( + "pos", "file_path", "delete_file_path", "content_offset", "content_size_in_bytes"); + List actual = rowsToJava(df.collectAsList()); + + // Select cols from expected delete values + List expected = Lists.newArrayList(); + BiFunction, DeleteFile, Object[]> toRow = + (delete, file) -> + row( + delete.get(1, Long.class), + file.referencedDataFile(), + file.location(), + file.contentOffset(), + ScanTaskUtil.contentSizeInBytes(file)); + + expected.addAll( + deletesA.first().stream() + .map(d -> toRow.apply(d, deletesA.second())) + .collect(Collectors.toList())); + expected.addAll( + deletesB.first().stream() + .map(d -> toRow.apply(d, deletesB.second())) + .collect(Collectors.toList())); + + // Sort by pos and file_path + Comparator comp = + (o1, o2) -> { + int result = Long.compare((long) o1[0], (long) o2[0]); + if (result != 0) { + return result; + } else { + return ((String) o1[1]).compareTo((String) o2[1]); + } + }; + + actual.sort(comp); + expected.sort(comp); + + assertThat(actual) + .as("Position Delete table should contain expected rows") + .usingRecursiveComparison() + .isEqualTo(expected); + dropTable(tableName); + } + @TestTemplate public void testSplitTasks() throws IOException { String tableName = "big_table"; @@ -278,7 +367,8 @@ public void testSplitTasks() throws IOException { tab, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), - deletes); + deletes, + formatVersion); tab.newRowDelta().addDeletes(posDeletes).commit(); Table deleteTable = @@ -295,7 +385,7 @@ public void testSplitTasks() throws IOException { } StructLikeSet actual = actual(tableName, tab); - StructLikeSet expected = expected(tab, deletes, null, posDeletes.location()); + StructLikeSet expected = expected(tab, deletes, null, posDeletes); assertThat(actual).as("Position Delete table should contain expected rows").isEqualTo(expected); dropTable(tableName); @@ -324,10 +414,8 @@ public void testPartitionFilter() throws IOException { GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType()); Record partitionA = partitionRecordTemplate.copy("data", "a"); Record partitionB = partitionRecordTemplate.copy("data", "b"); - StructLikeSet expectedA = - expected(tab, deletesA.first(), partitionA, deletesA.second().location()); - StructLikeSet expectedB = - expected(tab, deletesB.first(), partitionB, deletesB.second().location()); + StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, deletesA.second()); + StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, deletesB.second()); StructLikeSet allExpected = StructLikeSet.create(deletesTab.schema().asStruct()); allExpected.addAll(expectedA); allExpected.addAll(expectedB); @@ -371,10 +459,8 @@ public void testPartitionTransformFilter() throws IOException { GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType()); Record partitionA = partitionRecordTemplate.copy("data_trunc", "a"); Record partitionB = partitionRecordTemplate.copy("data_trunc", "b"); - StructLikeSet expectedA = - expected(tab, deletesA.first(), partitionA, deletesA.second().location()); - StructLikeSet expectedB = - expected(tab, deletesB.first(), partitionB, deletesB.second().location()); + StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, deletesA.second()); + StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, deletesB.second()); StructLikeSet allExpected = 
StructLikeSet.create(deletesTable.schema().asStruct()); allExpected.addAll(expectedA); allExpected.addAll(expectedB); @@ -426,7 +512,7 @@ public void testPartitionEvolutionReplace() throws Exception { GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab)); Record partitionA = partitionRecordTemplate.copy("data", "a"); StructLikeSet expectedA = - expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second().location()); + expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second()); StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0"); assertThat(actualA) .as("Position Delete table should contain expected rows") @@ -435,12 +521,7 @@ public void testPartitionEvolutionReplace() throws Exception { // Query partition of new spec Record partition10 = partitionRecordTemplate.copy("id", 10); StructLikeSet expected10 = - expected( - tab, - deletes10.first(), - partition10, - tab.spec().specId(), - deletes10.second().location()); + expected(tab, deletes10.first(), partition10, tab.spec().specId(), deletes10.second()); StructLikeSet actual10 = actual(tableName, tab, "partition.id = 10 AND pos >= 0"); assertThat(actual10) @@ -480,7 +561,7 @@ public void testPartitionEvolutionAdd() throws Exception { GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab)); Record partitionA = partitionRecordTemplate.copy("data", "a"); StructLikeSet expectedA = - expected(tab, deletesA.first(), partitionA, specId1, deletesA.second().location()); + expected(tab, deletesA.first(), partitionA, specId1, deletesA.second()); StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0"); assertThat(actualA) .as("Position Delete table should contain expected rows") @@ -494,7 +575,7 @@ public void testPartitionEvolutionAdd() throws Exception { deletesUnpartitioned.first(), unpartitionedRecord, specId0, - deletesUnpartitioned.second().location()); + deletesUnpartitioned.second()); StructLikeSet actualUnpartitioned = actual(tableName, tab, "partition.data IS NULL and pos >= 0"); @@ -536,7 +617,7 @@ public void testPartitionEvolutionRemove() throws Exception { GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab)); Record partitionA = partitionRecordTemplate.copy("data", "a"); StructLikeSet expectedA = - expected(tab, deletesA.first(), partitionA, specId0, deletesA.second().location()); + expected(tab, deletesA.first(), partitionA, specId0, deletesA.second()); StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0"); assertThat(actualA) .as("Position Delete table should contain expected rows") @@ -550,7 +631,7 @@ public void testPartitionEvolutionRemove() throws Exception { deletesUnpartitioned.first(), unpartitionedRecord, specId1, - deletesUnpartitioned.second().location()); + deletesUnpartitioned.second()); StructLikeSet actualUnpartitioned = actual(tableName, tab, "partition.data IS NULL and pos >= 0"); @@ -594,7 +675,7 @@ public void testSpecIdFilter() throws Exception { deletesUnpartitioned.first(), partitionRecordTemplate, unpartitionedSpec, - deletesUnpartitioned.second().location()); + deletesUnpartitioned.second()); StructLikeSet actualUnpartitioned = actual(tableName, tab, String.format("spec_id = %d", unpartitionedSpec)); assertThat(actualUnpartitioned) @@ -605,9 +686,8 @@ public void testSpecIdFilter() throws Exception { StructLike partitionA = partitionRecordTemplate.copy("data", "a"); StructLike 
partitionB = partitionRecordTemplate.copy("data", "b"); StructLikeSet expected = - expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second().location()); - expected.addAll( - expected(tab, deletesB.first(), partitionB, dataSpec, deletesB.second().location())); + expected(tab, deletesA.first(), partitionA, dataSpec, deletesA.second()); + expected.addAll(expected(tab, deletesB.first(), partitionB, dataSpec, deletesB.second())); StructLikeSet actual = actual(tableName, tab, String.format("spec_id = %d", dataSpec)); assertThat(actual).as("Position Delete table should contain expected rows").isEqualTo(expected); @@ -660,8 +740,7 @@ public void testSchemaEvolutionAdd() throws Exception { padded.set(3, null); d.set(2, padded); }); - StructLikeSet expectedA = - expected(tab, expectedDeletesA, partitionA, deletesA.second().location()); + StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, deletesA.second()); StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0"); assertThat(actualA) .as("Position Delete table should contain expected rows") @@ -669,8 +748,7 @@ public void testSchemaEvolutionAdd() throws Exception { // Select deletes from new schema Record partitionC = partitionRecordTemplate.copy("data", "c"); - StructLikeSet expectedC = - expected(tab, deletesC.first(), partitionC, deletesC.second().location()); + StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, deletesC.second()); StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0"); assertThat(actualC) @@ -726,8 +804,7 @@ public void testSchemaEvolutionRemove() throws Exception { padded.set(1, nested.get(1)); d.set(2, padded); }); - StructLikeSet expectedA = - expected(tab, expectedDeletesA, partitionA, deletesA.second().location()); + StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, deletesA.second()); StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a' AND pos >= 0"); assertThat(actualA) .as("Position Delete table should contain expected rows") @@ -735,8 +812,7 @@ public void testSchemaEvolutionRemove() throws Exception { // Select deletes from new schema Record partitionC = partitionRecordTemplate.copy("data", "c"); - StructLikeSet expectedC = - expected(tab, deletesC.first(), partitionC, deletesC.second().location()); + StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, deletesC.second()); StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c' and pos >= 0"); assertThat(actualC) @@ -791,8 +867,8 @@ public void testWrite() throws IOException, NoSuchTableException { GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType()); Record partitionA = partitionRecordTemplate.copy("data", "a"); Record partitionB = partitionRecordTemplate.copy("data", "b"); - StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, null); - StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, null); + StructLikeSet expectedA = expected(tab, deletesA.first(), partitionA, deletesA.second(), false); + StructLikeSet expectedB = expected(tab, deletesB.first(), partitionB, deletesB.second(), false); StructLikeSet allExpected = StructLikeSet.create( TypeUtil.selectNot( @@ -802,7 +878,8 @@ public void testWrite() throws IOException, NoSuchTableException { allExpected.addAll(expectedB); // Compare values without 'delete_file_path' as these have been rewritten - StructLikeSet actual = actual(tableName, tab, null, NON_PATH_COLS); + 
StructLikeSet actual = + actual(tableName, tab, null, formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actual) .as("Position Delete table should contain expected rows") .isEqualTo(allExpected); @@ -825,7 +902,8 @@ public void testWriteUnpartitionedNullRows() throws Exception { tab, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of(0), - deletes); + deletes, + formatVersion); tab.newRowDelta().addDeletes(posDeletes.first()).commit(); Table posDeletesTable = @@ -851,14 +929,19 @@ public void testWriteUnpartitionedNullRows() throws Exception { commit(tab, posDeletesTable, fileSetID, 1); } + List columns = + formatVersion >= 3 + ? ImmutableList.of( + "file_path", "pos", "spec_id", "content_offset", "content_size_in_bytes") + : ImmutableList.of("file_path", "pos", "row", "spec_id"); + // Compare values without 'delete_file_path' as these have been rewritten - StructLikeSet actual = - actual(tableName, tab, null, ImmutableList.of("file_path", "pos", "row", "spec_id")); + StructLikeSet actual = actual(tableName, tab, null, columns); List> expectedDeletes = Lists.newArrayList( positionDelete(dFile.location(), 0L), positionDelete(dFile.location(), 1L)); - StructLikeSet expected = expected(tab, expectedDeletes, null, null); + StructLikeSet expected = expected(tab, expectedDeletes, null, posDeletes.first(), false); assertThat(actual).as("Position Delete table should contain expected rows").isEqualTo(expected); dropTable(tableName); @@ -883,7 +966,8 @@ public void testWriteMixedRows() throws Exception { tab, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), TestHelpers.Row.of("a"), - deletes); + deletes, + formatVersion); Pair>, DeleteFile> deletesWithRow = deleteFile(tab, dataFileB, "b"); @@ -919,11 +1003,7 @@ public void testWriteMixedRows() throws Exception { // Compare values without 'delete_file_path' as these have been rewritten StructLikeSet actual = - actual( - tableName, - tab, - null, - ImmutableList.of("file_path", "pos", "row", "partition", "spec_id")); + actual(tableName, tab, null, formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); // Prepare expected values GenericRecord partitionRecordTemplate = GenericRecord.create(tab.spec().partitionType()); @@ -940,8 +1020,10 @@ public void testWriteMixedRows() throws Exception { Lists.newArrayList( positionDelete(dataFileA.location(), 0L), positionDelete(dataFileA.location(), 1L)), partitionA, - null)); - allExpected.addAll(expected(tab, deletesWithRow.first(), partitionB, null)); + deletesWithoutRow.first(), + false)); + allExpected.addAll( + expected(tab, deletesWithRow.first(), partitionB, deletesWithRow.second(), false)); assertThat(actual) .as("Position Delete table should contain expected rows") @@ -1007,9 +1089,19 @@ public void testWritePartitionEvolutionAdd() throws Exception { GenericRecord partitionRecordTemplate = GenericRecord.create(Partitioning.partitionType(tab)); Record unpartitionedRecord = partitionRecordTemplate.copy("data", null); StructLikeSet expectedUnpartitioned = - expected(tab, deletesUnpartitioned.first(), unpartitionedRecord, specId0, null); + expected( + tab, + deletesUnpartitioned.first(), + unpartitionedRecord, + specId0, + deletesUnpartitioned.second(), + false); StructLikeSet actualUnpartitioned = - actual(tableName, tab, "partition.data IS NULL", NON_PATH_COLS); + actual( + tableName, + tab, + "partition.data IS NULL", + formatVersion >= 3 ? 
NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualUnpartitioned) .as("Position Delete table should contain expected rows") .isEqualTo(expectedUnpartitioned); @@ -1046,10 +1138,16 @@ public void testWritePartitionEvolutionAdd() throws Exception { TypeUtil.selectNot( posDeletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID)) .asStruct()); - expectedAll.addAll(expected(tab, deletesA.first(), partitionA, specId1, null)); - expectedAll.addAll(expected(tab, deletesB.first(), partitionB, specId1, null)); + expectedAll.addAll( + expected(tab, deletesA.first(), partitionA, specId1, deletesA.second(), false)); + expectedAll.addAll( + expected(tab, deletesB.first(), partitionB, specId1, deletesB.second(), false)); StructLikeSet actualAll = - actual(tableName, tab, "partition.data = 'a' OR partition.data = 'b'", NON_PATH_COLS); + actual( + tableName, + tab, + "partition.data = 'a' OR partition.data = 'b'", + formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualAll) .as("Position Delete table should contain expected rows") .isEqualTo(expectedAll); @@ -1181,8 +1279,13 @@ public void testWriteSchemaEvolutionAdd() throws Exception { padded.set(3, null); d.set(2, padded); }); - StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, null); - StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a'", NON_PATH_COLS); + StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, deletesA.second(), false); + StructLikeSet actualA = + actual( + tableName, + tab, + "partition.data = 'a'", + formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualA) .as("Position Delete table should contain expected rows") .isEqualTo(expectedA); @@ -1211,8 +1314,13 @@ public void testWriteSchemaEvolutionAdd() throws Exception { // Select deletes from new schema Record partitionC = partitionRecordTemplate.copy("data", "c"); - StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, null); - StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c'", NON_PATH_COLS); + StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, deletesC.second(), false); + StructLikeSet actualC = + actual( + tableName, + tab, + "partition.data = 'c'", + formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualC) .as("Position Delete table should contain expected rows") @@ -1294,16 +1402,26 @@ public void testWriteSchemaEvolutionRemove() throws Exception { padded.set(1, nested.get(1)); d.set(2, padded); }); - StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, null); - StructLikeSet actualA = actual(tableName, tab, "partition.data = 'a'", NON_PATH_COLS); + StructLikeSet expectedA = expected(tab, expectedDeletesA, partitionA, deletesA.second(), false); + StructLikeSet actualA = + actual( + tableName, + tab, + "partition.data = 'a'", + formatVersion >= 3 ? NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualA) .as("Position Delete table should contain expected rows") .isEqualTo(expectedA); // Select deletes from new schema Record partitionC = partitionRecordTemplate.copy("data", "c"); - StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, null); - StructLikeSet actualC = actual(tableName, tab, "partition.data = 'c'", NON_PATH_COLS); + StructLikeSet expectedC = expected(tab, deletesC.first(), partitionC, deletesC.second(), false); + StructLikeSet actualC = + actual( + tableName, + tab, + "partition.data = 'c'", + formatVersion >= 3 ? 
NON_PATH_V3_COLS : NON_PATH_COLS); assertThat(actualC) .as("Position Delete table should contain expected rows") @@ -1364,6 +1482,7 @@ private StructLikeSet actual(String tableName, Table table, String filter, List< .filter(f -> cols.contains(f.name())) .collect(Collectors.toList())); } + Types.StructType finalProjection = projection; StructLikeSet set = StructLikeSet.create(projection); df.collectAsList() @@ -1380,7 +1499,7 @@ protected Table createTable(String name, Schema schema, PartitionSpec spec) { Map properties = ImmutableMap.of( TableProperties.FORMAT_VERSION, - "2", + String.valueOf(formatVersion), TableProperties.DEFAULT_FILE_FORMAT, format.toString()); return validationCatalog.createTable( @@ -1413,13 +1532,23 @@ private StructLikeSet expected( List> deletes, StructLike partitionStruct, int specId, - String deleteFilePath) { + DeleteFile deleteFile) { + return expected(testTable, deletes, partitionStruct, specId, deleteFile, true); + } + + private StructLikeSet expected( + Table testTable, + List> deletes, + StructLike partitionStruct, + int specId, + DeleteFile deleteFile, + boolean includeDeleteFilePath) { Table deletesTable = MetadataTableUtils.createMetadataTableInstance( testTable, MetadataTableType.POSITION_DELETES); Types.StructType posDeleteSchema = deletesTable.schema().asStruct(); // Do not compare file paths - if (deleteFilePath == null) { + if (!includeDeleteFilePath) { posDeleteSchema = TypeUtil.selectNot( deletesTable.schema(), ImmutableSet.of(MetadataColumns.FILE_PATH_COLUMN_ID)) @@ -1433,13 +1562,20 @@ private StructLikeSet expected( GenericRecord record = GenericRecord.create(finalSchema); record.setField("file_path", p.path()); record.setField("pos", p.pos()); - record.setField("row", p.row()); + if (formatVersion == 2) { + record.setField("row", p.row()); + } if (partitionStruct != null) { record.setField("partition", partitionStruct); } record.setField("spec_id", specId); - if (deleteFilePath != null) { - record.setField("delete_file_path", deleteFilePath); + if (includeDeleteFilePath) { + record.setField("delete_file_path", deleteFile.location()); + } + if (formatVersion >= 3) { + record.setField("content_offset", deleteFile.contentOffset()); + record.setField( + "content_size_in_bytes", ScanTaskUtil.contentSizeInBytes(deleteFile)); } return record; }) @@ -1451,8 +1587,23 @@ private StructLikeSet expected( Table testTable, List> deletes, StructLike partitionStruct, - String deleteFilePath) { - return expected(testTable, deletes, partitionStruct, testTable.spec().specId(), deleteFilePath); + DeleteFile deleteFile) { + return expected(testTable, deletes, partitionStruct, testTable.spec().specId(), deleteFile); + } + + private StructLikeSet expected( + Table testTable, + List> deletes, + StructLike partitionStruct, + DeleteFile deleteFile, + boolean includeDeleteFilePath) { + return expected( + testTable, + deletes, + partitionStruct, + testTable.spec().specId(), + deleteFile, + includeDeleteFilePath); } private DataFile dataFile(Table tab, Object... partValues) throws IOException { @@ -1559,7 +1710,8 @@ private Pair>, DeleteFile> deleteFile( tab, Files.localOutput(File.createTempFile("junit", null, temp.toFile())), partitionInfo, - deletes); + deletes, + formatVersion); return Pair.of(deletes, deleteFile); }
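
For orientation, here is a minimal usage sketch of the rewrite action after this change, in the spirit of the tests above. It assumes a format-version 3 (DV-enabled) Iceberg table, an existing Catalog instance named "catalog", and an active SparkSession named "spark"; the identifiers ("default", "tbl") and variable names are illustrative assumptions, not part of this patch.

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.actions.SizeBasedFileRewriter;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.spark.actions.SparkActions;

// Load a V3 table (identifiers are placeholders).
Table table = catalog.loadTable(TableIdentifier.of("default", "tbl"));

// Rewrite all position deletes. With the V3 precondition removed, a format-version 3 table
// now takes the DV path (DVWriter / PartitioningDVWriter), so rewritten deletes are emitted
// as deletion vectors in Puffin files rather than as position delete data files.
RewritePositionDeleteFiles.Result result =
    SparkActions.get(spark)
        .rewritePositionDeletes(table)
        .option(SizeBasedFileRewriter.REWRITE_ALL, "true")
        .execute();

// One DV entry per referenced data file is expected afterwards; byte counts reported by the
// action are content sizes (ScanTaskUtil.contentSizeInBytes), not raw Puffin file sizes.
int addedDeleteFiles = result.addedDeleteFilesCount();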