Skip to content

Commit

Permalink
Flink: Test both "new" Flink Avro planned reader and "deprecated" Avro reader (#11430)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbonofre authored Nov 26, 2024
1 parent 430ebff commit f356087
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.iceberg.util.DateTimeUtil;
import org.junit.jupiter.api.Test;

public class TestFlinkAvroReaderWriter extends DataTest {
public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {

private static final int NUM_RECORDS = 100;

Expand All @@ -70,6 +70,8 @@ protected void writeAndValidate(Schema schema) throws IOException {
writeAndValidate(schema, expectedRecords, NUM_RECORDS);
}

protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema);

private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
Expand All @@ -88,11 +90,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
writer.addAll(expectedRecords);
}

try (CloseableIterable<RowData> reader =
Avro.read(Files.localInput(recordsFile))
.project(schema)
.createResolvingReader(FlinkPlannedAvroReader::create)
.build()) {
try (CloseableIterable<RowData> reader = createAvroReadBuilder(recordsFile, schema).build()) {
Iterator<Record> expected = expectedRecords.iterator();
Iterator<RowData> rows = reader.iterator();
for (int i = 0; i < numRecord; i++) {
Expand Down Expand Up @@ -156,7 +154,6 @@ private Record recordNumType(

@Test
public void testNumericTypes() throws IOException {

List<Record> expected =
ImmutableList.of(
recordNumType(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.data;

import java.io.File;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;

/**
 * Runs the shared Avro reader/writer round-trip tests against the legacy, non-planned {@link
 * FlinkAvroReader} read path.
 *
 * @deprecated should be removed in 1.8.0; along with FlinkAvroReader.
 */
@Deprecated
public class TestFlinkAvroDeprecatedReaderWriter extends AbstractTestFlinkAvroReaderWriter {

  @Override
  protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema) {
    // Build a reader over the written file using the deprecated reader function.
    Avro.ReadBuilder readBuilder = Avro.read(Files.localInput(recordsFile));
    return readBuilder.project(schema).createReaderFunc(FlinkAvroReader::new);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.data;

import java.io.File;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;

/**
 * Runs the shared Avro reader/writer round-trip tests against the "new" planned-read path backed
 * by {@link FlinkPlannedAvroReader}.
 */
public class TestFlinkAvroPlannedReaderWriter extends AbstractTestFlinkAvroReaderWriter {

  @Override
  protected Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema) {
    // Build a reader over the written file using the resolving (planned) reader.
    Avro.ReadBuilder readBuilder = Avro.read(Files.localInput(recordsFile));
    return readBuilder.project(schema).createResolvingReader(FlinkPlannedAvroReader::create);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.GenericArrayData;
Expand All @@ -32,6 +34,9 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.Files;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.flink.FlinkSchemaUtil;
Expand All @@ -41,13 +46,23 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(ParameterizedTestExtension.class)
public class TestRowProjection {

@TempDir private Path temp;

@Parameter(index = 0)
protected Boolean useAvroPlannedReader;

@Parameters(name = "useAvroPlannedReader={0}")
protected static List<Object[]> parameters() {
  // Two runs: FALSE exercises the deprecated reader, TRUE the planned reader.
  return Arrays.asList(new Object[][] {{Boolean.FALSE}, {Boolean.TRUE}});
}

private RowData writeAndRead(String desc, Schema writeSchema, Schema readSchema, RowData row)
throws IOException {
File file = File.createTempFile("junit", desc + ".avro", temp.toFile());
Expand All @@ -61,16 +76,23 @@ private RowData writeAndRead(String desc, Schema writeSchema, Schema readSchema,
appender.add(row);
}

Iterable<RowData> records =
Avro.ReadBuilder builder =
Avro.read(Files.localInput(file))
.project(readSchema)
.createResolvingReader(FlinkPlannedAvroReader::create)
.build();
.createReaderFunc(FlinkAvroReader::new);
if (useAvroPlannedReader) {
builder =
Avro.read(Files.localInput(file))
.project(readSchema)
.createResolvingReader(FlinkPlannedAvroReader::create);
}

Iterable<RowData> records = builder.build();

return Iterables.getOnlyElement(records);
}

@Test
@TestTemplate
public void testFullProjection() throws Exception {
Schema schema =
new Schema(
Expand All @@ -85,7 +107,7 @@ public void testFullProjection() throws Exception {
assertThat(projected.getString(1)).asString().isEqualTo("test");
}

@Test
@TestTemplate
public void testSpecialCharacterProjection() throws Exception {
Schema schema =
new Schema(
Expand All @@ -105,7 +127,7 @@ public void testSpecialCharacterProjection() throws Exception {
assertThat(projected.getString(0)).asString().isEqualTo("test");
}

@Test
@TestTemplate
public void testReorderedFullProjection() throws Exception {
Schema schema =
new Schema(
Expand All @@ -125,7 +147,7 @@ public void testReorderedFullProjection() throws Exception {
assertThat(projected.getLong(1)).isEqualTo(34);
}

@Test
@TestTemplate
public void testReorderedProjection() throws Exception {
Schema schema =
new Schema(
Expand All @@ -147,7 +169,7 @@ public void testReorderedProjection() throws Exception {
assertThat(projected.isNullAt(2)).isTrue();
}

@Test
@TestTemplate
public void testRenamedAddedField() throws Exception {
Schema schema =
new Schema(
Expand Down Expand Up @@ -177,7 +199,7 @@ public void testRenamedAddedField() throws Exception {
assertThat(projected.isNullAt(3)).as("Should contain empty value on new column 4").isTrue();
}

@Test
@TestTemplate
public void testEmptyProjection() throws Exception {
Schema schema =
new Schema(
Expand All @@ -192,7 +214,7 @@ public void testEmptyProjection() throws Exception {
assertThat(projected.getArity()).isEqualTo(0);
}

@Test
@TestTemplate
public void testBasicProjection() throws Exception {
Schema writeSchema =
new Schema(
Expand All @@ -216,7 +238,7 @@ public void testBasicProjection() throws Exception {
assertThat(projected.getString(0)).asString().isEqualTo("test");
}

@Test
@TestTemplate
public void testRename() throws Exception {
Schema writeSchema =
new Schema(
Expand All @@ -239,7 +261,7 @@ public void testRename() throws Exception {
.isEqualTo("test");
}

@Test
@TestTemplate
public void testNestedStructProjection() throws Exception {
Schema writeSchema =
new Schema(
Expand Down Expand Up @@ -305,7 +327,7 @@ public void testNestedStructProjection() throws Exception {
.isEqualTo(-1.539054f, withPrecision(0.000001f));
}

@Test
@TestTemplate
public void testMapProjection() throws IOException {
Schema writeSchema =
new Schema(
Expand Down Expand Up @@ -359,7 +381,7 @@ public void testMapProjection() throws IOException {
return stringMap;
}

@Test
@TestTemplate
public void testMapOfStructsProjection() throws IOException {
Schema writeSchema =
new Schema(
Expand Down Expand Up @@ -459,7 +481,7 @@ public void testMapOfStructsProjection() throws IOException {
.isEqualTo(52.995143f, withPrecision(0.000001f));
}

@Test
@TestTemplate
public void testListProjection() throws IOException {
Schema writeSchema =
new Schema(
Expand Down Expand Up @@ -488,7 +510,7 @@ public void testListProjection() throws IOException {
assertThat(projected.getArray(0)).isEqualTo(values);
}

@Test
@TestTemplate
@SuppressWarnings("unchecked")
public void testListOfStructsProjection() throws IOException {
Schema writeSchema =
Expand Down Expand Up @@ -565,7 +587,7 @@ public void testListOfStructsProjection() throws IOException {
assertThat(projectedP2.isNullAt(0)).as("Should project null z").isTrue();
}

@Test
@TestTemplate
public void testAddedFieldsWithRequiredChildren() throws Exception {
Schema schema = new Schema(Types.NestedField.required(1, "a", Types.LongType.get()));

Expand Down

0 comments on commit f356087

Please sign in to comment.