diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 6364603c591f..aa7f2877c2f9 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -27,6 +27,7 @@ import java.util.Set; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.AvroIterable; +import org.apache.iceberg.avro.InternalReader; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Evaluator; import org.apache.iceberg.expressions.Expression; @@ -65,16 +66,16 @@ public class ManifestReader> extends CloseableGroup "record_count"); protected enum FileType { - DATA_FILES(GenericDataFile.class.getName()), - DELETE_FILES(GenericDeleteFile.class.getName()); + DATA_FILES(GenericDataFile.class), + DELETE_FILES(GenericDeleteFile.class); - private final String fileClass; + private final Class fileClass; - FileType(String fileClass) { + FileType(Class fileClass) { this.fileClass = fileClass; } - private String fileClass() { + private Class fileClass() { return fileClass; } } @@ -261,12 +262,12 @@ private CloseableIterable> open(Schema projection) { AvroIterable> reader = Avro.read(file) .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields))) - .rename("manifest_entry", GenericManifestEntry.class.getName()) - .rename("partition", PartitionData.class.getName()) - .rename("r102", PartitionData.class.getName()) - .rename("data_file", content.fileClass()) - .rename("r2", content.fileClass()) - .classLoader(GenericManifestEntry.class.getClassLoader()) + .createResolvingReader( + schema -> + InternalReader.create(schema) + .setRootType(GenericManifestEntry.class) + .setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass()) + .setCustomType(DataFile.PARTITION_ID, PartitionData.class)) .reuseContainers() .build(); diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java index 93bfa2398466..3d70003d39ef 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java @@ -28,11 +28,8 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.DatumReader; import org.apache.avro.io.Decoder; -import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.common.DynClasses; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.Pair; @@ -43,7 +40,7 @@ public class GenericAvroReader private final Types.StructType expectedType; private ClassLoader loader = Thread.currentThread().getContextClassLoader(); private Map renames = ImmutableMap.of(); - private final Map idToConstant = ImmutableMap.of(); + private final Map idToConstant = ImmutableMap.of(); private Schema fileSchema = null; private ValueReader reader = null; @@ -111,46 +108,13 @@ private ResolvingReadBuilder(Types.StructType expectedType, String rootName) { @Override public ValueReader record(Type partner, Schema record, List> fieldResults) { - Types.StructType expected = partner != null ? partner.asStructType() : null; - Map idToPos = idToPos(expected); - - List>> readPlan = Lists.newArrayList(); - List fileFields = record.getFields(); - for (int pos = 0; pos < fileFields.size(); pos += 1) { - Schema.Field field = fileFields.get(pos); - ValueReader fieldReader = fieldResults.get(pos); - Integer fieldId = AvroSchemaUtil.fieldId(field); - Integer projectionPos = idToPos.remove(fieldId); - - Object constant = idToConstant.get(fieldId); - if (projectionPos != null && constant != null) { - readPlan.add( - Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant))); - } else { - readPlan.add(Pair.of(projectionPos, fieldReader)); - } + if (partner == null) { + return ValueReaders.skipStruct(fieldResults); } - // handle any expected columns that are not in the data file - for (Map.Entry idAndPos : idToPos.entrySet()) { - int fieldId = idAndPos.getKey(); - int pos = idAndPos.getValue(); - - Object constant = idToConstant.get(fieldId); - Types.NestedField field = expected.field(fieldId); - if (constant != null) { - readPlan.add(Pair.of(pos, ValueReaders.constant(constant))); - } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) { - readPlan.add(Pair.of(pos, ValueReaders.constant(false))); - } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) { - readPlan.add(Pair.of(pos, ValueReaders.positions())); - } else if (field.isOptional()) { - readPlan.add(Pair.of(pos, ValueReaders.constant(null))); - } else { - throw new IllegalArgumentException( - String.format("Missing required field: %s", field.name())); - } - } + Types.StructType expected = partner.asStructType(); + List>> readPlan = + ValueReaders.buildReadPlan(expected, record, fieldResults, idToConstant); return recordReader(readPlan, avroSchemas.get(partner), record.getFullName()); } @@ -264,19 +228,5 @@ public ValueReader primitive(Type partner, Schema primitive) { throw new IllegalArgumentException("Unsupported type: " + primitive); } } - - private Map idToPos(Types.StructType struct) { - Map idToPos = Maps.newHashMap(); - - if (struct != null) { - List fields = struct.fields(); - for (int pos = 0; pos < fields.size(); pos += 1) { - Types.NestedField field = fields.get(pos); - idToPos.put(field.fieldId(), pos); - } - } - - return idToPos; - } } } diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReader.java b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java new file mode 100644 index 000000000000..5c4af66e160b --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/InternalReader.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; + +/** + * A reader that produces Iceberg's internal in-memory object model. + * + *

Iceberg's internal in-memory object model produces the types defined in {@link + * Type.TypeID#javaClass()}. + * + * @param Java type returned by the reader + */ +public class InternalReader implements DatumReader, SupportsRowPosition { + + private final Types.StructType expectedType; + private final Map> typeMap = Maps.newHashMap(); + private final Map idToConstant = ImmutableMap.of(); + private Schema fileSchema = null; + private ValueReader reader = null; + + public static InternalReader create(org.apache.iceberg.Schema schema) { + return new InternalReader<>(schema); + } + + InternalReader(org.apache.iceberg.Schema readSchema) { + this.expectedType = readSchema.asStruct(); + } + + @SuppressWarnings("unchecked") + private void initReader() { + this.reader = + (ValueReader) + AvroWithPartnerVisitor.visit( + Pair.of(-1, expectedType), + fileSchema, + new ResolvingReadBuilder(), + AccessByID.instance()); + } + + @Override + public void setSchema(Schema schema) { + this.fileSchema = schema; + initReader(); + } + + public InternalReader setRootType(Class rootClass) { + typeMap.put(-1, rootClass); + return this; + } + + public InternalReader setCustomType(int fieldId, Class structClass) { + typeMap.put(fieldId, structClass); + return this; + } + + @Override + public void setRowPositionSupplier(Supplier posSupplier) { + if (reader instanceof SupportsRowPosition) { + ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier); + } + } + + @Override + public T read(T reuse, Decoder decoder) throws IOException { + return reader.read(decoder, reuse); + } + + private class ResolvingReadBuilder + extends AvroWithPartnerVisitor, ValueReader> { + @Override + public ValueReader record( + Pair partner, Schema record, List> fieldResults) { + if (partner == null) { + return ValueReaders.skipStruct(fieldResults); + } + + Types.StructType expected = partner.second().asStructType(); + List>> readPlan = + ValueReaders.buildReadPlan(expected, record, fieldResults, idToConstant); + + return structReader(readPlan, partner.first(), expected); + } + + private ValueReader structReader( + List>> readPlan, int fieldId, Types.StructType struct) { + + Class structClass = typeMap.get(fieldId); + if (structClass != null) { + return InternalReaders.struct(struct, structClass, readPlan); + } else { + return InternalReaders.struct(struct, readPlan); + } + } + + @Override + public ValueReader union( + Pair partner, Schema union, List> options) { + return ValueReaders.union(options); + } + + @Override + public ValueReader arrayMap( + Pair partner, + Schema map, + ValueReader keyReader, + ValueReader valueReader) { + return ValueReaders.arrayMap(keyReader, valueReader); + } + + @Override + public ValueReader array( + Pair partner, Schema array, ValueReader elementReader) { + return ValueReaders.array(elementReader); + } + + @Override + public ValueReader map(Pair partner, Schema map, ValueReader valueReader) { + return ValueReaders.map(ValueReaders.strings(), valueReader); + } + + @Override + public ValueReader primitive(Pair partner, Schema primitive) { + LogicalType logicalType = primitive.getLogicalType(); + if (logicalType != null) { + switch (logicalType.getName()) { + case "date": + return ValueReaders.ints(); + + case "time-micros": + return ValueReaders.longs(); + + case "timestamp-millis": + // adjust to microseconds + ValueReader longs = ValueReaders.longs(); + return (ValueReader) (decoder, ignored) -> longs.read(decoder, null) * 1000L; + + case "timestamp-micros": + return ValueReaders.longs(); + + case "decimal": + return ValueReaders.decimal( + ValueReaders.decimalBytesReader(primitive), + ((LogicalTypes.Decimal) logicalType).getScale()); + + case "uuid": + return ValueReaders.uuids(); + + default: + throw new IllegalArgumentException("Unknown logical type: " + logicalType); + } + } + + switch (primitive.getType()) { + case NULL: + return ValueReaders.nulls(); + case BOOLEAN: + return ValueReaders.booleans(); + case INT: + if (partner != null && partner.second().typeId() == Type.TypeID.LONG) { + return ValueReaders.intsAsLongs(); + } + return ValueReaders.ints(); + case LONG: + return ValueReaders.longs(); + case FLOAT: + if (partner != null && partner.second().typeId() == Type.TypeID.DOUBLE) { + return ValueReaders.floatsAsDoubles(); + } + return ValueReaders.floats(); + case DOUBLE: + return ValueReaders.doubles(); + case STRING: + return ValueReaders.strings(); + case FIXED: + return ValueReaders.fixed(primitive); + case BYTES: + return ValueReaders.byteBuffers(); + case ENUM: + return ValueReaders.enums(primitive.getEnumSymbols()); + default: + throw new IllegalArgumentException("Unsupported type: " + primitive); + } + } + } + + private static class AccessByID + implements AvroWithPartnerVisitor.PartnerAccessors> { + private static final AccessByID INSTANCE = new AccessByID(); + + public static AccessByID instance() { + return INSTANCE; + } + + @Override + public Pair fieldPartner( + Pair partner, Integer fieldId, String name) { + Types.NestedField field = partner.second().asStructType().field(fieldId); + return field != null ? Pair.of(field.fieldId(), field.type()) : null; + } + + @Override + public Pair mapKeyPartner(Pair partner) { + Types.MapType map = partner.second().asMapType(); + return Pair.of(map.keyId(), map.keyType()); + } + + @Override + public Pair mapValuePartner(Pair partner) { + Types.MapType map = partner.second().asMapType(); + return Pair.of(map.valueId(), map.valueType()); + } + + @Override + public Pair listElementPartner(Pair partner) { + Types.ListType list = partner.second().asListType(); + return Pair.of(list.elementId(), list.elementType()); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/InternalReaders.java b/core/src/main/java/org/apache/iceberg/avro/InternalReaders.java new file mode 100644 index 000000000000..0087e234951e --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/InternalReaders.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import java.util.List; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.common.DynConstructors; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; + +class InternalReaders { + private InternalReaders() {} + + static ValueReader struct( + Types.StructType struct, List>> readPlan) { + return new RecordReader(readPlan, struct); + } + + static ValueReader struct( + Types.StructType struct, Class structClass, List>> readPlan) { + return new PlannedStructLikeReader<>(readPlan, struct, structClass); + } + + private static class PlannedStructLikeReader + extends ValueReaders.PlannedStructReader { + private final Types.StructType structType; + private final Class structClass; + private final DynConstructors.Ctor ctor; + + private PlannedStructLikeReader( + List>> readPlan, + Types.StructType structType, + Class structClass) { + super(readPlan); + this.structType = structType; + this.structClass = structClass; + this.ctor = + DynConstructors.builder(StructLike.class) + .hiddenImpl(structClass, Types.StructType.class) + .hiddenImpl(structClass) + .build(); + } + + @Override + protected S reuseOrCreate(Object reuse) { + if (structClass.isInstance(reuse)) { + return structClass.cast(reuse); + } else { + return ctor.newInstance(structType); + } + } + + @Override + protected Object get(S struct, int pos) { + return struct.get(pos, Object.class); + } + + @Override + protected void set(S struct, int pos, Object value) { + struct.set(pos, value); + } + } + + private static class RecordReader extends ValueReaders.PlannedStructReader { + private final Types.StructType structType; + + private RecordReader(List>> readPlan, Types.StructType structType) { + super(readPlan); + this.structType = structType; + } + + @Override + protected GenericRecord reuseOrCreate(Object reuse) { + if (reuse instanceof GenericRecord) { + return (GenericRecord) reuse; + } else { + return GenericRecord.create(structType); + } + } + + @Override + protected Object get(GenericRecord struct, int pos) { + return struct.get(pos); + } + + @Override + protected void set(GenericRecord struct, int pos, Object value) { + struct.set(pos, value); + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java index d530bc1854e1..94954dc49f24 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java @@ -181,6 +181,81 @@ public static ValueReader record( return new PlannedIndexedReader<>(recordSchema, recordClass, readPlan); } + public static ValueReader skipStruct(List> readers) { + return new SkipStructReader(readers); + } + + /** + * Builds a read plan for record classes that use planned reads instead of a ResolvingDecoder. + * + * @param expected expected StructType + * @param record Avro record schema + * @param fieldReaders list of readers for each field in the Avro record schema + * @param idToConstant a map of field ID to constants values + * @return a read plan that is a list of (position, reader) pairs + */ + static List>> buildReadPlan( + Types.StructType expected, + Schema record, + List> fieldReaders, + Map idToConstant) { + Map idToPos = idToPos(expected); + + List>> readPlan = Lists.newArrayList(); + List fileFields = record.getFields(); + for (int pos = 0; pos < fileFields.size(); pos += 1) { + Schema.Field field = fileFields.get(pos); + ValueReader fieldReader = fieldReaders.get(pos); + Integer fieldId = AvroSchemaUtil.fieldId(field); + Integer projectionPos = idToPos.remove(fieldId); + + Object constant = idToConstant.get(fieldId); + if (projectionPos != null && constant != null) { + readPlan.add( + Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant))); + } else { + readPlan.add(Pair.of(projectionPos, fieldReader)); + } + } + + // handle any expected columns that are not in the data file + for (Map.Entry idAndPos : idToPos.entrySet()) { + int fieldId = idAndPos.getKey(); + int pos = idAndPos.getValue(); + + Object constant = idToConstant.get(fieldId); + Types.NestedField field = expected.field(fieldId); + if (constant != null) { + readPlan.add(Pair.of(pos, ValueReaders.constant(constant))); + } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) { + readPlan.add(Pair.of(pos, ValueReaders.constant(false))); + } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) { + readPlan.add(Pair.of(pos, ValueReaders.positions())); + } else if (field.isOptional()) { + readPlan.add(Pair.of(pos, ValueReaders.constant(null))); + } else { + throw new IllegalArgumentException( + String.format("Missing required field: %s", field.name())); + } + } + + return readPlan; + } + + private static Map idToPos(Types.StructType struct) { + Map idToPos = Maps.newHashMap(); + + if (struct != null) { + List fields = struct.fields(); + for (int pos = 0; pos < fields.size(); pos += 1) { + Types.NestedField field = fields.get(pos); + idToPos.put(field.fieldId(), pos); + } + } + + return idToPos; + } + private static class NullReader implements ValueReader { private static final NullReader INSTANCE = new NullReader(); @@ -777,6 +852,27 @@ public void skip(Decoder decoder) throws IOException { } } + private static class SkipStructReader implements ValueReader { + private final ValueReader[] readers; + + private SkipStructReader(List> readers) { + this.readers = readers.toArray(ValueReader[]::new); + } + + @Override + public Void read(Decoder decoder, Object reuse) throws IOException { + skip(decoder); + return null; + } + + @Override + public void skip(Decoder decoder) throws IOException { + for (ValueReader reader : readers) { + reader.skip(decoder); + } + } + } + public abstract static class PlannedStructReader implements ValueReader, SupportsRowPosition { private final ValueReader[] readers; diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java index c9d0e292706e..e45415f1f2d2 100644 --- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -44,7 +44,8 @@ public class TestManifestReader extends TestBase { "fileOrdinal", "fileSequenceNumber", "fromProjectionPos", - "manifestLocation") + "manifestLocation", + "partitionData.partitionType.fieldsById") .build(); @TestTemplate