Spark: Fix stats in rewrite metadata action (apache#5691)
* Core: Don't show dropped fields from the partition spec

* Use projection instead

* Use StructProjection in SparkDataFile.

Co-authored-by: Fokko Driesprong <[email protected]>
2 people authored and aokolnychyi committed Oct 21, 2022
1 parent 175b9eb commit f7b7638
Showing 6 changed files with 200 additions and 18 deletions.
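
The core idea behind the fix: after partition evolution, Partitioning.partitionType(table) yields a combined struct holding every partition field from every spec, while each manifest must carry partition tuples shaped for exactly one spec. StructProjection bridges the two by matching fields on ID. Below is a minimal, self-contained sketch of that mechanism; the field IDs, names, and values are invented to echo the test in this commit, and GenericRecord (from the iceberg-data module) stands in for the Spark row wrapper:

import java.time.LocalDate;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructProjection;
import static org.apache.iceberg.types.Types.NestedField.optional;

public class PartitionProjectionSketch {
  public static void main(String[] args) {
    // Combined partition type across all specs: the dropped field plus its
    // replacement, as Partitioning.partitionType(table) would produce.
    Types.StructType combined =
        Types.StructType.of(
            optional(1000, "day_of_ts", Types.DateType.get()),
            optional(1001, "ts_day", Types.DateType.get()));

    // The partition type of the spec a manifest is written for: one live field.
    Types.StructType current =
        Types.StructType.of(optional(1001, "ts_day", Types.DateType.get()));

    // A combined partition tuple with both positions populated.
    GenericRecord tuple = GenericRecord.create(combined);
    tuple.set(0, LocalDate.parse("2022-01-01")); // value of the dropped field
    tuple.set(1, LocalDate.parse("2022-01-01")); // value of the live field

    // The projection matches fields by ID and hides the dropped one.
    StructLike projected = StructProjection.create(combined, current).wrap(tuple);
    System.out.println(projected.size()); // 1: only ts_day remains visible
  }
}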
@@ -20,6 +20,8 @@

 import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED;

+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.AssertHelpers;
@@ -193,4 +195,31 @@ public void testInvalidRewriteManifestsCases() {
         "Cannot handle an empty identifier",
         () -> sql("CALL %s.system.rewrite_manifests('')", catalogName));
   }
+
+  @Test
+  public void testReplacePartitionField() {
+    sql(
+        "CREATE TABLE %s (id int, ts timestamp, day_of_ts date) USING iceberg PARTITIONED BY (day_of_ts)",
+        tableName);
+
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version' = '2')", tableName);
+    sql("ALTER TABLE %s REPLACE PARTITION FIELD day_of_ts WITH days(ts)\n", tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, CAST('2022-01-01 10:00:00' AS TIMESTAMP), CAST('2022-01-01' AS DATE))",
+        tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+
+    sql("CALL %s.system.rewrite_manifests(table => '%s')", catalogName, tableName);
+
+    assertEquals(
+        "Should have expected rows",
+        ImmutableList.of(
+            row(1, Timestamp.valueOf("2022-01-01 10:00:00"), Date.valueOf("2022-01-01"))),
+        sql("SELECT * FROM %s WHERE ts < current_timestamp()", tableName));
+  }
}
@@ -28,6 +28,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.types.StructType;

@@ -52,13 +53,29 @@ public class SparkDataFile implements DataFile {
   private final Type keyMetadataType;

   private final SparkStructLike wrappedPartition;
+  private final StructLike partitionProjection;
   private Row wrapped;

   public SparkDataFile(Types.StructType type, StructType sparkType) {
+    this(type, null, sparkType);
+  }
+
+  public SparkDataFile(
+      Types.StructType type, Types.StructType projectedType, StructType sparkType) {
     this.lowerBoundsType = type.fieldType("lower_bounds");
     this.upperBoundsType = type.fieldType("upper_bounds");
     this.keyMetadataType = type.fieldType("key_metadata");
-    this.wrappedPartition = new SparkStructLike(type.fieldType("partition").asStructType());
+
+    Types.StructType partitionType = type.fieldType("partition").asStructType();
+    this.wrappedPartition = new SparkStructLike(partitionType);
+
+    if (projectedType != null) {
+      Types.StructType projectedPartitionType = projectedType.fieldType("partition").asStructType();
+      this.partitionProjection =
+          StructProjection.create(partitionType, projectedPartitionType).wrap(wrappedPartition);
+    } else {
+      this.partitionProjection = wrappedPartition;
+    }

     Map<String, Integer> positions = Maps.newHashMap();
     type.fields()
@@ -115,7 +132,7 @@ public FileFormat format() {

   @Override
   public StructLike partition() {
-    return wrappedPartition;
+    return partitionProjection;
   }

   @Override
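In the action below, the new three-argument constructor is fed with both shapes: the combined file type matches the data_file structs read back from the manifest entries, while the per-spec file type drives the projection that partition() returns, so the ManifestWriter sees tuples sized for the spec it writes. A condensed restatement of that wiring (a compile-level sketch only; it regroups lines from the diff that follows, with an invented class and method name):

import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.spark.SparkDataFile;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.StructType;

class WrapperSetupSketch {
  // Builds a SparkDataFile that reads the combined data_file struct but whose
  // partition() is projected down to the given spec's partition tuple.
  static SparkDataFile projectedWrapper(
      Types.StructType combinedPartitionType, PartitionSpec spec, StructType sparkType) {
    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
    return new SparkDataFile(combinedFileType, manifestFileType, sparkType);
  }
}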
@@ -34,6 +34,7 @@
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -206,6 +207,7 @@ private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);

     // we rely only on the target number of manifests for unpartitioned tables
     // as we should not worry about having too much metadata per partition
@@ -214,7 +216,14 @@
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+            toManifests(
+                io,
+                maxNumManifestEntries,
+                stagingLocation,
+                formatVersion,
+                combinedPartitionType,
+                spec,
+                sparkType),
             manifestEncoder)
         .collectAsList();
   }
@@ -224,6 +233,7 @@ private List<ManifestFile> writeManifestsForPartitionedTable(

     Broadcast<FileIO> io = sparkContext().broadcast(fileIO);
     StructType sparkType = (StructType) manifestEntryDF.schema().apply("data_file").dataType();
+    Types.StructType combinedPartitionType = Partitioning.partitionType(table);

     // we allow the actual size of manifests to be 10% higher if the estimation is not precise
     // enough
@@ -238,7 +248,13 @@
             .sortWithinPartitions(partitionColumn)
             .mapPartitions(
                 toManifests(
-                    io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+                    io,
+                    maxNumManifestEntries,
+                    stagingLocation,
+                    formatVersion,
+                    combinedPartitionType,
+                    spec,
+                    sparkType),
                 manifestEncoder)
             .collectAsList();
         });
@@ -318,6 +334,7 @@ private static ManifestFile writeManifest(
       Broadcast<FileIO> io,
       String location,
       int format,
+      Types.StructType combinedPartitionType,
       PartitionSpec spec,
       StructType sparkType)
       throws IOException {
@@ -327,8 +344,9 @@
     OutputFile outputFile =
         io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));

-    Types.StructType dataFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
+    Types.StructType combinedFileType = DataFile.getType(combinedPartitionType);
+    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
+    SparkDataFile wrapper = new SparkDataFile(combinedFileType, manifestFileType, sparkType);

     ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, outputFile, null);

@@ -352,6 +370,7 @@ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
       long maxNumManifestEntries,
       String location,
       int format,
+      Types.StructType combinedPartitionType,
       PartitionSpec spec,
       StructType sparkType) {

@@ -365,14 +384,40 @@
       List<ManifestFile> manifests = Lists.newArrayList();
       if (rowsAsList.size() <= maxNumManifestEntries) {
         manifests.add(
-            writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
+            writeManifest(
+                rowsAsList,
+                0,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
         manifests.add(
-            writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
+            writeManifest(
+                rowsAsList,
+                0,
+                midIndex,
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
         manifests.add(
             writeManifest(
-                rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
+                rowsAsList,
+                midIndex,
+                rowsAsList.size(),
+                io,
+                location,
+                format,
+                combinedPartitionType,
+                spec,
+                sparkType));
       }

       return manifests.iterator();
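One detail worth calling out in toManifests above: the entries collected for a partition range go into a single manifest when they fit under maxNumManifestEntries, and are otherwise split exactly once at the midpoint. A simplified, non-Spark sketch of that grouping (class and method names invented for illustration; not the real implementation):

import java.util.List;

public class ManifestSplitSketch {
  // Mirrors the control flow of toManifests: one group under the cap,
  // otherwise two groups split at the midpoint.
  static <T> List<List<T>> group(List<T> rows, long maxEntries) {
    if (rows.size() <= maxEntries) {
      return List.of(rows);
    }
    int midIndex = rows.size() / 2;
    return List.of(rows.subList(0, midIndex), rows.subList(midIndex, rows.size()));
  }

  public static void main(String[] args) {
    // With a cap of 3, five entries land in two manifests of sizes 2 and 3.
    List<List<Integer>> groups = group(List.of(1, 2, 3, 4, 5), 3);
    System.out.println(groups.get(0).size() + " + " + groups.get(1).size()); // 2 + 3
  }
}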