Skip to content

Commit

Permalink
Update for rebase after apache#12167.
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue committed Feb 5, 2025
1 parent 25b5045 commit 215e4fd
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static void assertEqual(VariantValue expected, VariantValue actual) {
assertThat(expected).isNotNull();
assertThat(actual.type()).as("Variant type should match").isEqualTo(expected.type());

if (expected.type() == Variants.PhysicalType.OBJECT) {
if (expected.type() == PhysicalType.OBJECT) {
VariantObject expectedObject = expected.asObject();
VariantObject actualObject = actual.asObject();
assertThat(actualObject.numFields())
Expand All @@ -61,7 +61,7 @@ public static void assertEqual(VariantValue expected, VariantValue actual) {
assertEqual(expectedObject.get(fieldName), actualObject.get(fieldName));
}

} else if (expected.type() == Variants.PhysicalType.ARRAY) {
} else if (expected.type() == PhysicalType.ARRAY) {
VariantArray expectedArray = expected.asArray();
VariantArray actualArray = actual.asArray();
assertThat(actualArray.numElements())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.variants.PhysicalType;
import org.apache.iceberg.variants.ShreddedObject;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantObject;
import org.apache.iceberg.variants.VariantValue;
import org.apache.iceberg.variants.Variants;
import org.apache.iceberg.variants.Variants.PhysicalType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.iceberg.parquet.ParquetVariantReaders.VariantValueReader;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.variants.Variants;
import org.apache.iceberg.variants.PhysicalType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
Expand Down Expand Up @@ -101,23 +101,23 @@ public VariantValueReader primitive(PrimitiveType primitive) {
switch (primitive.getPrimitiveTypeName()) {
case BINARY:
return ParquetVariantReaders.asVariant(
Variants.PhysicalType.BINARY, ParquetValueReaders.byteBuffers(desc));
PhysicalType.BINARY, ParquetValueReaders.byteBuffers(desc));
case BOOLEAN:
// the actual boolean type will be fixed in PrimitiveWrapper
return ParquetVariantReaders.asVariant(
Variants.PhysicalType.BOOLEAN_TRUE, ParquetValueReaders.unboxed(desc));
PhysicalType.BOOLEAN_TRUE, ParquetValueReaders.unboxed(desc));
case INT32:
return ParquetVariantReaders.asVariant(
Variants.PhysicalType.INT32, ParquetValueReaders.unboxed(desc));
PhysicalType.INT32, ParquetValueReaders.unboxed(desc));
case INT64:
return ParquetVariantReaders.asVariant(
Variants.PhysicalType.INT64, ParquetValueReaders.unboxed(desc));
PhysicalType.INT64, ParquetValueReaders.unboxed(desc));
case FLOAT:
return ParquetVariantReaders.asVariant(
Variants.PhysicalType.FLOAT, ParquetValueReaders.unboxed(desc));
PhysicalType.FLOAT, ParquetValueReaders.unboxed(desc));
case DOUBLE:
return ParquetVariantReaders.asVariant(
Variants.PhysicalType.DOUBLE, ParquetValueReaders.unboxed(desc));
PhysicalType.DOUBLE, ParquetValueReaders.unboxed(desc));
}
}

Expand Down Expand Up @@ -178,15 +178,14 @@ private LogicalTypeToVariantReader(ColumnDescriptor desc) {
@Override
public Optional<VariantValueReader> visit(StringLogicalTypeAnnotation ignored) {
  // Parquet STRING shreds to the variant STRING physical type; the column reader
  // produces Java Strings which asVariant wraps as variant values.
  VariantValueReader reader =
      ParquetVariantReaders.asVariant(PhysicalType.STRING, ParquetValueReaders.strings(desc));

  return Optional.of(reader);
}

@Override
public Optional<VariantValueReader> visit(DecimalLogicalTypeAnnotation logical) {
Variants.PhysicalType variantType = variantDecimalType(desc.getPrimitiveType());
PhysicalType variantType = variantDecimalType(desc.getPrimitiveType());
VariantValueReader reader =
ParquetVariantReaders.asVariant(variantType, ParquetValueReaders.bigDecimals(desc));

Expand All @@ -196,18 +195,15 @@ public Optional<VariantValueReader> visit(DecimalLogicalTypeAnnotation logical)
@Override
public Optional<VariantValueReader> visit(DateLogicalTypeAnnotation ignored) {
  // DATE is stored as an int32 day ordinal; unboxed(desc) reads the raw int and
  // asVariant tags it with the variant DATE physical type.
  VariantValueReader reader =
      ParquetVariantReaders.asVariant(PhysicalType.DATE, ParquetValueReaders.unboxed(desc));

  return Optional.of(reader);
}

@Override
public Optional<VariantValueReader> visit(TimestampLogicalTypeAnnotation logical) {
Variants.PhysicalType variantType =
logical.isAdjustedToUTC()
? Variants.PhysicalType.TIMESTAMPTZ
: Variants.PhysicalType.TIMESTAMPNTZ;
PhysicalType variantType =
logical.isAdjustedToUTC() ? PhysicalType.TIMESTAMPTZ : PhysicalType.TIMESTAMPNTZ;

VariantValueReader reader =
ParquetVariantReaders.asVariant(variantType, ParquetValueReaders.timestamps(desc));
Expand All @@ -227,22 +223,22 @@ public Optional<VariantValueReader> visit(IntLogicalTypeAnnotation logical) {
case 64:
reader =
ParquetVariantReaders.asVariant(
Variants.PhysicalType.INT64, ParquetValueReaders.unboxed(desc));
PhysicalType.INT64, ParquetValueReaders.unboxed(desc));
break;
case 32:
reader =
ParquetVariantReaders.asVariant(
Variants.PhysicalType.INT32, ParquetValueReaders.unboxed(desc));
PhysicalType.INT32, ParquetValueReaders.unboxed(desc));
break;
case 16:
reader =
ParquetVariantReaders.asVariant(
Variants.PhysicalType.INT16, ParquetValueReaders.intsAsShort(desc));
PhysicalType.INT16, ParquetValueReaders.intsAsShort(desc));
break;
case 8:
reader =
ParquetVariantReaders.asVariant(
Variants.PhysicalType.INT8, ParquetValueReaders.intsAsByte(desc));
PhysicalType.INT8, ParquetValueReaders.intsAsByte(desc));
break;
default:
throw new IllegalArgumentException("Invalid bit width for int: " + logical.getBitWidth());
Expand All @@ -251,15 +247,15 @@ public Optional<VariantValueReader> visit(IntLogicalTypeAnnotation logical) {
return Optional.of(reader);
}

