diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 47e7a762a690..a41d3ddfb8df 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -874,6 +874,10 @@ acceptedBreaks:
       justification: "Static utility class - should not have public constructor"
   "1.4.0":
     org.apache.iceberg:iceberg-core:
+    - code: "java.class.defaultSerializationChanged"
+      old: "class org.apache.iceberg.PartitionData"
+      new: "class org.apache.iceberg.PartitionData"
+      justification: "Serialization across versions is not supported"
     - code: "java.class.defaultSerializationChanged"
      old: "class org.apache.iceberg.mapping.NameMapping"
      new: "class org.apache.iceberg.mapping.NameMapping"
diff --git a/core/src/main/java/org/apache/iceberg/PartitionData.java b/core/src/main/java/org/apache/iceberg/PartitionData.java
index 639fdeacd7fd..41bc4c0c1233 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionData.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionData.java
@@ -81,6 +81,15 @@ private PartitionData(PartitionData toCopy) {
     this.schema = toCopy.schema;
   }
 
+  /** Copy constructor that inherits the state but replaces the partition values */
+  private PartitionData(PartitionData toCopy, StructLike partition) {
+    this.partitionType = toCopy.partitionType;
+    this.size = toCopy.size;
+    this.data = copyData(partitionType, partition);
+    this.stringSchema = toCopy.stringSchema;
+    this.schema = toCopy.schema;
+  }
+
   public Types.StructType getPartitionType() {
     return partitionType;
   }
@@ -134,18 +143,7 @@ public Object get(int pos) {
 
   @Override
   public <T> void set(int pos, T value) {
-    if (value instanceof Utf8) {
-      // Utf8 is not Serializable
-      data[pos] = value.toString();
-    } else if (value instanceof ByteBuffer) {
-      // ByteBuffer is not Serializable
-      ByteBuffer buffer = (ByteBuffer) value;
-      byte[] bytes = new byte[buffer.remaining()];
-      buffer.duplicate().get(bytes);
-      data[pos] = bytes;
-    } else {
-      data[pos] = value;
-    }
+    data[pos] = toInternalValue(value);
   }
 
   @Override
@@ -171,6 +169,10 @@ public PartitionData copy() {
     return new PartitionData(this);
   }
 
+  public PartitionData copyFor(StructLike partition) {
+    return new PartitionData(this, partition);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -221,4 +223,32 @@ public static Object[] copyData(Types.StructType type, Object[] data) {
 
     return copy;
   }
+
+  private static Object[] copyData(Types.StructType type, StructLike partition) {
+    List<Types.NestedField> fields = type.fields();
+    Object[] data = new Object[fields.size()];
+
+    for (int pos = 0; pos < fields.size(); pos++) {
+      Types.NestedField field = fields.get(pos);
+      Class<?> javaClass = field.type().typeId().javaClass();
+      data[pos] = toInternalValue(partition.get(pos, javaClass));
+    }
+
+    return data;
+  }
+
+  private static Object toInternalValue(Object value) {
+    if (value instanceof Utf8) {
+      // Utf8 is not Serializable
+      return value.toString();
+    } else if (value instanceof ByteBuffer) {
+      // ByteBuffer is not Serializable
+      ByteBuffer buffer = (ByteBuffer) value;
+      byte[] bytes = new byte[buffer.remaining()];
+      buffer.duplicate().get(bytes);
+      return bytes;
+    } else {
+      return value;
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
index 6e25e380dd21..e2dbcb61e9b7 100644
--- a/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/TableScanUtil.java
@@ -153,6 +153,7 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
         task -> Math.max(task.sizeBytes(), task.filesCount() * openFileCost);
 
     Map<Integer, StructProjection> groupingKeyProjectionsBySpec = Maps.newHashMap();
+    PartitionData groupingKeyTemplate = new PartitionData(groupingKeyType);
 
     // group tasks by grouping keys derived from their partition tuples
     StructLikeMap<List<T>> tasksByGroupingKey = StructLikeMap.create(groupingKeyType);
@@ -166,7 +167,7 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
               specId -> StructProjection.create(spec.partitionType(), groupingKeyType));
       List<T> groupingKeyTasks =
           tasksByGroupingKey.computeIfAbsent(
-              projectGroupingKey(groupingKeyProjection, groupingKeyType, partition),
+              groupingKeyTemplate.copyFor(groupingKeyProjection.wrap(partition)),
               groupingKey -> Lists.newArrayList());
       if (task instanceof SplittableScanTask<?>) {
         ((SplittableScanTask<? extends T>) task).split(splitSize).forEach(groupingKeyTasks::add);
@@ -188,23 +189,6 @@ public static <T extends PartitionScanTask> List<ScanTaskGroup<T>> planTaskGroup
     return taskGroups;
   }
 
-  private static StructLike projectGroupingKey(
-      StructProjection groupingKeyProjection,
-      Types.StructType groupingKeyType,
-      StructLike partition) {
-
-    PartitionData groupingKey = new PartitionData(groupingKeyType);
-
-    groupingKeyProjection.wrap(partition);
-
-    for (int pos = 0; pos < groupingKeyProjection.size(); pos++) {
-      Class<?> javaClass = groupingKey.getType(pos).typeId().javaClass();
-      groupingKey.set(pos, groupingKeyProjection.get(pos, javaClass));
-    }
-
-    return groupingKey;
-  }
-
   private static <T extends ScanTask> Iterable<ScanTaskGroup<T>> toTaskGroupIterable(
       StructLike groupingKey,
       Iterable<T> tasks,
diff --git a/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java b/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
index 90b3cd4b53e2..ad78205ce98c 100644
--- a/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
+++ b/spark/v3.5/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iceberg.spark;
 
-import static org.apache.spark.sql.functions.lit;
-
 import com.google.errorprone.annotations.FormatMethod;
 import com.google.errorprone.annotations.FormatString;
 import java.io.IOException;
@@ -28,36 +26,26 @@
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileMetadata;
-import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.FileGenerationUtil;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.RowLevelOperationMode;
-import org.apache.iceberg.ScanTask;
 import org.apache.iceberg.ScanTaskGroup;
-import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.data.RandomData;
 import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
 import org.apache.iceberg.util.TableScanUtil;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.parser.ParseException;
-import org.apache.spark.sql.types.StructType;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -93,16 +81,14 @@ public class TaskGroupPlanningBenchmark {
   private static final String PARTITION_COLUMN = "ss_ticket_number";
 
   private static final int NUM_PARTITIONS = 150;
-  private static final int NUM_REAL_DATA_FILES_PER_PARTITION = 5;
-  private static final int NUM_REPLICA_DATA_FILES_PER_PARTITION = 50_000;
+  private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
   private static final int NUM_DELETE_FILES_PER_PARTITION = 25;
-  private static final int NUM_ROWS_PER_DATA_FILE = 150;
 
   private final Configuration hadoopConf = new Configuration();
   private SparkSession spark;
   private Table table;
 
-  private List<ScanTask> fileTasks;
+  private List<FileScanTask> fileTasks;
 
   @Setup
   public void setupBenchmark() throws NoSuchTableException, ParseException {
@@ -122,7 +108,7 @@ public void tearDownBenchmark() {
   @Threads(1)
   public void planTaskGroups(Blackhole blackhole) {
     SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
-    List<ScanTaskGroup<ScanTask>> taskGroups =
+    List<ScanTaskGroup<FileScanTask>> taskGroups =
         TableScanUtil.planTaskGroups(
             fileTasks,
             readConf.splitSize(),
@@ -130,113 +116,86 @@ public void planTaskGroups(Blackhole blackhole) {
             readConf.splitOpenFileCost());
 
     long rowsCount = 0L;
-    for (ScanTaskGroup<ScanTask> taskGroup : taskGroups) {
+    for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
       rowsCount += taskGroup.estimatedRowsCount();
     }
     blackhole.consume(rowsCount);
 
     long filesCount = 0L;
-    for (ScanTaskGroup<ScanTask> taskGroup : taskGroups) {
+    for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
       filesCount += taskGroup.filesCount();
     }
     blackhole.consume(filesCount);
 
     long sizeBytes = 0L;
-    for (ScanTaskGroup<ScanTask> taskGroup : taskGroups) {
+    for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
       sizeBytes += taskGroup.sizeBytes();
     }
     blackhole.consume(sizeBytes);
   }
 
-  private void loadFileTasks() {
-    table.refresh();
+  @Benchmark
+  @Threads(1)
+  public void planTaskGroupsWithGrouping(Blackhole blackhole) {
+    SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
 
-    try (CloseableIterable<ScanTask> fileTasksIterable = table.newBatchScan().planFiles()) {
-      this.fileTasks = Lists.newArrayList(fileTasksIterable);
-    } catch (IOException e) {
-      throw new UncheckedIOException(e);
+    List<ScanTaskGroup<FileScanTask>> taskGroups =
+        TableScanUtil.planTaskGroups(
+            fileTasks,
+            readConf.splitSize(),
+            readConf.splitLookback(),
+            readConf.splitOpenFileCost(),
+            Partitioning.groupingKeyType(table.schema(), table.specs().values()));
+
+    long rowsCount = 0L;
+    for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
+      rowsCount += taskGroup.estimatedRowsCount();
     }
-  }
+    blackhole.consume(rowsCount);
 
-  private DataFile loadAddedDataFile() {
-    table.refresh();
+    long filesCount = 0L;
+    for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
+      filesCount += taskGroup.filesCount();
+    }
+    blackhole.consume(filesCount);
 
-    Iterable<DataFile> addedDataFiles = table.currentSnapshot().addedDataFiles(table.io());
-    return Iterables.getOnlyElement(addedDataFiles);
+    long sizeBytes = 0L;
+    for (ScanTaskGroup<FileScanTask> taskGroup : taskGroups) {
+      sizeBytes += taskGroup.sizeBytes();
+    }
+    blackhole.consume(sizeBytes);
   }
 
-  private DeleteFile loadAddedDeleteFile() {
+  private void loadFileTasks() {
     table.refresh();
 
-    Iterable<DeleteFile> addedDeleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
-    return Iterables.getOnlyElement(addedDeleteFiles);
+    try (CloseableIterable<FileScanTask> fileTasksIterable = table.newScan().planFiles()) {
+      this.fileTasks = Lists.newArrayList(fileTasksIterable);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
   }
 
-  private void initDataAndDeletes() throws NoSuchTableException {
-    Schema schema = table.schema();
-    PartitionSpec spec = table.spec();
-    LocationProvider locations = table.locationProvider();
-
+  private void initDataAndDeletes() {
     for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
-      Dataset<Row> inputDF =
-          randomDataDF(schema, NUM_ROWS_PER_DATA_FILE)
-              .drop(PARTITION_COLUMN)
-              .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
-
-      for (int fileOrdinal = 0; fileOrdinal < NUM_REAL_DATA_FILES_PER_PARTITION; fileOrdinal++) {
-        appendAsFile(inputDF);
-      }
+      StructLike partition = TestHelpers.Row.of(partitionOrdinal);
 
-      DataFile dataFile = loadAddedDataFile();
-
-      sql(
-          "DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
-          TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);
-
-      DeleteFile deleteFile = loadAddedDeleteFile();
-
-      AppendFiles append = table.newFastAppend();
+      RowDelta rowDelta = table.newRowDelta();
 
-      for (int fileOrdinal = 0; fileOrdinal < NUM_REPLICA_DATA_FILES_PER_PARTITION; fileOrdinal++) {
-        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
-        DataFile replicaDataFile =
-            DataFiles.builder(spec)
-                .copy(dataFile)
-                .withPath(locations.newDataLocation(spec, dataFile.partition(), replicaFileName))
-                .build();
-        append.appendFile(replicaDataFile);
+      for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        DataFile dataFile = FileGenerationUtil.generateDataFile(table, partition);
+        rowDelta.addRows(dataFile);
       }
 
-      append.commit();
-
-      RowDelta rowDelta = table.newRowDelta();
-
       for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
-        String replicaFileName = UUID.randomUUID() + "-replica.parquet";
-        DeleteFile replicaDeleteFile =
-            FileMetadata.deleteFileBuilder(spec)
-                .copy(deleteFile)
-                .withPath(locations.newDataLocation(spec, deleteFile.partition(), replicaFileName))
-                .build();
-        rowDelta.addDeletes(replicaDeleteFile);
+        DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, partition);
+        rowDelta.addDeletes(deleteFile);
       }
 
       rowDelta.commit();
     }
   }
 
-  private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
-    df.coalesce(1).writeTo(TABLE_NAME).append();
-  }
-
-  private Dataset<Row> randomDataDF(Schema schema, int numRows) {
-    Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
-    JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
-    JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
-    StructType rowSparkType = SparkSchemaUtil.convert(schema);
-    return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
-  }
-
   private void setupSpark() {
     this.spark =
         SparkSession.builder()
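
The core of the TableScanUtil change is that the removed projectGroupingKey helper, which filled a fresh PartitionData field by field for every task, is replaced by one reusable template plus the new PartitionData.copyFor method. The sketch below restates that substitution outside the patch for clarity; it is illustrative only, and the locals (groupingKeyType, spec, partition, projection, perTaskKey, groupingKeyTemplate) are stand-ins rather than code taken from this diff.

  // Illustrative sketch, assuming a grouping key type, a partition spec, and a
  // task's partition tuple are already in scope.
  StructProjection projection = StructProjection.create(spec.partitionType(), groupingKeyType);

  // Before this change: allocate a PartitionData per task and fill it field by field.
  PartitionData perTaskKey = new PartitionData(groupingKeyType);
  projection.wrap(partition);
  for (int pos = 0; pos < projection.size(); pos++) {
    Class<?> javaClass = perTaskKey.getType(pos).typeId().javaClass();
    perTaskKey.set(pos, projection.get(pos, javaClass));
  }

  // After this change: reuse one template per planning call. wrap(...) returns the
  // projection itself, so the projected tuple is handed straight to copyFor(), which
  // copies all fields in one pass (converting Utf8/ByteBuffer values to Serializable forms).
  PartitionData groupingKeyTemplate = new PartitionData(groupingKeyType);
  PartitionData groupingKey = groupingKeyTemplate.copyFor(projection.wrap(partition));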