diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java
new file mode 100644
index 000000000000..ad09079a727b
--- /dev/null
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+public class FlinkRowData {
+
+  private FlinkRowData() {}
+
+  public static RowData.FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
+    RowData.FieldGetter flinkFieldGetter = RowData.createFieldGetter(fieldType, fieldPos);
+    return rowData -> {
+      // Be sure to check for null values, even if the field is required. Flink
+      // RowData.createFieldGetter(..) does not null-check required / non-nullable types. Without
+      // this explicit null check, the null flag of BinaryRowData will be ignored and random bytes
+      // will be parsed as actual values. This will produce incorrect writes instead of failing
+      // with a NullPointerException.
+      if (!fieldType.isNullable() && rowData.isNullAt(fieldPos)) {
+        return null;
+      }
+      return flinkFieldGetter.getFieldOrNull(rowData);
+    };
+  }
+}
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
index d4cec7a3e80b..3ef611f2ded5 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
@@ -70,7 +70,7 @@ public <T> T get(int pos, Class<T> javaClass) {
       return javaClass.cast(getters[pos].get(rowData, pos));
     }
 
-    Object value = RowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData);
+    Object value = FlinkRowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData);
     return javaClass.cast(value);
   }
 
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
index da2f95cf822f..275718f7c570 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
@@ -298,7 +298,7 @@ static class RowDataWriter extends GenericOrcWriters.StructWriter<RowData> {
 
       this.fieldGetters = Lists.newArrayListWithExpectedSize(types.size());
       for (int i = 0; i < types.size(); i++) {
-        fieldGetters.add(RowData.createFieldGetter(types.get(i), i));
+        fieldGetters.add(FlinkRowData.createFieldGetter(types.get(i), i));
       }
     }
 
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index db4f1730a134..30ec5069ae09 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -35,6 +35,7 @@
 import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.flink.FlinkRowData;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -492,7 +493,7 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowData> {
     RowDataWriter(List<ParquetValueWriter<?>> writers, List<LogicalType> types) {
       this.getters = new RowData.FieldGetter[writers.size()];
       for (int i = 0; i < writers.size(); i += 1) {
         this.writers[i] = writers.get(i);
-        this.getters[i] = RowData.createFieldGetter(types.get(i), i);
+        this.getters[i] = FlinkRowData.createFieldGetter(types.get(i), i);
       }
     }
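The wrapped getter above is the core of the fix. A minimal sketch of the failure mode it guards against follows; the row type, values, and field position are illustrative only and not part of the patch:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.iceberg.flink.FlinkRowData;

    public class NullFlagDemo {
      public static void main(String[] args) {
        // Two NOT NULL columns, mirroring required Iceberg fields.
        RowType rowType = RowType.of(new IntType(false), new VarCharType(false, 10));

        // A row that (incorrectly) carries null in the required second column.
        GenericRowData generic = GenericRowData.of(42, null);
        BinaryRowData binary = new RowDataSerializer(rowType).toBinaryRow(generic);

        // Flink's built-in getter skips the null check for non-nullable types and
        // decodes whatever bytes sit in the binary row's fixed-length slot.
        Object raw = RowData.createFieldGetter(rowType.getTypeAt(1), 1).getFieldOrNull(binary);

        // The wrapped getter honors BinaryRowData's null flag and returns null,
        // which downstream writers turn into a visible NullPointerException.
        Object checked = FlinkRowData.createFieldGetter(rowType.getTypeAt(1), 1).getFieldOrNull(binary);

        System.out.println("raw=" + raw + " checked=" + checked);
      }
    }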
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
index 33816c97ac29..561f37b41bc9 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
@@ -135,7 +135,7 @@ private static RowData.FieldGetter createFieldGetter(
             projectField,
             rowField);
 
-        return RowData.createFieldGetter(rowType.getTypeAt(position), position);
+        return FlinkRowData.createFieldGetter(rowType.getTypeAt(position), position);
 
       case LIST:
         Types.ListType projectedList = projectField.type().asListType();
@@ -150,10 +150,10 @@ private static RowData.FieldGetter createFieldGetter(
             projectField,
             rowField);
 
-        return RowData.createFieldGetter(rowType.getTypeAt(position), position);
+        return FlinkRowData.createFieldGetter(rowType.getTypeAt(position), position);
 
       default:
-        return RowData.createFieldGetter(rowType.getTypeAt(position), position);
+        return FlinkRowData.createFieldGetter(rowType.getTypeAt(position), position);
     }
   }
 
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java
index 40d5c28d7bc7..5e131740305e 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java
@@ -45,7 +45,7 @@ static TypeSerializer<?>[] createFieldSerializers(RowType rowType) {
   static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
     RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
     for (int i = 0; i < rowType.getFieldCount(); ++i) {
-      fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
+      fieldGetters[i] = FlinkRowData.createFieldGetter(rowType.getTypeAt(i), i);
     }
 
     return fieldGetters;
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 8cebf950c5f0..5808e972a9df 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -89,7 +89,7 @@ public static RowData copyRowData(RowData from, RowType rowType) {
             .toArray(TypeSerializer[]::new);
     RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
     for (int i = 0; i < rowType.getFieldCount(); ++i) {
-      fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
+      fieldGetters[i] = FlinkRowData.createFieldGetter(rowType.getTypeAt(i), i);
     }
 
     return RowDataUtil.clone(from, null, rowType, fieldSerializers, fieldGetters);
@@ -200,7 +200,7 @@ public static void assertRowData(
       Object actual =
           actualRowData.isNullAt(i)
               ? null
-              : RowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData);
+              : FlinkRowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData);
       assertEquals(types.get(i), logicalType, expected, actual);
     }
   }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
index cbf49ae6faa9..fc5c688321c1 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
@@ -67,13 +67,13 @@ public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L);
-    writeAndValidate(schema, expectedRecords, NUM_RECORDS);
+    writeAndValidate(schema, expectedRecords);
   }
 
   protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema);
 
-  private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
-      throws IOException {
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<RowData> expectedRows =
         Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));
@@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
     try (CloseableIterable<RowData> reader = createAvroReadBuilder(recordsFile, schema).build()) {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
-      for (int i = 0; i < numRecord; i++) {
+      for (int i = 0; i < expectedRecords.size(); i++) {
         assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next());
       }
@@ -120,7 +120,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
             .build()) {
       Iterator<RowData> expected = expectedRows.iterator();
       Iterator<Record> records = reader.iterator();
-      for (int i = 0; i < numRecord; i += 1) {
+      for (int i = 0; i < expectedRecords.size(); i += 1) {
         assertThat(records).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next());
       }
@@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException {
             1643811742000L,
             10.24d));
 
-    writeAndValidate(SCHEMA_NUM_TYPE, expected, 2);
+    writeAndValidate(SCHEMA_NUM_TYPE, expected);
   }
 }
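The overrides in this and the following test files target a writeAndValidate(Schema, List<Record>) hook on the shared DataTest base class. That base-class change is not part of this excerpt; a plausible shape for it, assuming the default rejects suites that have not opted in, would be:

    import java.io.IOException;
    import java.util.List;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.Record;

    // Hypothetical sketch of the DataTest hook the overrides implement; the
    // actual base-class change is outside this diff.
    public abstract class DataTest {
      protected abstract void writeAndValidate(Schema schema) throws IOException;

      protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
        throw new UnsupportedOperationException(
            "writeAndValidate(Schema, List<Record>) is not implemented by " + getClass().getName());
      }
    }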
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
index 72f2ce4f4bce..9231caaf9647 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
@@ -45,8 +45,13 @@ public class TestFlinkOrcReaderWriter extends DataTest {
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L);
+    writeAndValidate(schema, expectedRecords);
+  }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
+    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<RowData> expectedRows =
         Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));
     File recordsFile = File.createTempFile("junit", null, temp.toFile());
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
index b1e6f5aa00ff..657a2da542a5 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
@@ -24,7 +24,10 @@
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Iterator;
+import java.util.List;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
@@ -33,10 +36,12 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataConverter;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.io.TempDir;
 
 public class TestFlinkParquetWriter extends DataTest {
@@ -91,4 +96,17 @@ protected void writeAndValidate(Schema schema) throws IOException {
                 schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
         schema);
   }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
+    RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
+    List<RowData> binaryRowList = Lists.newArrayList();
+    for (Record record : expectedData) {
+      RowData rowData = RowDataConverter.convert(schema, record);
+      BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData).copy(); // copy: the serializer reuses its buffer
+      binaryRowList.add(binaryRow);
+    }
+
+    writeAndValidate(binaryRowList, schema);
+  }
 }
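The detour through RowDataSerializer.toBinaryRow above is what makes the regression reproducible: object-backed rows happen to surface nulls even through Flink's unchecked getter, so only the binary layout exercises the null-flag path. A hedged illustration, with made-up types and positions:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.binary.BinaryRowData;
    import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    public class WhyBinaryRows {
      public static void main(String[] args) {
        RowType rowType = RowType.of(new IntType(false), new VarCharType(false, 10));
        GenericRowData generic = GenericRowData.of(1, null);

        // GenericRowData stores fields as objects, so the unchecked getter for a
        // NOT NULL type still returns null; a GenericRowData-based test passes.
        Object fromGeneric =
            RowData.createFieldGetter(rowType.getTypeAt(1), 1).getFieldOrNull(generic);

        // BinaryRowData keeps a null bit plus a zeroed fixed-length slot; the
        // unchecked getter ignores the bit and decodes the slot's bytes instead.
        BinaryRowData binary = new RowDataSerializer(rowType).toBinaryRow(generic);
        Object fromBinary =
            RowData.createFieldGetter(rowType.getTypeAt(1), 1).getFieldOrNull(binary);

        System.out.println(fromGeneric + " vs " + fromBinary); // null vs a bogus non-null value
      }
    }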
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java
new file mode 100644
index 000000000000..ad09079a727b
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+public class FlinkRowData {
+
+  private FlinkRowData() {}
+
+  public static RowData.FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
+    RowData.FieldGetter flinkFieldGetter = RowData.createFieldGetter(fieldType, fieldPos);
+    return rowData -> {
+      // Be sure to check for null values, even if the field is required. Flink
+      // RowData.createFieldGetter(..) does not null-check required / non-nullable types. Without
+      // this explicit null check, the null flag of BinaryRowData will be ignored and random bytes
+      // will be parsed as actual values. This will produce incorrect writes instead of failing
+      // with a NullPointerException.
+      if (!fieldType.isNullable() && rowData.isNullAt(fieldPos)) {
+        return null;
+      }
+      return flinkFieldGetter.getFieldOrNull(rowData);
+    };
+  }
+}
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
index d4cec7a3e80b..3ef611f2ded5 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
@@ -70,7 +70,7 @@ public <T> T get(int pos, Class<T> javaClass) {
       return javaClass.cast(getters[pos].get(rowData, pos));
     }
 
-    Object value = RowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData);
+    Object value = FlinkRowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData);
     return javaClass.cast(value);
   }
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
index da2f95cf822f..275718f7c570 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java
@@ -298,7 +298,7 @@ static class RowDataWriter extends GenericOrcWriters.StructWriter<RowData> {
 
       this.fieldGetters = Lists.newArrayListWithExpectedSize(types.size());
       for (int i = 0; i < types.size(); i++) {
-        fieldGetters.add(RowData.createFieldGetter(types.get(i), i));
+        fieldGetters.add(FlinkRowData.createFieldGetter(types.get(i), i));
       }
     }
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index db4f1730a134..30ec5069ae09 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -35,6 +35,7 @@
 import org.apache.flink.table.types.logical.RowType.RowField;
 import org.apache.flink.table.types.logical.SmallIntType;
 import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.iceberg.flink.FlinkRowData;
 import org.apache.iceberg.parquet.ParquetValueReaders;
 import org.apache.iceberg.parquet.ParquetValueWriter;
 import org.apache.iceberg.parquet.ParquetValueWriters;
@@ -492,7 +493,7 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowData> {
     RowDataWriter(List<ParquetValueWriter<?>> writers, List<LogicalType> types) {
       this.getters = new RowData.FieldGetter[writers.size()];
       for (int i = 0; i < writers.size(); i += 1) {
         this.writers[i] = writers.get(i);
-        this.getters[i] = RowData.createFieldGetter(types.get(i), i);
+        this.getters[i] = FlinkRowData.createFieldGetter(types.get(i), i);
       }
     }
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
index 33816c97ac29..561f37b41bc9 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java
@@ -135,7 +135,7 @@ private static RowData.FieldGetter createFieldGetter(
             projectField,
             rowField);
 
-        return RowData.createFieldGetter(rowType.getTypeAt(position), position);
+        return FlinkRowData.createFieldGetter(rowType.getTypeAt(position), position);
 
       case LIST:
         Types.ListType projectedList = projectField.type().asListType();
@@ -150,10 +150,10 @@ private static RowData.FieldGetter createFieldGetter(
             projectField,
             rowField);
 
-        return RowData.createFieldGetter(rowType.getTypeAt(position), position);
+        return FlinkRowData.createFieldGetter(rowType.getTypeAt(position), position);
 
       default:
-        return RowData.createFieldGetter(rowType.getTypeAt(position), position);
+        return FlinkRowData.createFieldGetter(rowType.getTypeAt(position), position);
     }
   }
 
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java
index 40d5c28d7bc7..5e131740305e 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java
@@ -45,7 +45,7 @@ static TypeSerializer<?>[] createFieldSerializers(RowType rowType) {
   static RowData.FieldGetter[] createFieldGetters(RowType rowType) {
     RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
     for (int i = 0; i < rowType.getFieldCount(); ++i) {
-      fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
+      fieldGetters[i] = FlinkRowData.createFieldGetter(rowType.getTypeAt(i), i);
     }
 
     return fieldGetters;
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 3cab89e1ac7d..9fbd5f25c2f4 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -89,7 +89,7 @@ public static RowData copyRowData(RowData from, RowType rowType) {
             .toArray(TypeSerializer[]::new);
     RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()];
     for (int i = 0; i < rowType.getFieldCount(); ++i) {
-      fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i);
+      fieldGetters[i] = FlinkRowData.createFieldGetter(rowType.getTypeAt(i), i);
     }
 
     return RowDataUtil.clone(from, null, rowType, fieldSerializers, fieldGetters);
@@ -200,7 +200,7 @@ public static void assertRowData(
       Object actual =
           actualRowData.isNullAt(i)
               ? null
-              : RowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData);
+              : FlinkRowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData);
       assertEquals(types.get(i), logicalType, expected, actual);
     }
   }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
index cbf49ae6faa9..fc5c688321c1 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
@@ -67,13 +67,13 @@ public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
     List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L);
-    writeAndValidate(schema, expectedRecords, NUM_RECORDS);
+    writeAndValidate(schema, expectedRecords);
   }
 
   protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema);
 
-  private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
-      throws IOException {
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
     RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<RowData> expectedRows =
         Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));
@@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
     try (CloseableIterable<RowData> reader = createAvroReadBuilder(recordsFile, schema).build()) {
       Iterator<Record> expected = expectedRecords.iterator();
       Iterator<RowData> rows = reader.iterator();
-      for (int i = 0; i < numRecord; i++) {
+      for (int i = 0; i < expectedRecords.size(); i++) {
         assertThat(rows).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next());
       }
@@ -120,7 +120,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
             .build()) {
       Iterator<RowData> expected = expectedRows.iterator();
       Iterator<Record> records = reader.iterator();
-      for (int i = 0; i < numRecord; i += 1) {
+      for (int i = 0; i < expectedRecords.size(); i += 1) {
         assertThat(records).hasNext();
         TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next());
       }
@@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException {
             1643811742000L,
             10.24d));
 
-    writeAndValidate(SCHEMA_NUM_TYPE, expected, 2);
+    writeAndValidate(SCHEMA_NUM_TYPE, expected);
   }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
index 72f2ce4f4bce..9231caaf9647 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java
@@ -45,8 +45,13 @@ public class TestFlinkOrcReaderWriter extends DataTest {
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L);
+    writeAndValidate(schema, expectedRecords);
+  }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
+    RowType flinkSchema = FlinkSchemaUtil.convert(schema);
     List<RowData> expectedRows =
         Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));
     File recordsFile = File.createTempFile("junit", null, temp.toFile());
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
index b1e6f5aa00ff..657a2da542a5 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java
@@ -24,7 +24,10 @@
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.Iterator;
+import java.util.List;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
@@ -33,10 +36,12 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataConverter;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.junit.jupiter.api.io.TempDir;
 
 public class TestFlinkParquetWriter extends DataTest {
@@ -91,4 +96,17 @@ protected void writeAndValidate(Schema schema) throws IOException {
                 schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
         schema);
   }
+
+  @Override
+  protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
+    RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
+    List<RowData> binaryRowList = Lists.newArrayList();
+    for (Record record : expectedData) {
+      RowData rowData = RowDataConverter.convert(schema, record);
+      BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData).copy(); // copy: the serializer reuses its buffer
+      binaryRowList.add(binaryRow);
+    }
+
+    writeAndValidate(binaryRowList, schema);
+  }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
index 7f355c1e8403..7bd82e685ea8 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import static org.apache.iceberg.flink.TestFixtures.DATABASE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -38,9 +39,11 @@
 import org.apache.iceberg.ParameterizedTestExtension;
 import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
@@ -51,6 +54,8 @@
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.junit.Assume;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -414,6 +419,44 @@ void testOperatorsUidNameWitUidSuffix() throws Exception {
     assertThat(secondTransformation.getName()).isEqualTo("data-ingestion");
   }
 
+  @TestTemplate
+  void testErrorOnNullForRequiredField() throws Exception {
+    Assume.assumeFalse(
+        "ORC file format supports null values even for required fields.", format == FileFormat.ORC);
+
+    Schema icebergSchema =
+        new Schema(
+            Types.NestedField.required(1, "id2", Types.IntegerType.get()),
+            Types.NestedField.required(2, "data2", Types.StringType.get()));
+    TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, "t2");
+    Table table2 =
+        CATALOG_EXTENSION
+            .catalog()
+            .createTable(
+                tableIdentifier,
+                icebergSchema,
+                PartitionSpec.unpartitioned(),
+                ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
+
+    // Null out a required field
+    List<Row> rows = List.of(Row.of(42, null));
+
+    env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+    DataStream<Row> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");
+
+    TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema);
+    IcebergSink.forRow(dataStream, flinkSchema)
+        .table(table2)
+        .tableLoader(TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), tableIdentifier))
+        .tableSchema(flinkSchema)
+        .writeParallelism(parallelism)
+        .append();
+
+    assertThatThrownBy(() -> env.execute()).hasRootCause(new NullPointerException());
+  }
+
   private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
       throws Exception {
     List<Row> rows = createRows("");
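One note on the final assertion: AssertJ's hasRootCause(new NullPointerException()) matches both the root cause's type and its (here absent) message. If the writer ever attaches a message to the NPE, the type-only form would be the more robust check; this is a suggestion, not part of the patch:

    assertThatThrownBy(() -> env.execute()).hasRootCauseInstanceOf(NullPointerException.class);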