Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2956: Use avro SchemaBuilder API to convert record #2957

Merged
merged 2 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static java.util.Optional.empty;
import static java.util.Optional.of;
import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED_DEFAULT;
import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
Expand Down Expand Up @@ -58,6 +57,7 @@
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
Expand Down Expand Up @@ -296,21 +296,24 @@ Schema convert(GroupType parquetSchema) {
}

private Schema convertFields(String name, List<Type> parquetFields, Map<String, Integer> names) {
String ns = namespace(name, names);
List<Schema.Field> fields = new ArrayList<Schema.Field>();
SchemaBuilder.FieldAssembler<Schema> builder =
SchemaBuilder.builder(namespace(name, names)).record(name).fields();
for (Type parquetType : parquetFields) {
Schema fieldSchema = convertField(parquetType, names);
if (parquetType.isRepetition(REPEATED)) { // If a repeated field is ungrouped, treat as REQUIRED per spec
fields.add(new Schema.Field(parquetType.getName(), Schema.createArray(fieldSchema)));
builder.name(parquetType.getName())
.type()
.array()
.items()
.type(fieldSchema)
.arrayDefault(new ArrayList<>());
} else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) {
fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null, NULL_VALUE));
builder.name(parquetType.getName()).type().optional().type(fieldSchema);
} else { // REQUIRED
fields.add(new Schema.Field(parquetType.getName(), fieldSchema, null, (Object) null));
builder.name(parquetType.getName()).type(fieldSchema).noDefault();
}
}
Schema schema = Schema.createRecord(name, null, ns, false);
schema.setFields(fields);
return schema;
return builder.endRecord();
}

private Schema convertField(final Type parquetType, Map<String, Integer> names) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,43 +72,43 @@ public static void setupConf() {
NEW_BEHAVIOR.setBoolean("parquet.avro.write-old-list-structure", false);
}

public static final String ALL_PARQUET_SCHEMA =
"message org.apache.parquet.avro.myrecord {\n" + " required boolean myboolean;\n"
+ " required int32 myint;\n"
+ " required int64 mylong;\n"
+ " required float myfloat;\n"
+ " required double mydouble;\n"
+ " required binary mybytes;\n"
+ " required binary mystring (UTF8);\n"
+ " required group mynestedrecord {\n"
+ " required int32 mynestedint;\n"
+ " }\n"
+ " required binary myenum (ENUM);\n"
+ " required group myarray (LIST) {\n"
+ " repeated int32 array;\n"
+ " }\n"
+ " optional group myoptionalarray (LIST) {\n"
+ " repeated int32 array;\n"
+ " }\n"
+ " required group myarrayofoptional (LIST) {\n"
+ " repeated group list {\n"
+ " optional int32 element;\n"
+ " }\n"
+ " }\n"
+ " required group myrecordarray (LIST) {\n"
+ " repeated group array {\n"
+ " required int32 a;\n"
+ " required int32 b;\n"
+ " }\n"
+ " }\n"
+ " required group mymap (MAP) {\n"
+ " repeated group map (MAP_KEY_VALUE) {\n"
+ " required binary key (UTF8);\n"
+ " required int32 value;\n"
+ " }\n"
+ " }\n"
+ " required fixed_len_byte_array(1) myfixed;\n"
+ "}\n";
public static final String ALL_PARQUET_SCHEMA = "message org.apache.parquet.avro.myrecord {\n"
+ " required boolean myboolean;\n"
+ " required int32 myint;\n"
+ " required int64 mylong;\n"
+ " required float myfloat;\n"
+ " required double mydouble;\n"
+ " required binary mybytes;\n"
+ " required binary mystring (UTF8);\n"
+ " required group mynestedrecord {\n"
+ " required int32 mynestedint;\n"
+ " }\n"
+ " required binary myenum (ENUM);\n"
+ " required group myarray (LIST) {\n"
+ " repeated int32 array;\n"
+ " }\n"
+ " optional group myoptionalarray (LIST) {\n"
+ " repeated int32 array;\n"
+ " }\n"
+ " required group myarrayofoptional (LIST) {\n"
+ " repeated group list {\n"
+ " optional int32 element;\n"
+ " }\n"
+ " }\n"
+ " required group myrecordarray (LIST) {\n"
+ " repeated group array {\n"
+ " required int32 a;\n"
+ " required int32 b;\n"
+ " }\n"
+ " }\n"
+ " required group mymap (MAP) {\n"
+ " repeated group map (MAP_KEY_VALUE) {\n"
+ " required binary key (UTF8);\n"
+ " required int32 value;\n"
+ " }\n"
+ " }\n"
+ " required fixed_len_byte_array(1) myfixed;\n"
+ "}\n";

private void testAvroToParquetConversion(Schema avroSchema, String schemaString) throws Exception {
testAvroToParquetConversion(new Configuration(false), avroSchema, schemaString);
Expand Down Expand Up @@ -432,7 +432,8 @@ public void testConvertUngroupedRepeatedField() throws Exception {
+ " \"name\": \"SchemaWithRepeatedField\","
+ " \"fields\": [{"
+ " \"name\": \"repeatedField\","
+ " \"type\": {\"type\": \"array\",\"items\": \"int\"}"
+ " \"type\": {\"type\": \"array\",\"items\": \"int\"},"
+ " \"default\": []"
+ " }]"
+ "}"),
"message SchemaWithRepeatedField { repeated int32 repeatedField; }");
Expand Down