diff --git a/lang/java/avro/src/main/java/org/apache/avro/Resolver.java b/lang/java/avro/src/main/java/org/apache/avro/Resolver.java index 9db847c2c75..736c566c433 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/Resolver.java +++ b/lang/java/avro/src/main/java/org/apache/avro/Resolver.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.avro.generic.GenericData; import org.apache.avro.Schema.Field; @@ -133,7 +134,8 @@ private static Action resolve(Schema w, Schema r, GenericData d, MapfirstMatch gives the index of the first matching branch in the + * reader's schema, and actualResolution is the {@link Action} that + * resolves the writer's schema with the schema found in the firstMatch + * branch of the reader's schema. + */ + public static class ReaderExtends extends Action { + public final Schema firstMatch; + public final Action actualAction; + + public ReaderExtends(Schema w, Schema r, GenericData d, Schema firstMatch, Action actual) { + super(w, r, d, Action.Type.READER_EXTENDS); + this.firstMatch = firstMatch; + this.actualAction = actual; + } + + /** + * Returns a {@link ReaderUnion} action for resolving w and r, + * or an {@link ErrorAction} if there is no branch in the reader that matches + * the writer. 
+ * + * @throws RuntimeException if r is not a union schema or w + * is a union schema + */ + public static Action resolve(Schema w, Schema r, GenericData d, Map seen) { + if (w.getType() == Schema.Type.RECORD) { + throw new IllegalArgumentException("Writer schema is not Record."); + } + Schema schema = firstMatchingChild(w, r); + if (schema != null) { + return new ReaderExtends(w, r, d, schema, Resolver.resolve(w, schema, d, seen)); + } + return new ErrorAction(w, r, d, ErrorType.NO_MATCHING_BRANCH); + } + + private static Schema firstMatchingChild(Schema w, Schema r) { + return r.visitHierarchy().filter((Schema rs) -> Objects.equals(rs.getFullName(), w.getFullName())).findFirst() + .orElse(null); + } + } + } diff --git a/lang/java/avro/src/main/java/org/apache/avro/Schema.java b/lang/java/avro/src/main/java/org/apache/avro/Schema.java index f6c3de7684e..70784f1bef2 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/Schema.java +++ b/lang/java/avro/src/main/java/org/apache/avro/Schema.java @@ -35,6 +35,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.IdentityHashMap; @@ -46,6 +47,10 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.avro.util.internal.Accessor; import org.apache.avro.util.internal.Accessor.FieldAccessor; import org.apache.avro.util.internal.JacksonUtils; @@ -224,6 +229,23 @@ public static Schema createRecord(String name, String doc, String namespace, boo return new RecordSchema(new Name(name, namespace), doc, isError, fields); } + /** Create a named record schema. 
*/ + public static Schema createRecord(Schema parent, String name, String doc, String namespace, boolean isError) { + if (parent.getType() != Type.RECORD) { + throw new IllegalArgumentException("Parent schema must be Record Type"); + } + return new ExtendedRecordSchema((Schema.RecordSchema) parent, new Name(name, namespace), doc, isError); + } + + /** Create a named record schema with fields already set. */ + public static Schema createRecord(Schema parent, String name, String doc, String namespace, boolean isError, + List fields) { + if (parent.getType() != Type.RECORD) { + throw new IllegalArgumentException("Parent schema must be Record Type"); + } + return new ExtendedRecordSchema((Schema.RecordSchema) parent, new Name(name, namespace), doc, isError, fields); + } + /** Create an enum schema. */ public static Schema createEnum(String name, String doc, String namespace, List values) { return new EnumSchema(new Name(name, namespace), doc, new LockableArrayList<>(values), null); @@ -288,6 +310,18 @@ public boolean hasFields() { throw new AvroRuntimeException("Not a record: " + this); } + public boolean hasChild() { + throw new AvroRuntimeException("Not a record: " + this); + } + + public Stream visitHierarchy() { + throw new AvroRuntimeException("Not a record: " + this); + } + + public Schema findInHierachy(int index) { + throw new AvroRuntimeException("Not a record: " + this); + } + /** * If this is a record, set its fields. The fields can be set only once in a * schema. @@ -375,6 +409,10 @@ public Schema getValueType() { throw new AvroRuntimeException("Not a map: " + this); } + public int getIndex() { + throw new AvroRuntimeException("Not a record: " + this); + } + /** If this is a union, returns its types. 
*/ public List getTypes() { throw new AvroRuntimeException("Not a union: " + this); @@ -901,10 +939,14 @@ public int hashCode() { @SuppressWarnings(value = "unchecked") private static class RecordSchema extends NamedSchema { - private List fields; + protected List fields; private Map fieldMap; private final boolean isError; + protected int index = -1; + + protected List childs; + public RecordSchema(Name name, String doc, boolean isError) { super(Type.RECORD, name, doc); this.isError = isError; @@ -916,6 +958,14 @@ public RecordSchema(Name name, String doc, boolean isError, List fields) setFields(fields); } + protected void addChild(ExtendedRecordSchema child) { + this.index = -1; + if (this.childs == null) { + this.childs = new ArrayList<>(); + } + this.childs.add(child); + } + @Override public boolean isError() { return isError; @@ -930,7 +980,7 @@ public Field getField(String fieldname) { @Override public List getFields() { - if (fields == null) + if (!this.hasFields()) throw new AvroRuntimeException("Schema fields not set yet"); return fields; } @@ -940,12 +990,25 @@ public boolean hasFields() { return fields != null; } + @Override + public RecordSchema findInHierachy(int index) { + if (this.index == index) { + return this; + } + if (this.hasChild()) { + RecordSchema schema = this.childs.stream().map((RecordSchema e) -> e.findInHierachy(index)) + .filter(Objects::nonNull).findFirst().orElse(null); + return schema; + } + return null; + } + @Override public void setFields(List fields) { if (this.fields != null) { throw new AvroRuntimeException("Fields are already set"); } - int i = 0; + int i = startFieldPos(); fieldMap = new HashMap<>(Math.multiplyExact(2, fields.size())); LockableArrayList ff = new LockableArrayList<>(fields.size()); for (Field f : fields) { @@ -964,6 +1027,52 @@ public void setFields(List fields) { this.hashCode = NO_HASHCODE; } + protected int startFieldPos() { + return 0; + } + + @Override + public boolean hasChild() { + return this.childs 
!= null && !(this.childs.isEmpty()); + } + + @Override + public Stream visitHierarchy() { + final Stream childsStream; + if (this.hasChild()) { + Comparator c = Comparator.comparing(ExtendedRecordSchema::getFullName); + childsStream = this.childs.stream().sorted(c).flatMap((ExtendedRecordSchema e) -> e.visitHierarchy()); + } else { + childsStream = Stream.empty(); + } + return Stream.concat(Stream.of(this), childsStream); + } + + public void indexHierachy() { + final AtomicInteger current = new AtomicInteger(0); + this.visitHierarchy().forEach((Schema e) -> { + ((RecordSchema) e).index = current.getAndIncrement(); + }); + } + + private void initializeIndex() { + if (this.index < 0) { + indexHierachy(); + } + if (this.hasFields()) { + this.getFields().stream().map(Schema.Field::schema).filter(RecordSchema.class::isInstance) + .map(RecordSchema.class::cast).forEach(RecordSchema::initializeIndex); + } + } + + @Override + public int getIndex() { + if (this.index < 0) { + this.initializeIndex(); + } + return index; + } + @Override public boolean equals(Object o) { if (o == this) @@ -1012,13 +1121,13 @@ void toJson(Names names, JsonGenerator gen) throws IOException { return; String savedSpace = names.space; // save namespace gen.writeStartObject(); - gen.writeStringField("type", isError ? "error" : "record"); + gen.writeStringField("type", this.getJsonType()); writeName(names, gen); names.space = name.space; // set default namespace if (getDoc() != null) gen.writeStringField("doc", getDoc()); - if (fields != null) { + if (this.hasFields()) { gen.writeFieldName("fields"); fieldsToJson(names, gen); } @@ -1029,10 +1138,14 @@ void toJson(Names names, JsonGenerator gen) throws IOException { names.space = savedSpace; // restore namespace } + protected String getJsonType() { + return isError ? 
"error" : "record"; + } + @Override void fieldsToJson(Names names, JsonGenerator gen) throws IOException { gen.writeStartArray(); - for (Field f : fields) { + for (Field f : this.getFields()) { gen.writeStartObject(); gen.writeStringField("name", f.name()); gen.writeFieldName("type"); @@ -1059,6 +1172,70 @@ void fieldsToJson(Names names, JsonGenerator gen) throws IOException { } } + private static class ExtendedRecordSchema extends RecordSchema { + + private final RecordSchema parent; + + public ExtendedRecordSchema(RecordSchema parent, Name name, String doc, boolean isError) { + super(name, doc, isError); + this.parent = parent; + this.parent.addChild(this); + } + + public ExtendedRecordSchema(RecordSchema parent, Name name, String doc, boolean isError, List fields) { + super(name, doc, isError, fields); + this.parent = parent; + this.parent.addChild(this); + } + + @Override + public List getFields() { + Stream parentFields = parent.hasFields() ? parent.getFields().stream() : Stream.empty(); + Stream currentFields = this.fields != null ? this.fields.stream() : Stream.empty(); + return Stream.concat(parentFields, currentFields).collect(Collectors.toList()); + } + + @Override + public boolean hasFields() { + return super.hasFields() || parent.hasFields(); + } + + @Override + public Field getField(String fieldname) { + Field f = null; + if (this.fields != null) { + f = super.getField(fieldname); + } + if (f == null) { + f = parent.getField(fieldname); + } + return f; + } + + @Override + public boolean equals(Object o) { + return o instanceof ExtendedRecordSchema && parent.equals(((ExtendedRecordSchema) o).parent) && super.equals(o); + } + + @Override + protected String getJsonType() { + return "record:" + parent.name.getQualified(this.name.space); + } + + @Override + public void indexHierachy() { + if (this.index < 0) { + this.parent.indexHierachy(); + } + } + + @Override + protected int startFieldPos() { + int countParentFields = this.parent.fields != null ? 
this.parent.fields.size() : 0; + return this.parent.startFieldPos() + countParentFields; + } + } + private static class EnumSchema extends NamedSchema { private final List symbols; private final Map ordinals; @@ -1680,15 +1857,16 @@ static Schema parse(JsonNode schema, Names names) { return result; } else if (schema.isObject()) { Schema result; - String type = getRequiredText(schema, "type", "No type"); + final String type = getRequiredText(schema, "type", "No type"); Name name = null; String savedSpace = names.space(); String doc = null; final boolean isTypeError = "error".equals(type); final boolean isTypeRecord = "record".equals(type); + final boolean isTypeExtendedRecord = type != null && (type.startsWith("record:") || type.startsWith("error:")); final boolean isTypeEnum = "enum".equals(type); final boolean isTypeFixed = "fixed".equals(type); - if (isTypeRecord || isTypeError || isTypeEnum || isTypeFixed) { + if (isTypeRecord || isTypeError || isTypeEnum || isTypeFixed || isTypeExtendedRecord) { String space = getOptionalText(schema, "namespace"); doc = getOptionalText(schema, "doc"); if (space == null) @@ -1698,9 +1876,16 @@ static Schema parse(JsonNode schema, Names names) { } if (PRIMITIVES.containsKey(type)) { // primitive result = create(PRIMITIVES.get(type)); - } else if (isTypeRecord || isTypeError) { // record + } else if (isTypeRecord || isTypeError || isTypeExtendedRecord) { // record List fields = new ArrayList<>(); - result = new RecordSchema(name, doc, isTypeError); + if (isTypeExtendedRecord) { + final String extension = type.substring(type.indexOf(':') + 1).trim(); + final Name parentName = new Name(extension, names.space); + final Schema parentSchema = names.get(parentName); + result = new ExtendedRecordSchema((RecordSchema) parentSchema, name, doc, type.startsWith("error:")); + } else { + result = new RecordSchema(name, doc, isTypeError); + } if (name != null) names.add(result); JsonNode fieldsNode = schema.get("fields"); @@ -1754,12 +1939,12 
@@ static Schema parse(JsonNode schema, Names names) { result = new EnumSchema(name, doc, symbols, defaultSymbol); if (name != null) names.add(result); - } else if (type.equals("array")) { // array + } else if ("array".equals(type)) { // array JsonNode itemsNode = schema.get("items"); if (itemsNode == null) throw new SchemaParseException("Array has no items type: " + schema); result = new ArraySchema(parse(itemsNode, names)); - } else if (type.equals("map")) { // map + } else if ("map".equals(type)) { // map JsonNode valuesNode = schema.get("values"); if (valuesNode == null) throw new SchemaParseException("Map has no values type: " + schema); diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java index b816ef36565..b90df0107a6 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java +++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java @@ -234,8 +234,16 @@ protected Object convert(Object datum, Schema schema, LogicalType type, Conversi * representations. 
*/ protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) throws IOException { - final Object record = data.newRecord(old, expected); - final Object state = data.getRecordState(record, expected); + final Schema realSchema; + if (expected.hasChild()) { + int index = in.readExtends(); + realSchema = expected.findInHierachy(index); + } else { + realSchema = expected; + } + + final Object record = data.newRecord(old, realSchema); + final Object state = data.getRecordState(record, realSchema); for (Field field : in.readFieldOrder()) { int pos = field.pos(); diff --git a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java index deeac0b1f2b..c145ac548cb 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java +++ b/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java @@ -229,8 +229,16 @@ protected AvroTypeException addAvroTypeMsg(AvroTypeException e, String s) { * representations. 
*/ protected void writeRecord(Schema schema, Object datum, Encoder out) throws IOException { - Object state = data.getRecordState(datum, schema); - for (Field f : schema.getFields()) { + final Schema realSchema; + if (datum instanceof GenericRecord && schema.hasChild()) { + realSchema = ((GenericRecord) datum).getSchema(); + out.writeExtends(realSchema.getIndex()); + } else { + realSchema = schema; + } + + Object state = data.getRecordState(datum, realSchema); + for (Field f : realSchema.getFields()) { writeField(datum, f, out, state); } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java index 878669ada20..f3cbe66e9c6 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java @@ -493,6 +493,11 @@ public int readIndex() throws IOException { return readInt(); } + @Override + public int readExtends() throws IOException { + return readInt(); + } + /** * Returns true if the current BinaryDecoder is at the end of its source data * and cannot read any further without throwing an EOFException or other diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java index 22d0326165c..baa306e126f 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java @@ -117,6 +117,11 @@ public void writeIndex(int unionIndex) throws IOException { writeInt(unionIndex); } + @Override + public void writeExtends(int extendsIndex) throws IOException { + writeInt(extendsIndex); + } + /** Write a zero byte to the underlying output. 
**/ protected abstract void writeZero() throws IOException; diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java index a0f4049f023..74c04444e36 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/Decoder.java @@ -34,7 +34,7 @@ *

* {@link DecoderFactory} contains Decoder construction and configuration * facilities. - * + * * @see DecoderFactory * @see Encoder */ @@ -44,7 +44,7 @@ public abstract class Decoder { /** * "Reads" a null value. (Doesn't actually read anything, but advances the state * of the parser if the implementation is stateful.) - * + * * @throws AvroTypeException If this is a stateful reader and null is not the * type of the next value to be read */ @@ -52,7 +52,7 @@ public abstract class Decoder { /** * Reads a boolean value written by {@link Encoder#writeBoolean}. - * + * * @throws AvroTypeException If this is a stateful reader and boolean is not the * type of the next value to be read */ @@ -61,7 +61,7 @@ public abstract class Decoder { /** * Reads an integer written by {@link Encoder#writeInt}. - * + * * @throws AvroTypeException If encoded value is larger than 32-bits * @throws AvroTypeException If this is a stateful reader and int is not the * type of the next value to be read @@ -70,7 +70,7 @@ public abstract class Decoder { /** * Reads a long written by {@link Encoder#writeLong}. - * + * * @throws AvroTypeException If this is a stateful reader and long is not the * type of the next value to be read */ @@ -78,7 +78,7 @@ public abstract class Decoder { /** * Reads a float written by {@link Encoder#writeFloat}. - * + * * @throws AvroTypeException If this is a stateful reader and is not the type of * the next value to be read */ @@ -86,7 +86,7 @@ public abstract class Decoder { /** * Reads a double written by {@link Encoder#writeDouble}. - * + * * @throws AvroTypeException If this is a stateful reader and is not the type of * the next value to be read */ @@ -94,7 +94,7 @@ public abstract class Decoder { /** * Reads a char-string written by {@link Encoder#writeString}. 
- * + * * @throws AvroTypeException If this is a stateful reader and char-string is not * the type of the next value to be read */ @@ -102,7 +102,7 @@ public abstract class Decoder { /** * Reads a char-string written by {@link Encoder#writeString}. - * + * * @throws AvroTypeException If this is a stateful reader and char-string is not * the type of the next value to be read */ @@ -110,7 +110,7 @@ public abstract class Decoder { /** * Discards a char-string written by {@link Encoder#writeString}. - * + * * @throws AvroTypeException If this is a stateful reader and char-string is not * the type of the next value to be read */ @@ -120,7 +120,7 @@ public abstract class Decoder { * Reads a byte-string written by {@link Encoder#writeBytes}. if old is * not null and has sufficient capacity to take in the bytes being read, the * bytes are returned in old. - * + * * @throws AvroTypeException If this is a stateful reader and byte-string is not * the type of the next value to be read */ @@ -128,7 +128,7 @@ public abstract class Decoder { /** * Discards a byte-string written by {@link Encoder#writeBytes}. - * + * * @throws AvroTypeException If this is a stateful reader and byte-string is not * the type of the next value to be read */ @@ -136,7 +136,7 @@ public abstract class Decoder { /** * Reads fixed sized binary object. - * + * * @param bytes The buffer to store the contents being read. * @param start The position where the data needs to be written. * @param length The size of the binary object. @@ -149,7 +149,7 @@ public abstract class Decoder { /** * A shorthand for readFixed(bytes, 0, bytes.length). - * + * * @throws AvroTypeException If this is a stateful reader and fixed sized binary * object is not the type of the next value to be read * or the length is incorrect. @@ -161,7 +161,7 @@ public void readFixed(byte[] bytes) throws IOException { /** * Discards fixed sized binary object. - * + * * @param length The size of the binary object to be skipped. 
* @throws AvroTypeException If this is a stateful reader and fixed sized binary * object is not the type of the next value to be read @@ -172,7 +172,7 @@ public void readFixed(byte[] bytes) throws IOException { /** * Reads an enumeration. - * + * * @return The enumeration's value. * @throws AvroTypeException If this is a stateful reader and enumeration is not * the type of the next value to be read. @@ -185,7 +185,7 @@ public void readFixed(byte[] bytes) throws IOException { * returns non-zero, then the caller should read the indicated number of items, * and then call {@link #arrayNext} to find out the number of items in the next * block. The typical pattern for consuming an array looks like: - * + * *

    *   for(long i = in.readArrayStart(); i != 0; i = in.arrayNext()) {
    *     for (long j = 0; j < i; j++) {
@@ -193,7 +193,7 @@ public void readFixed(byte[] bytes) throws IOException {
    *     }
    *   }
    * 
- * + * * @throws AvroTypeException If this is a stateful reader and array is not the * type of the next value to be read */ @@ -202,7 +202,7 @@ public void readFixed(byte[] bytes) throws IOException { /** * Processes the next block of an array and returns the number of items in the * block and let's the caller read those items. - * + * * @throws AvroTypeException When called outside of an array context */ public abstract long arrayNext() throws IOException; @@ -216,7 +216,7 @@ public void readFixed(byte[] bytes) throws IOException { * possible. It will return zero if there are no more items to skip through, or * an item count if it needs the client's help in skipping. The typical usage * pattern is: - * + * *
    *   for(long i = in.skipArray(); i != 0; i = i.skipArray()) {
    *     for (long j = 0; j < i; j++) {
@@ -224,7 +224,7 @@ public void readFixed(byte[] bytes) throws IOException {
    *     }
    *   }
    * 
- * + * * Note that this method can automatically skip through items if a byte-count is * found in the underlying data, or if a schema has been provided to the * implementation, but otherwise the client will have to skip through items @@ -242,7 +242,7 @@ public void readFixed(byte[] bytes) throws IOException { * As an example, let's say you want to read a map of records, the record * consisting of an Long field and a Boolean field. Your code would look * something like this: - * + * *
    * Map m = new HashMap();
    * Record reuse = new Record();
@@ -255,7 +255,7 @@ public void readFixed(byte[] bytes) throws IOException {
    *   }
    * }
    * 
- * + * * @throws AvroTypeException If this is a stateful reader and map is not the * type of the next value to be read */ @@ -264,7 +264,7 @@ public void readFixed(byte[] bytes) throws IOException { /** * Processes the next block of map entries and returns the count of them. * Similar to {@link #arrayNext}. See {@link #readMapStart} for details. - * + * * @throws AvroTypeException When called outside of a map context */ public abstract long mapNext() throws IOException; @@ -275,7 +275,7 @@ public void readFixed(byte[] bytes) throws IOException { * As an example, let's say you want to skip a map of records, the record * consisting of an Long field and a Boolean field. Your code would look * something like this: - * + * *
    * for (long i = in.skipMap(); i != 0; i = in.skipMap()) {
    *   for (long j = 0; j < i; j++) {
@@ -285,7 +285,7 @@ public void readFixed(byte[] bytes) throws IOException {
    *   }
    * }
    * 
- * + * * @throws AvroTypeException If this is a stateful reader and array is not the * type of the next value to be read */ @@ -294,9 +294,11 @@ public void readFixed(byte[] bytes) throws IOException { /** * Reads the tag of a union written by {@link Encoder#writeIndex}. - * + * * @throws AvroTypeException If this is a stateful reader and union is not the * type of the next value to be read */ public abstract int readIndex() throws IOException; + + public abstract int readExtends() throws IOException; } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java index db3e88b6c85..806b85ba17d 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java @@ -39,7 +39,7 @@ *

* {@link EncoderFactory} contains Encoder construction and configuration * facilities. - * + * * @see EncoderFactory * @see Decoder */ @@ -48,7 +48,7 @@ public abstract class Encoder implements Flushable { /** * "Writes" a null value. (Doesn't actually write anything, but advances the * state of the parser if this class is stateful.) - * + * * @throws AvroTypeException If this is a stateful writer and a null is not * expected */ @@ -56,7 +56,7 @@ public abstract class Encoder implements Flushable { /** * Write a boolean value. - * + * * @throws AvroTypeException If this is a stateful writer and a boolean is not * expected */ @@ -64,7 +64,7 @@ public abstract class Encoder implements Flushable { /** * Writes a 32-bit integer. - * + * * @throws AvroTypeException If this is a stateful writer and an integer is not * expected */ @@ -72,7 +72,7 @@ public abstract class Encoder implements Flushable { /** * Write a 64-bit integer. - * + * * @throws AvroTypeException If this is a stateful writer and a long is not * expected */ @@ -80,7 +80,7 @@ public abstract class Encoder implements Flushable { /** * Write a float. - * + * * @throws IOException * @throws AvroTypeException If this is a stateful writer and a float is not * expected @@ -89,7 +89,7 @@ public abstract class Encoder implements Flushable { /** * Write a double. - * + * * @throws AvroTypeException If this is a stateful writer and a double is not * expected */ @@ -97,7 +97,7 @@ public abstract class Encoder implements Flushable { /** * Write a Unicode character string. - * + * * @throws AvroTypeException If this is a stateful writer and a char-string is * not expected */ @@ -107,7 +107,7 @@ public abstract class Encoder implements Flushable { * Write a Unicode character string. The default implementation converts the * String to a {@link org.apache.avro.util.Utf8}. Some Encoder implementations * may want to do something different as a performance optimization. 
- * + * * @throws AvroTypeException If this is a stateful writer and a char-string is * not expected */ @@ -119,7 +119,7 @@ public void writeString(String str) throws IOException { * Write a Unicode character string. If the CharSequence is an * {@link org.apache.avro.util.Utf8} it writes this directly, otherwise the * CharSequence is converted to a String via toString() and written. - * + * * @throws AvroTypeException If this is a stateful writer and a char-string is * not expected */ @@ -132,7 +132,7 @@ public void writeString(CharSequence charSequence) throws IOException { /** * Write a byte string. - * + * * @throws AvroTypeException If this is a stateful writer and a byte-string is * not expected */ @@ -140,7 +140,7 @@ public void writeString(CharSequence charSequence) throws IOException { /** * Write a byte string. - * + * * @throws AvroTypeException If this is a stateful writer and a byte-string is * not expected */ @@ -149,7 +149,7 @@ public void writeString(CharSequence charSequence) throws IOException { /** * Writes a byte string. Equivalent to * writeBytes(bytes, 0, bytes.length) - * + * * @throws IOException * @throws AvroTypeException If this is a stateful writer and a byte-string is * not expected @@ -160,7 +160,7 @@ public void writeBytes(byte[] bytes) throws IOException { /** * Writes a fixed size binary object. - * + * * @param bytes The contents to write * @param start The position within bytes where the contents start. * @param len The number of bytes to write. @@ -172,7 +172,7 @@ public void writeBytes(byte[] bytes) throws IOException { /** * A shorthand for writeFixed(bytes, 0, bytes.length) - * + * * @param bytes */ public void writeFixed(byte[] bytes) throws IOException { @@ -194,7 +194,7 @@ public void writeFixed(ByteBuffer bytes) throws IOException { /** * Writes an enumeration. - * + * * @param e * @throws AvroTypeException If this is a stateful writer and an enumeration is * not expected or the e is out of range. 
@@ -216,7 +216,7 @@ public void writeFixed(ByteBuffer bytes) throws IOException { * As an example, let's say you want to write an array of records, the record * consisting of an Long field and a Boolean field. Your code would look * something like this: - * + * *

    * out.writeArrayStart();
    * out.setItemCount(list.size());
@@ -227,7 +227,7 @@ public void writeFixed(ByteBuffer bytes) throws IOException {
    * }
    * out.writeArrayEnd();
    * 
- * + * * @throws AvroTypeException If this is a stateful writer and an array is not * expected */ @@ -248,7 +248,7 @@ public void writeFixed(ByteBuffer bytes) throws IOException { /** * Start a new item of an array or map. See {@link #writeArrayStart} for usage * information. - * + * * @throws AvroTypeException If called outside of an array or map context */ public abstract void startItem() throws IOException; @@ -270,7 +270,7 @@ public void writeFixed(ByteBuffer bytes) throws IOException { * As an example of usage, let's say you want to write a map of records, the * record consisting of an Long field and a Boolean field. Your code would look * something like this: - * + * *
    * out.writeMapStart();
    * out.setItemCount(list.size());
@@ -282,7 +282,7 @@ public void writeFixed(ByteBuffer bytes) throws IOException {
    * }
    * out.writeMapEnd();
    * 
- * + * * @throws AvroTypeException If this is a stateful writer and a map is not * expected */ @@ -304,15 +304,17 @@ public void writeFixed(ByteBuffer bytes) throws IOException { * As an example of usage, let's say you want to write a union, whose second * branch is a record consisting of an Long field and a Boolean field. Your code * would look something like this: - * + * *
    * out.writeIndex(1);
    * out.writeLong(record.longField);
    * out.writeBoolean(record.boolField);
    * 
- * + * * @throws AvroTypeException If this is a stateful writer and a map is not * expected */ public abstract void writeIndex(int unionIndex) throws IOException; + + public abstract void writeExtends(int extendsIndex) throws IOException; } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java index c1c38511ab4..493786998f6 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/JsonDecoder.java @@ -86,7 +86,7 @@ private static Symbol getSymbol(Schema schema) { *

* Otherwise, this JsonDecoder will reset its state and then reconfigure its * input. - * + * * @param in The InputStream to read from. Cannot be null. * @throws IOException * @throws NullPointerException if {@code in} is {@code null} @@ -109,7 +109,7 @@ public JsonDecoder configure(InputStream in) throws IOException { *

* Otherwise, this JsonDecoder will reset its state and then reconfigure its * input. - * + * * @param in The String to read from. Cannot be null. * @throws IOException * @throws NullPointerException if {@code in} is {@code null} @@ -436,6 +436,26 @@ public int readIndex() throws IOException { return n; } + @Override + public int readExtends() throws IOException { + advance(Symbol.EXTENDS); + Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol(); + + final String label; + if (in.getCurrentToken() == JsonToken.START_OBJECT && in.nextToken() == JsonToken.FIELD_NAME) { + label = in.getText(); + in.nextToken(); + parser.pushSymbol(Symbol.EXTENDS_END); + } else { + throw error("start-extends"); + } + int n = a.findLabel(label); + if (n < 0) + throw new AvroTypeException("Unknown extension " + label); + parser.pushSymbol(a.getSymbol(n)); + return n; + } + @Override public Symbol doAction(Symbol input, Symbol top) throws IOException { if (top instanceof Symbol.FieldAdjustAction) { diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java index 71cc690b8a4..163873b1b62 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java @@ -302,6 +302,19 @@ public void writeIndex(int unionIndex) throws IOException { parser.pushSymbol(symbol); } + @Override + public void writeExtends(int extendedIndex) throws IOException { + parser.advance(Symbol.EXTENDS); + Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol(); + Symbol symbol = top.getSymbol(extendedIndex); + if (symbol != Symbol.NULL && includeNamespace) { + out.writeStartObject(); + out.writeFieldName(top.getLabel(extendedIndex)); + parser.pushSymbol(Symbol.EXTENDS_END); + } + parser.pushSymbol(symbol); + } + @Override public Symbol doAction(Symbol input, Symbol top) throws IOException { if (top instanceof Symbol.FieldAdjustAction) { @@ 
-309,7 +322,7 @@ public Symbol doAction(Symbol input, Symbol top) throws IOException { out.writeFieldName(fa.fname); } else if (top == Symbol.RECORD_START) { out.writeStartObject(); - } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) { + } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END || top == Symbol.EXTENDS_END) { out.writeEndObject(); } else if (top != Symbol.FIELD_END) { throw new AvroTypeException("Unknown action symbol " + top); diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java index 6f119a39b65..5f37c1e4c80 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/ResolvingDecoder.java @@ -286,6 +286,22 @@ public int readIndex() throws IOException { return result; } + @Override + public int readExtends() throws IOException { + parser.advance(Symbol.EXTENDS); + Symbol top = parser.popSymbol(); + final int result; + if (top instanceof Symbol.UnionAdjustAction) { + result = ((Symbol.UnionAdjustAction) top).rindex; + top = ((Symbol.UnionAdjustAction) top).symToParse; + } else { + result = in.readExtends(); + top = ((Symbol.Alternative) top).getSymbol(result); + } + parser.pushSymbol(top); + return result; + } + @Override public Symbol doAction(Symbol input, Symbol top) throws IOException { if (top instanceof Symbol.FieldOrderAction) { diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java index dbee4458575..03e211e43f9 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingDecoder.java @@ -36,7 +36,7 @@ * and configure. *

* ValidatingDecoder is not thread-safe. - * + * * @see Decoder * @see DecoderFactory */ @@ -242,6 +242,15 @@ public int readIndex() throws IOException { return result; } + @Override + public int readExtends() throws IOException { + parser.advance(Symbol.EXTENDS); + Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol(); + int result = in.readExtends(); + parser.pushSymbol(top.getSymbol(result)); + return result; + } + @Override public Symbol doAction(Symbol input, Symbol top) throws IOException { return null; diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java index d7440c7406e..ed08c8832a8 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/ValidatingEncoder.java @@ -36,7 +36,7 @@ * and configure. *

* ValidatingEncoder is not thread-safe. - * + * * @see Encoder * @see EncoderFactory */ @@ -60,7 +60,7 @@ public void flush() throws IOException { /** * Reconfigures this ValidatingEncoder to wrap the encoder provided. - * + * * @param encoder The Encoder to wrap for validation. * @return This ValidatingEncoder. */ @@ -205,6 +205,14 @@ public void writeIndex(int unionIndex) throws IOException { out.writeIndex(unionIndex); } + @Override + public void writeExtends(int extendsIndex) throws IOException { + parser.advance(Symbol.EXTENDS); + final Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol(); + parser.pushSymbol(top.getSymbol(extendsIndex)); + out.writeExtends(extendsIndex); + } + @Override public Symbol doAction(Symbol input, Symbol top) throws IOException { return null; diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/JsonGrammarGenerator.java b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/JsonGrammarGenerator.java index 8c1e4b09fe9..4bfd4e53cb9 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/JsonGrammarGenerator.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/JsonGrammarGenerator.java @@ -18,7 +18,9 @@ package org.apache.avro.io.parsing; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; @@ -43,7 +45,7 @@ public Symbol generate(Schema schema) { * schema sc. If there is already an entry for the given schema in the * given map seen then that entry is returned. Otherwise a new symbol * is generated and an entry is inserted into the map. - * + * * @param sc The schema for which the start symbol is required * @param seen A map of schema to symbol mapping done so far. 
* @return The start symbol for the schema @@ -74,20 +76,16 @@ public Symbol generate(Schema sc, Map seen) { LitS wsc = new LitS(sc); Symbol rresult = seen.get(wsc); if (rresult == null) { - Symbol[] production = new Symbol[sc.getFields().size() * 3 + 2]; - rresult = Symbol.seq(production); - seen.put(wsc, rresult); - - int i = production.length; - int n = 0; - production[--i] = Symbol.RECORD_START; - for (Field f : sc.getFields()) { - production[--i] = Symbol.fieldAdjustAction(n, f.name(), f.aliases()); - production[--i] = generate(f.schema(), seen); - production[--i] = Symbol.FIELD_END; - n++; + if (sc.hasChild()) { + this.generateRecord(sc, seen); // put root before to avoid infinite loop + final List subs = sc.visitHierarchy().collect(Collectors.toList()); + final Symbol alternatives = alternatives(subs, seen); + rresult = Symbol.seq(alternatives, Symbol.EXTENDS); + seen.put(wsc, rresult); + return rresult; + } else { + rresult = this.generateRecord(sc, seen); } - production[--i] = Symbol.RECORD_END; } return rresult; } @@ -95,4 +93,23 @@ public Symbol generate(Schema sc, Map seen) { throw new RuntimeException("Unexpected schema type"); } } + + private Symbol generateRecord(Schema sc, Map seen) { + Symbol[] production = new Symbol[sc.getFields().size() * 3 + 2]; + Symbol rresult = Symbol.seq(production); + LitS wsc = new LitS(sc); + seen.put(wsc, rresult); + + int i = production.length; + int n = 0; + production[--i] = Symbol.RECORD_START; + for (Field f : sc.getFields()) { + production[--i] = Symbol.fieldAdjustAction(n, f.name(), f.aliases()); + production[--i] = generate(f.schema(), seen); + production[--i] = Symbol.FIELD_END; + n++; + } + production[--i] = Symbol.RECORD_END; + return rresult; + } } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java index 77fbe1c7ad0..abc0ff05845 100644 --- 
a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.avro.AvroTypeException; import org.apache.avro.Resolver; @@ -131,23 +132,28 @@ private Symbol generate(Resolver.Action action, Map seen) throws Symbol result = seen.get(action); if (result == null) { final Resolver.RecordAdjust ra = (Resolver.RecordAdjust) action; - int defaultCount = ra.readerOrder.length - ra.firstDefault; - int count = 1 + ra.fieldActions.length + 3 * defaultCount; - final Symbol[] production = new Symbol[count]; - result = Symbol.seq(production); - seen.put(action, result); - production[--count] = Symbol.fieldOrderAction(ra.readerOrder); - - final Resolver.Action[] actions = ra.fieldActions; - for (Resolver.Action wfa : actions) { - production[--count] = generate(wfa, seen); - } - for (int i = ra.firstDefault; i < ra.readerOrder.length; i++) { - final Schema.Field rf = ra.readerOrder[i]; - byte[] bb = getBinary(rf.schema(), Accessor.defaultValue(rf)); - production[--count] = Symbol.defaultStartAction(bb); - production[--count] = simpleGen(rf.schema(), seen); - production[--count] = Symbol.DEFAULT_END_ACTION; + if (ra.reader.hasChild()) { + result = simpleRecordGen(ra.reader, seen, true); + seen.put(action, result); + } else { + int defaultCount = ra.readerOrder.length - ra.firstDefault; + int count = 1 + ra.fieldActions.length + 3 * defaultCount; + final Symbol[] production = new Symbol[count]; + result = Symbol.seq(production); + seen.put(action, result); + production[--count] = Symbol.fieldOrderAction(ra.readerOrder); + + final Resolver.Action[] actions = ra.fieldActions; + for (Resolver.Action wfa : actions) { + production[--count] = generate(wfa, seen); + } + for (int i = ra.firstDefault; i < ra.readerOrder.length; i++) 
{ + final Schema.Field rf = ra.readerOrder[i]; + byte[] bb = getBinary(rf.schema(), Accessor.defaultValue(rf)); + production[--count] = Symbol.defaultStartAction(bb); + production[--count] = simpleGen(rf.schema(), seen); + production[--count] = Symbol.DEFAULT_END_ACTION; + } } } return result; @@ -203,14 +209,8 @@ private Symbol simpleGen(Schema s, Map seen) { case RECORD: { Symbol result = seen.get(s); if (result == null) { - final Symbol[] production = new Symbol[s.getFields().size() + 1]; - result = Symbol.seq(production); - seen.put(s, result); - int i = production.length; - production[--i] = Symbol.fieldOrderAction(s.getFields().toArray(new Schema.Field[0])); - for (Field f : s.getFields()) { - production[--i] = simpleGen(f.schema(), seen); - } + result = simpleRecordGen(s, seen, true); + // FieldOrderAction is needed even though the field-order hasn't changed, // because the _reader_ doesn't know the field order hasn't changed, and // thus it will probably call {@ ResolvingDecoder.fieldOrder} to find out. 
@@ -223,6 +223,35 @@ private Symbol simpleGen(Schema s, Map seen) { } } + + private Symbol simpleRecordGen(Schema s, Map seen, boolean exploreHierarchy) { + if (s.hasChild() && exploreHierarchy) { + seen.put(s, Symbol.EXTENDS); + final List subs = s.visitHierarchy().collect(Collectors.toList()); + final Symbol[] symbols = new Symbol[subs.size()]; + final String[] labels = new String[subs.size()]; + int i = 0; + for (Schema b : subs) { + symbols[i] = simpleRecordGen(b, seen, false); + labels[i++] = b.getFullName(); + } + Symbol seq = Symbol.seq(Symbol.alt(symbols, labels), Symbol.EXTENDS); + seen.put(s, seq); + return seq; + } else { + final Symbol[] production = new Symbol[s.getFields().size() + 1]; + Symbol result = Symbol.seq(production); + seen.put(s, result); + + int i = production.length; + production[--i] = Symbol.fieldOrderAction(s.getFields().toArray(new Schema.Field[0])); + for (Field f : s.getFields()) { + production[--i] = simpleGen(f.schema(), seen); + } + return result; + } + + } + private static EncoderFactory factory = new EncoderFactory().configureBufferSize(32); /** diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Symbol.java b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Symbol.java index a18f3fdbcd5..ab35401742e 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Symbol.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/Symbol.java @@ -94,7 +94,7 @@ static Symbol root(Symbol... symbols) { /** * A convenience method to construct a sequence. - * + * * @param production The constituent symbols of the sequence. */ static Symbol seq(Symbol... production) { @@ -103,7 +103,7 @@ static Symbol seq(Symbol... production) { /** * A convenience method to construct a repeater. - * + * * @param symsToRepeat The symbols to repeat in the repeater. */ static Symbol repeat(Symbol endSymbol, Symbol...
symsToRepeat) { @@ -119,7 +119,7 @@ static Symbol alt(Symbol[] symbols, String[] labels) { /** * A convenience method to construct an ErrorAction. - * + * * @param e */ static Symbol error(String e) { @@ -128,7 +128,7 @@ static Symbol error(String e) { /** * A convenience method to construct a ResolvingAction. - * + * * @param w The writer symbol * @param r The reader symbol */ @@ -238,7 +238,7 @@ private static void copyFixups(List fixups, Symbol[] out, int outPos, Sym /** * Returns the amount of space required to flatten the given sub-array of * symbols. - * + * * @param symbols The array of input symbols. * @param start The index where the subarray starts. * @return The number of symbols that will be produced if one expands the given @@ -692,6 +692,7 @@ public int findLabel(String l) { public static final Symbol FIXED = new Symbol.Terminal("fixed"); public static final Symbol ENUM = new Symbol.Terminal("enum"); public static final Symbol UNION = new Symbol.Terminal("union"); + public static final Symbol EXTENDS = new Symbol.Terminal("extends"); public static final Symbol ARRAY_START = new Symbol.Terminal("array-start"); public static final Symbol ARRAY_END = new Symbol.Terminal("array-end"); @@ -707,6 +708,8 @@ public int findLabel(String l) { public static final Symbol RECORD_START = new ImplicitAction(false); public static final Symbol RECORD_END = new ImplicitAction(true); public static final Symbol UNION_END = new ImplicitAction(true); + + public static final Symbol EXTENDS_END = new ImplicitAction(true); public static final Symbol FIELD_END = new ImplicitAction(true); public static final Symbol DEFAULT_END_ACTION = new ImplicitAction(true); diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ValidatingGrammarGenerator.java b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ValidatingGrammarGenerator.java index 7798f520ae6..6f73fca21e2 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ValidatingGrammarGenerator.java 
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/parsing/ValidatingGrammarGenerator.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; @@ -41,7 +42,7 @@ public Symbol generate(Schema schema) { * given schema sc. If there is already an entry for the given schema * in the given map seen then that entry is returned. Otherwise a new * symbol is generated and an entry is inserted into the map. - * + * * @param sc The schema for which the start symbol is required * @param seen A map of schema to symbol mapping done so far. * @return The start symbol for the schema @@ -77,40 +78,60 @@ public Symbol generate(Schema sc, Map seen) { LitS wsc = new LitS(sc); Symbol rresult = seen.get(wsc); if (rresult == null) { - Symbol[] production = new Symbol[sc.getFields().size()]; - - /** - * We construct a symbol without filling the array. Please see - * {@link Symbol#production} for the reason. 
- */ - rresult = Symbol.seq(production); - seen.put(wsc, rresult); - - int i = production.length; - for (Field f : sc.getFields()) { - production[--i] = generate(f.schema(), seen); + if (sc.hasChild()) { + this.productRecord(sc, seen); + final List subs = sc.visitHierarchy().collect(Collectors.toList()); + final Symbol alternatives = alternatives(subs, seen); + rresult = Symbol.seq(alternatives, Symbol.EXTENDS); + seen.put(wsc, rresult); + } else { + rresult = this.productRecord(sc, seen); } } return rresult; } case UNION: - List subs = sc.getTypes(); - Symbol[] symbols = new Symbol[subs.size()]; - String[] labels = new String[subs.size()]; - - int i = 0; - for (Schema b : sc.getTypes()) { - symbols[i] = generate(b, seen); - labels[i] = b.getFullName(); - i++; - } - return Symbol.seq(Symbol.alt(symbols, labels), Symbol.UNION); + final List subs = sc.getTypes(); + final Symbol alternatives = alternatives(subs, seen); + return Symbol.seq(alternatives, Symbol.UNION); default: throw new RuntimeException("Unexpected schema type"); } } + protected Symbol productRecord(Schema sc, Map seen) { + Symbol[] production = new Symbol[sc.getFields().size()]; + + /** + * We construct a symbol without filling the array. Please see + * {@link Symbol#production} for the reason. + */ + Symbol rresult = Symbol.seq(production); + LitS wsc = new LitS(sc); + seen.put(wsc, rresult); + + int i = production.length; + for (Field f : sc.getFields()) { + production[--i] = generate(f.schema(), seen); + } + return rresult; + } + + protected Symbol alternatives(List subs, Map seen) { + Symbol[] symbols = new Symbol[subs.size()]; + String[] labels = new String[subs.size()]; + + int i = 0; + for (Schema b : subs) { + + symbols[i] = generate(b, seen); + labels[i] = b.getFullName(); + i++; + } + return Symbol.alt(symbols, labels); + } + /** A wrapper around Schema that does "==" equality. 
*/ static class LitS { public final Schema actual; diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java index 95cb367460e..bd4805355dd 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java +++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java @@ -116,6 +116,58 @@ public void testSchemaWithFields() { assertEquals(2, schema.getFields().size()); } + @Test + public void testExtendedRecordWithFields() { + List fields = new ArrayList<>(); + fields.add(new Field("field_name1", Schema.create(Type.NULL), null, null)); + fields.add(new Field("field_name2", Schema.create(Type.INT), null, null)); + Schema schema = Schema.createRecord("int", "doc", "namespace", false); + schema.setFields(fields); + String schemaString = schema.toString(); + assertNotNull(schemaString); + assertEquals(2, schema.getFields().size()); + + Schema extendedSchema = Schema.createRecord(schema, "extended", "doc", "namespace", false); + assertEquals(2, extendedSchema.getFields().size()); + assertNotNull(extendedSchema.getField("field_name1")); + assertTrue(extendedSchema.hasFields()); + + List newFields = new ArrayList<>(); + newFields.add(new Field("field_name3", Schema.create(Type.STRING), null, "def")); + extendedSchema.setFields(newFields); + + assertEquals(3, extendedSchema.getFields().size()); + String json = extendedSchema.toString(true); + System.out.println(json); + + assertNotNull(extendedSchema.getField("field_name1")); + assertNotNull(extendedSchema.getField("field_name2")); + assertNotNull(extendedSchema.getField("field_name3")); + + assertNotEquals(schema, extendedSchema); + + Schema extendedSchema2 = Schema.createRecord(schema, "extended", "doc", "namespace", false); + assertNotEquals(extendedSchema2, extendedSchema); + + List newFields2 = new ArrayList<>(); + newFields2.add(new Field("field_name3", Schema.create(Type.STRING), null, "def")); + 
extendedSchema2.setFields(newFields2); + + assertEquals(extendedSchema2, extendedSchema); + + Schema grandson = Schema.createRecord(extendedSchema, "ext2", "doc", "namespace", false); + assertEquals(3, grandson.getFields().size()); + assertNotNull(grandson.getField("field_name1")); + assertNotNull(grandson.getField("field_name2")); + assertNotNull(grandson.getField("field_name3")); + + List newFields3 = new ArrayList<>(); + newFields3.add(new Field("field_name4", Schema.create(Type.STRING), null, "def")); + grandson.setFields(newFields3); + assertEquals(4, grandson.getFields().size()); + assertNotNull(grandson.getField("field_name4")); + } + @Test(expected = NullPointerException.class) public void testSchemaWithNullFields() { Schema.createRecord("name", "doc", "namespace", false, null); @@ -395,4 +447,27 @@ public void testQualifiedName() { assertEquals("Int", nameInt.getQualified("space")); } + @Test + public void testSchemaHierarchy() { + final Schema root = Schema.createRecord("root", "doc", "space", false); + assertEquals(0, root.getIndex()); + + final Schema child2 = Schema.createRecord(root, "child2", "doc", "space2", false); + final Schema child1 = Schema.createRecord(root, "child1", "doc", "space", false); + assertEquals(1, child1.getIndex()); + + final Schema child1_2 = Schema.createRecord(child1, "child1_2", "doc", "space", false); + final Schema child1_1 = Schema.createRecord(child1, "child1_1", "doc", "space", false); + + assertEquals(0, root.getIndex()); + assertEquals(1, child1.getIndex()); + assertEquals(2, child1_1.getIndex()); + assertEquals(3, child1_2.getIndex()); + assertEquals(4, child2.getIndex()); + + Schema.Field f1 = new Schema.Field("f1", Schema.create(Type.STRING), "doc"); + child1.setFields(Arrays.asList(f1)); + assertEquals(child1.hashCode(), child1.hashCode()); + } + } diff --git a/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumWriter.java 
b/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumWriter.java index dcbd5098085..c0619f4a3ce 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumWriter.java +++ b/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericDatumWriter.java @@ -26,6 +26,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.ConcurrentModificationException; @@ -37,8 +38,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.UnresolvedUnionException; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DecoderFactory; @@ -47,6 +54,8 @@ import org.apache.avro.util.Utf8; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestGenericDatumWriter { @Test @@ -81,6 +90,81 @@ public void testWrite() throws IOException { assertEquals(r, o); } + @Test + public void testWriteUnion() throws IOException { + + Schema f1 = SchemaBuilder.record("f1").doc("doc").namespace("space").fields().name("field") + .type(Schema.create(Schema.Type.STRING)).noDefault().endRecord(); + Schema f2 = SchemaBuilder.record("f2").doc("doc").namespace("space").fields().name("field") + .type(Schema.create(Schema.Type.INT)).noDefault().endRecord(); + + Schema root = SchemaBuilder.record("root").doc("doc").namespace("space").fields().name("f") + .type(Schema.createArray(Schema.createUnion(f1, f2, Schema.create(Schema.Type.LONG)))).noDefault().endRecord(); + + GenericRecord mainRecord = new 
GenericData.Record(root); + + GenericRecord f1Record = new GenericData.Record(f1); + f1Record.put("field", "Hello"); + + mainRecord.put("f", Arrays.asList(f1Record, 3L)); + + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + GenericDatumWriter w = new GenericDatumWriter<>(root); + Encoder e = EncoderFactory.get().jsonEncoder(root, bao); + w.write(mainRecord, e); + e.flush(); + + Object o = new GenericDatumReader(root).read(null, + DecoderFactory.get().jsonDecoder(root, new ByteArrayInputStream(bao.toByteArray()))); + assertEquals(mainRecord, o); + } + + @Test + public void testWriteRecordExtension() throws IOException { + String parent = "{\"type\": \"record\", \"name\": \"r\", \"fields\": [" + "{ \"name\": \"f1\", \"type\": \"long\" }" + + "]}"; + + String child = "{\"type\": \"record: r\", \"name\": \"c\", \"fields\": [" + + "{ \"name\": \"f2\", \"type\": \"string\" }" + "]}"; + + String json = "{\"type\": \"record\", \"name\": \"m\", \"fields\": [" + "{ \"name\": \"f\", \"type\": \"r\" }" + + "]}"; + + Schema.Parser parser = new Schema.Parser(); + final Schema p = parser.parse(parent); + final Schema c = parser.parse(child); + final Schema s = parser.parse(json); + + GenericRecord childObject = new GenericData.Record(c); + childObject.put("f1", 100L); + childObject.put("f2", "Hello"); + + GenericRecord r = new GenericData.Record(s); + r.put("f", childObject); + Object o = this.writeAndRead(r, s); + assertEquals(r, o); + + GenericRecord r1 = new GenericData.Record(s); + GenericRecord parentObject = new GenericData.Record(p); + parentObject.put("f1", 50L); + r1.put("f", parentObject); + Object o1 = this.writeAndRead(r1, s); + assertEquals(r1, o1); + + } + + private Object writeAndRead(GenericRecord r, Schema s) throws IOException { + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + GenericDatumWriter w = new GenericDatumWriter<>(s); + Encoder e = EncoderFactory.get().jsonEncoder(s, bao); + w.write(r, e); + e.flush(); + + Object o = new
GenericDatumReader(s).read(null, + DecoderFactory.get().jsonDecoder(s, new ByteArrayInputStream(bao.toByteArray()))); + return o; + } + @Test public void testArrayConcurrentModification() throws Exception { String json = "{\"type\": \"array\", \"items\": \"int\" }"; @@ -284,6 +368,11 @@ public void writeMapEnd() throws IOException { public void writeIndex(int unionIndex) throws IOException { e.writeIndex(unionIndex); } + + @Override + public void writeExtends(int extendsIndex) throws IOException { + e.writeExtends(extendsIndex); + } } @Test(expected = AvroTypeException.class) diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/LegacyBinaryEncoder.java b/lang/java/avro/src/test/java/org/apache/avro/io/LegacyBinaryEncoder.java index e4c601e4d28..e183dfef0db 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/LegacyBinaryEncoder.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/LegacyBinaryEncoder.java @@ -178,6 +178,11 @@ public void writeIndex(int unionIndex) throws IOException { encodeLong(unionIndex, out); } + @Override + public void writeExtends(int extendsIndex) throws IOException { + encodeLong(extendsIndex, out); + } + protected static void encodeLong(long n, OutputStream o) throws IOException { n = (n << 1) ^ (n >> 63); // move sign to low-order bit while ((n & ~0x7F) != 0) { diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestResolvingIO.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestResolvingIO.java index c880d9fd55a..daf96004344 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestResolvingIO.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestResolvingIO.java @@ -21,44 +21,30 @@ import java.io.IOException; import java.io.InputStream; import java.util.Arrays; -import java.util.Collection; +import java.util.stream.Stream; import org.apache.avro.Schema; import org.apache.avro.io.TestValidatingIO.Encoding; -import org.junit.Test; -import org.junit.runner.RunWith; -import 
org.junit.runners.Parameterized; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; -@RunWith(Parameterized.class) public class TestResolvingIO { - protected final Encoding eEnc; - protected final int iSkipL; - protected final String sJsWrtSchm; - protected final String sWrtCls; - protected final String sJsRdrSchm; - protected final String sRdrCls; - - public TestResolvingIO(Encoding encoding, int skipLevel, String jsonWriterSchema, String writerCalls, - String jsonReaderSchema, String readerCalls) { - this.eEnc = encoding; - this.iSkipL = skipLevel; - this.sJsWrtSchm = jsonWriterSchema; - this.sWrtCls = writerCalls; - this.sJsRdrSchm = jsonReaderSchema; - this.sRdrCls = readerCalls; - } - - @Test - public void testIdentical() throws IOException { - performTest(eEnc, iSkipL, sJsWrtSchm, sWrtCls, sJsWrtSchm, sWrtCls); + @ParameterizedTest + @MethodSource("data2") + public void testIdentical(Encoding encoding, int skipLevel, String jsonWriterSchema, String writerCalls) + throws IOException { + performTest(encoding, skipLevel, jsonWriterSchema, writerCalls, jsonWriterSchema, writerCalls); } private static final int COUNT = 10; - @Test - public void testCompatible() throws IOException { - performTest(eEnc, iSkipL, sJsWrtSchm, sWrtCls, sJsRdrSchm, sRdrCls); + @ParameterizedTest + @MethodSource("data2") + public void testCompatible(Encoding encoding, int skipLevel, String jsonWriterSchema, String writerCalls, + String jsonReaderSchema, String readerCalls) throws IOException { + performTest(encoding, skipLevel, jsonWriterSchema, writerCalls, jsonReaderSchema, readerCalls); } private void performTest(Encoding encoding, int skipLevel, String jsonWriterSchema, String writerCalls, @@ -100,9 +86,8 @@ static void check(Schema wsc, Schema rsc, byte[] bytes, String calls, Object[] v TestValidatingIO.check(msg, vi, calls, values, skipLevel); } - @Parameterized.Parameters - 
public static Collection data2() { - return Arrays.asList(TestValidatingIO.convertTo2dArray(encodings, skipLevels, testSchemas())); + public static Stream data2() { + return TestValidatingIO.convertTo2dArray(encodings, skipLevels, testSchemas()); } static Object[][] encodings = new Object[][] { { Encoding.BINARY }, { Encoding.BLOCKING_BINARY }, { Encoding.JSON } }; diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestResolvingIOResolving.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestResolvingIOResolving.java index 8e3dc8e53d7..76aaa537fd7 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestResolvingIOResolving.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestResolvingIOResolving.java @@ -18,51 +18,30 @@ package org.apache.avro.io; import java.io.IOException; -import java.util.Arrays; -import java.util.Collection; +import java.util.stream.Stream; import org.apache.avro.Schema; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; -@RunWith(Parameterized.class) public class TestResolvingIOResolving { - protected TestValidatingIO.Encoding eEnc; - protected final int iSkipL; - protected final String sJsWrtSchm; - protected final String sWrtCls; - protected final String sJsRdrSchm; - protected final String sRdrCls; - protected final Object[] oaWrtVals; - protected final Object[] oaRdrVals; - - public TestResolvingIOResolving(TestValidatingIO.Encoding encoding, int skipLevel, String jsonWriterSchema, - String writerCalls, Object[] writerValues, String jsonReaderSchema, String readerCalls, Object[] readerValues) { - this.eEnc = encoding; - this.iSkipL = skipLevel; - this.sJsWrtSchm = jsonWriterSchema; - this.sWrtCls = writerCalls; - this.oaWrtVals = writerValues; - this.sJsRdrSchm = jsonReaderSchema; - this.sRdrCls = 
readerCalls; - this.oaRdrVals = readerValues; - } - - @Test - public void testResolving() throws IOException { - Schema writerSchema = new Schema.Parser().parse(sJsWrtSchm); - byte[] bytes = TestValidatingIO.make(writerSchema, sWrtCls, oaWrtVals, eEnc); - Schema readerSchema = new Schema.Parser().parse(sJsRdrSchm); - TestValidatingIO.print(eEnc, iSkipL, writerSchema, readerSchema, oaWrtVals, oaRdrVals); - TestResolvingIO.check(writerSchema, readerSchema, bytes, sRdrCls, oaRdrVals, eEnc, iSkipL); + @ParameterizedTest + @MethodSource("data3") + public void testResolving(TestValidatingIO.Encoding encoding, int skipLevel, String jsonWriterSchema, + String writerCalls, Object[] writerValues, String jsonReaderSchema, String readerCalls, Object[] readerValues) + throws IOException { + Schema writerSchema = new Schema.Parser().parse(jsonWriterSchema); + byte[] bytes = TestValidatingIO.make(writerSchema, writerCalls, writerValues, encoding); + Schema readerSchema = new Schema.Parser().parse(jsonReaderSchema); + TestValidatingIO.print(encoding, skipLevel, writerSchema, readerSchema, writerValues, readerValues); + TestResolvingIO.check(writerSchema, readerSchema, bytes, readerCalls, readerValues, encoding, skipLevel); } - @Parameterized.Parameters - public static Collection data3() { - Collection ret = Arrays.asList(TestValidatingIO.convertTo2dArray(TestResolvingIO.encodings, - TestResolvingIO.skipLevels, dataForResolvingTests())); + public static Stream data3() { + Stream ret = TestValidatingIO.convertTo2dArray(TestResolvingIO.encodings, TestResolvingIO.skipLevels, + dataForResolvingTests()); return ret; } diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestValidatingIO.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestValidatingIO.java index 3056d5430af..19dd0539ad2 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/io/TestValidatingIO.java +++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestValidatingIO.java @@ -17,10 +17,6 @@ */ package 
org.apache.avro.io; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -32,15 +28,22 @@ import java.util.Iterator; import java.util.List; import java.util.Random; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + import org.apache.avro.Schema; import org.apache.avro.util.Utf8; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@RunWith(Parameterized.class) public class TestValidatingIO { enum Encoding { BINARY, BLOCKING_BINARY, JSON, @@ -48,30 +51,19 @@ enum Encoding { private static final Logger LOG = LoggerFactory.getLogger(TestValidatingIO.class); - private Encoding eEnc; - private int iSkipL; - private String sJsSch; - private String sCl; - - public TestValidatingIO(Encoding enc, int skip, String js, String cls) { - this.eEnc = enc; - this.iSkipL = skip; - this.sJsSch = js; - this.sCl = cls; - } - private static final int COUNT = 1; - @Test - public void testMain() throws IOException { + @ParameterizedTest + @MethodSource("data") + public void testMain(Encoding enc, int skip, Schema schema, String cls) throws IOException { for (int i = 0; i < COUNT; i++) { - testOnce(new Schema.Parser().parse(sJsSch), sCl, iSkipL, eEnc); + testOnce(schema, cls, skip, enc); } } private void testOnce(Schema schema, String calls, int skipLevel, Encoding encoding) throws IOException { Object[] values = randomValues(calls); - print(eEnc, iSkipL, schema, schema, 
values, values); + print(encoding, skipLevel, schema, schema, values, values); byte[] bytes = make(schema, calls, values, encoding); check(schema, bytes, calls, values, skipLevel, encoding); } @@ -152,6 +144,11 @@ public static void generate(Encoder vw, String calls, Object[] values) throws IO double d = (Double) values[p++]; vw.writeDouble(d); break; + case 'E': { + int index = extractInt(cs); + vw.writeExtends(index); + break; + } case 'S': { extractInt(cs); String s = (String) values[p++]; @@ -204,7 +201,7 @@ public static void generate(Encoder vw, String calls, Object[] values) throws IO break; } default: - fail(); + Assertions.fail(); break; } } @@ -220,6 +217,9 @@ public static Object[] randomValues(String calls) { switch (c) { case 'N': break; + case 'E': + extractInt(cs); + break; case 'B': result.add(r.nextBoolean()); break; @@ -254,7 +254,7 @@ public static Object[] randomValues(String calls) { case 's': break; default: - fail(); + Assertions.fail("Can't manage '" + c + "' char"); break; } } @@ -274,6 +274,17 @@ private static int extractInt(InputScanner sc) { return r; } + private static String extractString(InputScanner sc, int size) { + final char[] values = new char[size]; + int pos = 0; + while (!sc.isDone() && pos < size) { + values[pos] = sc.cur(); + sc.next(); + pos++; + } + return new String(values); + } + private static byte[] nextBytes(Random r, int length) { byte[] bb = new byte[length]; r.nextBytes(bb); @@ -324,25 +335,25 @@ public static void check(String msg, Decoder vi, String calls, Object[] values, vi.readNull(); break; case 'B': - assertEquals(msg, values[p++], vi.readBoolean()); + Assertions.assertEquals(values[p++], vi.readBoolean(), msg); break; case 'I': - assertEquals(msg, values[p++], vi.readInt()); + Assertions.assertEquals(values[p++], vi.readInt(), msg); break; case 'L': - assertEquals(msg, values[p++], vi.readLong()); + Assertions.assertEquals(values[p++], vi.readLong(), msg); break; case 'F': if (!(values[p] instanceof 
Float)) - fail(); + Assertions.fail(); float f = (Float) values[p++]; - assertEquals(msg, f, vi.readFloat(), Math.abs(f / 1000)); + Assertions.assertEquals(f, vi.readFloat(), Math.abs(f / 1000), msg); break; case 'D': if (!(values[p] instanceof Double)) - fail(); + Assertions.fail(); double d = (Double) values[p++]; - assertEquals(msg, d, vi.readDouble(), Math.abs(d / 1000)); + Assertions.assertEquals(d, vi.readDouble(), Math.abs(d / 1000), msg); break; case 'S': extractInt(cs); @@ -351,7 +362,7 @@ public static void check(String msg, Decoder vi, String calls, Object[] values, p++; } else { String s = (String) values[p++]; - assertEquals(msg, new Utf8(s), vi.readString(null)); + Assertions.assertEquals(new Utf8(s), vi.readString(null), msg); } break; case 'K': @@ -361,7 +372,7 @@ public static void check(String msg, Decoder vi, String calls, Object[] values, p++; } else { String s = (String) values[p++]; - assertEquals(msg, new Utf8(s), vi.readString(null)); + Assertions.assertEquals(new Utf8(s), vi.readString(null), msg); } break; case 'b': @@ -374,7 +385,7 @@ public static void check(String msg, Decoder vi, String calls, Object[] values, ByteBuffer bb2 = vi.readBytes(null); byte[] actBytes = new byte[bb2.remaining()]; System.arraycopy(bb2.array(), bb2.position(), actBytes, 0, bb2.remaining()); - assertArrayEquals(msg, bb, actBytes); + Assertions.assertArrayEquals(bb, actBytes, msg); } break; case 'f': { @@ -386,16 +397,21 @@ public static void check(String msg, Decoder vi, String calls, Object[] values, byte[] bb = (byte[]) values[p++]; byte[] actBytes = new byte[len]; vi.readFixed(actBytes); - assertArrayEquals(msg, bb, actBytes); + Assertions.assertArrayEquals(bb, actBytes, msg); } } break; + case 'E': { + extractInt(cs); + vi.readExtends(); + } + break; case 'e': { int e = extractInt(cs); if (level == skipLevel) { vi.readEnum(); } else { - assertEquals(msg, e, vi.readEnum()); + Assertions.assertEquals(e, vi.readEnum(), msg); } } break; @@ -422,16 +438,16 @@ 
public static void check(String msg, Decoder vi, String calls, Object[] values, continue; } case ']': - assertEquals(msg, 0, counts[level]); + Assertions.assertEquals(0, counts[level], msg); if (!isEmpty[level]) { - assertEquals(msg, 0, vi.arrayNext()); + Assertions.assertEquals(0, vi.arrayNext(), msg); } level--; break; case '}': - assertEquals(0, counts[level]); + Assertions.assertEquals(0, counts[level]); if (!isEmpty[level]) { - assertEquals(msg, 0, vi.mapNext()); + Assertions.assertEquals(0, vi.mapNext(), msg); } level--; break; @@ -450,28 +466,28 @@ public static void check(String msg, Decoder vi, String calls, Object[] values, continue; case 'U': { int idx = extractInt(cs); - assertEquals(msg, idx, vi.readIndex()); + Assertions.assertEquals(idx, vi.readIndex(), msg); continue; } case 'R': ((ResolvingDecoder) vi).readFieldOrder(); continue; default: - fail(msg); + Assertions.fail("char '" + c + "' for :" + msg); } } catch (RuntimeException e) { throw new RuntimeException(msg, e); } } - assertEquals(msg, values.length, p); + Assertions.assertEquals(values.length, p, msg); } private static int skip(String msg, InputScanner cs, Decoder vi, boolean isArray) throws IOException { final char end = isArray ? 
']' : '}'; if (isArray) { - assertEquals(msg, 0, vi.skipArray()); + Assertions.assertEquals(0, vi.skipArray(), msg); } else if (end == '}') { - assertEquals(msg, 0, vi.skipMap()); + Assertions.assertEquals(0, vi.skipMap(), msg); } int level = 0; int p = 0; @@ -493,6 +509,7 @@ private static int skip(String msg, InputScanner cs, Decoder vi, boolean isArray case 'B': case 'I': case 'L': + case 'E': case 'F': case 'D': case 'S': @@ -507,9 +524,8 @@ private static int skip(String msg, InputScanner cs, Decoder vi, boolean isArray throw new RuntimeException("Don't know how to skip"); } - @Parameterized.Parameters - public static Collection data() { - return Arrays.asList(convertTo2dArray(encodings, skipLevels, testSchemas())); + public static Stream data() { + return convertTo2dArray(encodings, skipLevels, testSchemas()); } private static Object[][] encodings = new Object[][] { { Encoding.BINARY }, { Encoding.BLOCKING_BINARY }, @@ -517,19 +533,11 @@ public static Collection data() { private static Object[][] skipLevels = new Object[][] { { -1 }, { 0 }, { 1 }, { 2 }, }; - public static Object[][] convertTo2dArray(final Object[][]... values) { - ArrayList ret = new ArrayList<>(); + public static Stream convertTo2dArray(final Object[][]... 
values) { - Iterator iter = cartesian(values); - while (iter.hasNext()) { - Object[] objects = iter.next(); - ret.add(objects); - } - Object[][] retArrays = new Object[ret.size()][]; - for (int i = 0; i < ret.size(); i++) { - retArrays[i] = ret.get(i); - } - return retArrays; + final Iterator iter = cartesian(values); + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED), false) + .map(Arguments::of); } /** @@ -613,9 +621,10 @@ public static Object[][] testSchemas() { * - Enum and its value [ Start array ] End array { Start map } End map s start * item */ - return new Object[][] { { "\"null\"", "N" }, { "\"boolean\"", "B" }, { "\"int\"", "I" }, { "\"long\"", "L" }, - { "\"float\"", "F" }, { "\"double\"", "D" }, { "\"string\"", "S0" }, { "\"string\"", "S10" }, - { "\"bytes\"", "b0" }, { "\"bytes\"", "b10" }, { "{\"type\":\"fixed\", \"name\":\"fi\", \"size\": 1}", "f1" }, + Object[][] values = new Object[][] { { "\"null\"", "N" }, { "\"boolean\"", "B" }, { "\"int\"", "I" }, + { "\"long\"", "L" }, { "\"float\"", "F" }, { "\"double\"", "D" }, { "\"string\"", "S0" }, + { "\"string\"", "S10" }, { "\"bytes\"", "b0" }, { "\"bytes\"", "b10" }, + { "{\"type\":\"fixed\", \"name\":\"fi\", \"size\": 1}", "f1" }, { "{\"type\":\"fixed\", \"name\":\"fi\", \"size\": 10}", "f10" }, { "{\"type\":\"enum\", \"name\":\"en\", \"symbols\":[\"v1\", \"v2\"]}", "e1" }, @@ -730,6 +739,22 @@ public static Object[][] testSchemas() { "[c1s[c1s[c1s[c1s[c1s[c1s[c1s[c1s[c1s[c1s[c1s[]]]]]]]]]]]]" }, }; + + // Add extended record + Schema root = Schema.createRecord("root", "doc", "space", false); + Schema.Field field = new Schema.Field("value", Schema.create(Schema.Type.STRING), "doc"); + root.setFields(Arrays.asList(field)); + + Schema child = Schema.createRecord(root, "child", "doc", "space", false); + Schema.Field childField = new Schema.Field("valueField", Schema.create(Schema.Type.STRING), "doc"); + child.setFields(Arrays.asList(childField)); + + final 
Stream stream = Arrays.stream(values).sequential() + .map((Object[] input) -> new Object[] { new Schema.Parser().parse((String) (input[0])), input[1] }); + + final Stream extendedRecordTest = Stream.of(new Object[][] { { root, "E1S10S10" }, { root, "E0S10" } }); + + return Stream.concat(stream, extendedRecordTest).collect(Collectors.toList()).toArray(new Object[][] {}); } static void dump(byte[] bb) { diff --git a/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java b/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java index 7c6bf1a180b..fa1e8269f73 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java +++ b/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java @@ -19,6 +19,7 @@ package org.apache.avro.message; +import java.nio.Buffer; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; diff --git a/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java b/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java index 476df74e689..7b46cdaa858 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java @@ -53,6 +53,9 @@ import static org.junit.Assert.*; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; + public class TestSchema { @Rule @@ -206,6 +209,26 @@ public void testRecord() throws Exception { checkParseError("{\"type\":\"record\",\"name\":\"X\",\"fields\":[" + "{\"name\":\"f.g\",\"type\":\"int\"}]}"); } + @Test + public void testExtendedRecord() throws IOException { + String recordJsonParent = "{\"type\":\"record\", \"name\":\"Test\", \"fields\":" + + "[{\"name\":\"f\", \"type\":\"long\", \"foo\":\"bar\"}]}"; + + String recordJson = "{\"type\":\"record: Test\", \"name\":\"Child\", \"fields\":" + + "[{\"name\":\"f1\", \"type\":\"string\", \"foo1\":\"bar1\"}]}"; + + 
final JsonNode jsonParent = Schema.MAPPER.readTree(recordJsonParent.getBytes(StandardCharsets.UTF_8)); + final JsonNode jsonSchema = Schema.MAPPER.readTree(recordJson.getBytes(StandardCharsets.UTF_8)); + Schema.Names names = new Schema.Names(); + + Schema.parse(jsonParent, names); + Schema schema = Schema.parse(jsonSchema, names); + assertEquals(Type.RECORD, schema.getType()); + assertEquals(Type.LONG, schema.getField("f").schema().getType()); + assertEquals(Type.STRING, schema.getField("f1").schema().getType()); + assertEquals("org.apache.avro.Schema$ExtendedRecordSchema", schema.getClass().getName()); + } + @Test public void testInvalidNameTolerance() { new Schema.Parser().setValidate(false).parse("{\"type\":\"record\",\"name\":\"1X\",\"fields\":[]}");