
Commit 6b6cd1b
Core: Add Avro readers for Iceberg's internal object model.
rdblue committed Sep 18, 2024
1 parent bbeadea commit 6b6cd1b
Showing 6 changed files with 476 additions and 68 deletions.
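For orientation, here is a minimal sketch of the read path these changes enable, using only APIs that appear in the diffs below (Avro.read, createResolvingReader, InternalReader.create). The wrapper class and method names are illustrative assumptions, not part of the commit.

import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.avro.InternalReader;
import org.apache.iceberg.io.InputFile;

// Illustrative wrapper, not part of this commit.
public class InternalAvroReadSketch {
  // Plans value readers against the projection by field ID and returns rows as StructLike.
  static AvroIterable<StructLike> readInternal(InputFile file, Schema projection) {
    return Avro.read(file)
        .project(projection)
        .createResolvingReader(InternalReader::create)
        .reuseContainers()
        .build();
  }
}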
core/src/main/java/org/apache/iceberg/ManifestReader.java (23 changes: 12 additions & 11 deletions)
@@ -27,6 +27,7 @@
 import java.util.Set;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.avro.InternalReader;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
@@ -65,16 +66,16 @@ public class ManifestReader<F extends ContentFile<F>> extends CloseableGroup
           "record_count");
 
   protected enum FileType {
-    DATA_FILES(GenericDataFile.class.getName()),
-    DELETE_FILES(GenericDeleteFile.class.getName());
+    DATA_FILES(GenericDataFile.class),
+    DELETE_FILES(GenericDeleteFile.class);
 
-    private final String fileClass;
+    private final Class<? extends StructLike> fileClass;
 
-    FileType(String fileClass) {
+    FileType(Class<? extends StructLike> fileClass) {
       this.fileClass = fileClass;
     }
 
-    private String fileClass() {
+    private Class<? extends StructLike> fileClass() {
       return fileClass;
     }
   }
@@ -261,12 +262,12 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
         AvroIterable<ManifestEntry<F>> reader =
             Avro.read(file)
                 .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
-                .rename("manifest_entry", GenericManifestEntry.class.getName())
-                .rename("partition", PartitionData.class.getName())
-                .rename("r102", PartitionData.class.getName())
-                .rename("data_file", content.fileClass())
-                .rename("r2", content.fileClass())
-                .classLoader(GenericManifestEntry.class.getClassLoader())
+                .createResolvingReader(
+                    schema ->
+                        InternalReader.create(schema)
+                            .setRootType(GenericManifestEntry.class)
+                            .setCustomType(ManifestEntry.DATA_FILE_ID, content.fileClass())
+                            .setCustomType(DataFile.PARTITION_ID, PartitionData.class))
                 .reuseContainers()
                 .build();
 
core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java (62 changes: 6 additions & 56 deletions)
@@ -28,11 +28,8 @@
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
-import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.common.DynClasses;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
@@ -43,7 +40,7 @@ private ResolvingReadBuilder(Types.StructType expectedType, String rootName) {
   private final Types.StructType expectedType;
   private ClassLoader loader = Thread.currentThread().getContextClassLoader();
   private Map<String, String> renames = ImmutableMap.of();
-  private final Map<Integer, ?> idToConstant = ImmutableMap.of();
+  private final Map<Integer, Object> idToConstant = ImmutableMap.of();
   private Schema fileSchema = null;
   private ValueReader<T> reader = null;
 
@@ -111,46 +108,13 @@ private ResolvingReadBuilder(Types.StructType expectedType, String rootName) {
 
     @Override
     public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldResults) {
-      Types.StructType expected = partner != null ? partner.asStructType() : null;
-      Map<Integer, Integer> idToPos = idToPos(expected);
-
-      List<Pair<Integer, ValueReader<?>>> readPlan = Lists.newArrayList();
-      List<Schema.Field> fileFields = record.getFields();
-      for (int pos = 0; pos < fileFields.size(); pos += 1) {
-        Schema.Field field = fileFields.get(pos);
-        ValueReader<?> fieldReader = fieldResults.get(pos);
-        Integer fieldId = AvroSchemaUtil.fieldId(field);
-        Integer projectionPos = idToPos.remove(fieldId);
-
-        Object constant = idToConstant.get(fieldId);
-        if (projectionPos != null && constant != null) {
-          readPlan.add(
-              Pair.of(projectionPos, ValueReaders.replaceWithConstant(fieldReader, constant)));
-        } else {
-          readPlan.add(Pair.of(projectionPos, fieldReader));
-        }
-      }
-
-      // handle any expected columns that are not in the data file
-      for (Map.Entry<Integer, Integer> idAndPos : idToPos.entrySet()) {
-        int fieldId = idAndPos.getKey();
-        int pos = idAndPos.getValue();
-
-        Object constant = idToConstant.get(fieldId);
-        Types.NestedField field = expected.field(fieldId);
-        if (constant != null) {
-          readPlan.add(Pair.of(pos, ValueReaders.constant(constant)));
-        } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) {
-          readPlan.add(Pair.of(pos, ValueReaders.constant(false)));
-        } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) {
-          readPlan.add(Pair.of(pos, ValueReaders.positions()));
-        } else if (field.isOptional()) {
-          readPlan.add(Pair.of(pos, ValueReaders.constant(null)));
-        } else {
-          throw new IllegalArgumentException(
-              String.format("Missing required field: %s", field.name()));
-        }
-      }
+      if (partner == null) {
+        return ValueReaders.skipStruct(fieldResults);
+      }
+
+      Types.StructType expected = partner.asStructType();
+      List<Pair<Integer, ValueReader<?>>> readPlan =
+          ValueReaders.buildReadPlan(expected, record, fieldResults, idToConstant);
 
       return recordReader(readPlan, avroSchemas.get(partner), record.getFullName());
     }
@@ -264,19 +228,5 @@ public ValueReader<?> primitive(Type partner, Schema primitive) {
           throw new IllegalArgumentException("Unsupported type: " + primitive);
       }
     }
-
-    private Map<Integer, Integer> idToPos(Types.StructType struct) {
-      Map<Integer, Integer> idToPos = Maps.newHashMap();
-
-      if (struct != null) {
-        List<Types.NestedField> fields = struct.fields();
-        for (int pos = 0; pos < fields.size(); pos += 1) {
-          Types.NestedField field = fields.get(pos);
-          idToPos.put(field.fieldId(), pos);
-        }
-      }
-
-      return idToPos;
-    }
   }
 }
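The per-field matching deleted above is not gone: the replacement calls ValueReaders.buildReadPlan, which presumably lives in ValueReaders.java, one of the changed files not rendered in this view. Below is a hedged illustration of the read-plan shape that both GenericAvroReader and the new InternalReader consume; the positions and readers are invented for the example.

import java.util.List;
import org.apache.iceberg.avro.ValueReader;
import org.apache.iceberg.avro.ValueReaders;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;

// Illustrative only: a read plan pairs each file column's reader with the position it
// fills in the projected struct, listed in file column order.
public class ReadPlanShape {
  static List<Pair<Integer, ValueReader<?>>> example() {
    List<Pair<Integer, ValueReader<?>>> plan = Lists.newArrayList();
    plan.add(Pair.of(0, ValueReaders.longs()));   // file column 0 fills projection slot 0
    plan.add(Pair.of(2, ValueReaders.strings())); // file column 1 fills projection slot 2
    // a null position (as in the removed record() logic above) means the column is read and discarded
    return plan;
  }
}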
core/src/main/java/org/apache/iceberg/avro/InternalReader.java (new file: 251 additions & 0 deletions)
@@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.avro;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.Pair;

/**
* A reader that produces Iceberg's internal in-memory object model.
*
* <p>Iceberg's internal in-memory object model produces the types defined in {@link
* Type.TypeID#javaClass()}.
*
* @param <T> Java type returned by the reader
*/
public class InternalReader<T> implements DatumReader<T>, SupportsRowPosition {

  private final Types.StructType expectedType;
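  // custom StructLike implementations keyed by field ID; setRootType registers the root struct under ID -1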
  private final Map<Integer, Class<? extends StructLike>> typeMap = Maps.newHashMap();
  private final Map<Integer, Object> idToConstant = ImmutableMap.of();
  private Schema fileSchema = null;
  private ValueReader<T> reader = null;

  public static <D> InternalReader<D> create(org.apache.iceberg.Schema schema) {
    return new InternalReader<>(schema);
  }

  InternalReader(org.apache.iceberg.Schema readSchema) {
    this.expectedType = readSchema.asStruct();
  }

  @SuppressWarnings("unchecked")
  private void initReader() {
    this.reader =
        (ValueReader<T>)
            AvroWithPartnerVisitor.visit(
                Pair.of(-1, expectedType),
                fileSchema,
                new ResolvingReadBuilder(),
                AccessByID.instance());
  }

  @Override
  public void setSchema(Schema schema) {
    this.fileSchema = schema;
    initReader();
  }

  public InternalReader<T> setRootType(Class<? extends StructLike> rootClass) {
    typeMap.put(-1, rootClass);
    return this;
  }

  public InternalReader<T> setCustomType(int fieldId, Class<? extends StructLike> structClass) {
    typeMap.put(fieldId, structClass);
    return this;
  }

  @Override
  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
    if (reader instanceof SupportsRowPosition) {
      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
    }
  }

  @Override
  public T read(T reuse, Decoder decoder) throws IOException {
    return reader.read(decoder, reuse);
  }

  private class ResolvingReadBuilder
      extends AvroWithPartnerVisitor<Pair<Integer, Type>, ValueReader<?>> {
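    // record() builds a read plan against the expected struct, then structReader() picks a
    // custom StructLike class from typeMap or falls back to the default internal struct reader.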
    @Override
    public ValueReader<?> record(
        Pair<Integer, Type> partner, Schema record, List<ValueReader<?>> fieldResults) {
      if (partner == null) {
        return ValueReaders.skipStruct(fieldResults);
      }

      Types.StructType expected = partner.second().asStructType();
      List<Pair<Integer, ValueReader<?>>> readPlan =
          ValueReaders.buildReadPlan(expected, record, fieldResults, idToConstant);

      return structReader(readPlan, partner.first(), expected);
    }

    private ValueReader<?> structReader(
        List<Pair<Integer, ValueReader<?>>> readPlan, int fieldId, Types.StructType struct) {

      Class<? extends StructLike> structClass = typeMap.get(fieldId);
      if (structClass != null) {
        return InternalReaders.struct(struct, structClass, readPlan);
      } else {
        return InternalReaders.struct(struct, readPlan);
      }
    }

    @Override
    public ValueReader<?> union(
        Pair<Integer, Type> partner, Schema union, List<ValueReader<?>> options) {
      return ValueReaders.union(options);
    }

    @Override
    public ValueReader<?> arrayMap(
        Pair<Integer, Type> partner,
        Schema map,
        ValueReader<?> keyReader,
        ValueReader<?> valueReader) {
      return ValueReaders.arrayMap(keyReader, valueReader);
    }

    @Override
    public ValueReader<?> array(
        Pair<Integer, Type> partner, Schema array, ValueReader<?> elementReader) {
      return ValueReaders.array(elementReader);
    }

    @Override
    public ValueReader<?> map(Pair<Integer, Type> partner, Schema map, ValueReader<?> valueReader) {
      return ValueReaders.map(ValueReaders.strings(), valueReader);
    }

    @Override
    public ValueReader<?> primitive(Pair<Integer, Type> partner, Schema primitive) {
      LogicalType logicalType = primitive.getLogicalType();
      if (logicalType != null) {
        switch (logicalType.getName()) {
          case "date":
            return ValueReaders.ints();

          case "time-micros":
            return ValueReaders.longs();

          case "timestamp-millis":
            // adjust to microseconds
            ValueReader<Long> longs = ValueReaders.longs();
            return (ValueReader<Long>) (decoder, ignored) -> longs.read(decoder, null) * 1000L;

          case "timestamp-micros":
            return ValueReaders.longs();

          case "decimal":
            return ValueReaders.decimal(
                ValueReaders.decimalBytesReader(primitive),
                ((LogicalTypes.Decimal) logicalType).getScale());

          case "uuid":
            return ValueReaders.uuids();

          default:
            throw new IllegalArgumentException("Unknown logical type: " + logicalType);
        }
      }

      switch (primitive.getType()) {
        case NULL:
          return ValueReaders.nulls();
        case BOOLEAN:
          return ValueReaders.booleans();
        case INT:
          if (partner != null && partner.second().typeId() == Type.TypeID.LONG) {
            return ValueReaders.intsAsLongs();
          }
          return ValueReaders.ints();
        case LONG:
          return ValueReaders.longs();
        case FLOAT:
          if (partner != null && partner.second().typeId() == Type.TypeID.DOUBLE) {
            return ValueReaders.floatsAsDoubles();
          }
          return ValueReaders.floats();
        case DOUBLE:
          return ValueReaders.doubles();
        case STRING:
          return ValueReaders.strings();
        case FIXED:
          return ValueReaders.fixed(primitive);
        case BYTES:
          return ValueReaders.byteBuffers();
        case ENUM:
          return ValueReaders.enums(primitive.getEnumSymbols());
        default:
          throw new IllegalArgumentException("Unsupported type: " + primitive);
      }
    }
  }

  private static class AccessByID
      implements AvroWithPartnerVisitor.PartnerAccessors<Pair<Integer, Type>> {
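    // Pairs each Avro schema element with its Iceberg field ID and type, so that schema
    // resolution is done by field ID rather than by field name.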
    private static final AccessByID INSTANCE = new AccessByID();

    public static AccessByID instance() {
      return INSTANCE;
    }

    @Override
    public Pair<Integer, Type> fieldPartner(
        Pair<Integer, Type> partner, Integer fieldId, String name) {
      Types.NestedField field = partner.second().asStructType().field(fieldId);
      return field != null ? Pair.of(field.fieldId(), field.type()) : null;
    }

    @Override
    public Pair<Integer, Type> mapKeyPartner(Pair<Integer, Type> partner) {
      Types.MapType map = partner.second().asMapType();
      return Pair.of(map.keyId(), map.keyType());
    }

    @Override
    public Pair<Integer, Type> mapValuePartner(Pair<Integer, Type> partner) {
      Types.MapType map = partner.second().asMapType();
      return Pair.of(map.valueId(), map.valueType());
    }

    @Override
    public Pair<Integer, Type> listElementPartner(Pair<Integer, Type> partner) {
      Types.ListType list = partner.second().asListType();
      return Pair.of(list.elementId(), list.elementType());
    }
  }
}
(The remaining 3 changed files in this commit were not loaded in this view.)
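As a closing usage note, here is a hedged sketch of driving the new InternalReader directly with Avro's DataFileReader instead of the Avro.read builder shown above. The file name, schema, and field IDs are placeholders, and the sketch assumes the file was written by Iceberg so its Avro schema carries field IDs; DataFileReader sets the file schema on the DatumReader when it opens the file, which is what triggers InternalReader to plan its value readers.

import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.InternalReader;
import org.apache.iceberg.types.Types;

// Illustrative only: path, schema, and field IDs are placeholders.
public class DirectInternalRead {
  public static void main(String[] args) throws IOException {
    Schema projection =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    // DataFileReader supplies the file schema via setSchema(...) when it opens the file.
    InternalReader<StructLike> datumReader = InternalReader.create(projection);
    try (DataFileReader<StructLike> records =
        new DataFileReader<>(new File("records.avro"), datumReader)) {
      for (StructLike row : records) {
        System.out.println(row.get(0, Long.class));
      }
    }
  }
}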

0 comments on commit 6b6cd1b