private static Variants.PhysicalType variantDecimalType(PrimitiveType primitive) {
private static PhysicalType variantDecimalType(PrimitiveType primitive) {
switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
case BINARY:
return Variants.PhysicalType.DECIMAL16;
return PhysicalType.DECIMAL16;
case INT64:
return Variants.PhysicalType.DECIMAL8;
return PhysicalType.DECIMAL8;
case INT32:
return Variants.PhysicalType.DECIMAL4;
return PhysicalType.DECIMAL4;
}

throw new IllegalArgumentException("Invalid primitive type for decimal: " + primitive);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.iceberg.types.Types.VariantType;
import org.apache.iceberg.variants.PhysicalType;
import org.apache.iceberg.variants.ShreddedObject;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
Expand Down Expand Up @@ -87,7 +88,7 @@ public class TestVariantReaders {
TEST_METADATA_BUFFER,
ImmutableMap.of(
"a", Variants.ofNull(),
"d", Variants.of(Variants.PhysicalType.STRING, "iceberg")));
"d", Variants.of(PhysicalType.STRING, "iceberg")));

private static final VariantMetadata EMPTY_METADATA =
Variants.metadata(VariantTestUtil.emptyMetadata());
Expand Down Expand Up @@ -143,7 +144,7 @@ public void testUnshreddedVariants(VariantMetadata metadata, VariantValue expect
MessageType parquetSchema = parquetSchema(variantType);

GenericRecord variant =
record(variantType, Map.of("metadata", metadata.buffer(), "value", serialize(expected)));
record(variantType, Map.of("metadata", serialize(metadata), "value", serialize(expected)));
GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant));

Record actual = writeAndRead(parquetSchema, record);
Expand All @@ -164,7 +165,7 @@ public void testUnshreddedVariantsWithShreddedSchema(
MessageType parquetSchema = parquetSchema(variantType);

GenericRecord variant =
record(variantType, Map.of("metadata", metadata.buffer(), "value", serialize(expected)));
record(variantType, Map.of("metadata", serialize(metadata), "value", serialize(expected)));
GenericRecord record = record(parquetSchema, Map.of("id", 1, "var", variant));

Record actual = writeAndRead(parquetSchema, record);
Expand All @@ -179,7 +180,7 @@ public void testUnshreddedVariantsWithShreddedSchema(
@ParameterizedTest
@FieldSource("PRIMITIVES")
public void testShreddedVariantPrimitives(VariantPrimitive<?> primitive) throws IOException {
Assumptions.assumeThat(primitive.type() != Variants.PhysicalType.NULL)
Assumptions.assumeThat(primitive.type() != PhysicalType.NULL)
.as("Null is not a shredded type")
.isTrue();

Expand Down Expand Up @@ -888,6 +889,12 @@ private static ByteBuffer serialize(VariantValue value) {
return buffer;
}

/** Serializes variant metadata into a little-endian buffer of exactly its encoded size. */
private static ByteBuffer serialize(VariantMetadata metadata) {
  int encodedSize = metadata.sizeInBytes();
  ByteBuffer result = ByteBuffer.allocate(encodedSize).order(ByteOrder.LITTLE_ENDIAN);
  metadata.writeTo(result, 0);
  return result;
}

/** Creates an Avro record from a map of field name to value. */
private static GenericRecord record(GroupType type, Map<String, Object> fields) {
GenericRecord record = new GenericData.Record(avroSchema(type));
Expand Down

0 comments on commit 215e4fd

Please sign in to comment.