From 42c1b866620f326a8db6560d3956343aa7d310ca Mon Sep 17 00:00:00 2001
From: Maximilian Michels
Date: Mon, 27 Jan 2025 13:41:34 +0100
Subject: [PATCH] Flink: Add null check to writers to prevent resurrecting null values

Flink's BinaryRowData uses a magic byte to indicate null values in the
backing byte arrays. Flink's internal RowData#createFieldGetter method,
which Iceberg uses, only adds a null check when a type is nullable. We
map Iceberg's optional attribute to nullable, but Iceberg's required
attribute to non-nullable. The latter creates an issue when the user
nulls a field by mistake: because the null flag is never checked, the
RowData field is interpreted as actual data. This yields random values
for fields that should have been null and should have produced an error
in the writer. The solution is to always check whether a field is null
before attempting to read data from it, even when its type is required.
---
 .../org/apache/iceberg/data/DataTest.java     | 22 ++++++++++
 .../iceberg/data/avro/TestGenericData.java    | 14 +++++-
 .../iceberg/data/orc/TestGenericData.java     |  5 +++
 .../iceberg/data/parquet/TestGenericData.java | 13 +++++-
 ...TestParquetEncryptionWithWriteSupport.java |  5 +++
 .../flink/data/TestFlinkParquetReader.java    |  5 +++
 .../flink/data/TestFlinkParquetReader.java    |  5 +++
 .../apache/iceberg/flink/FlinkRowData.java    | 42 ++++++++++++++++++
 .../apache/iceberg/flink/RowDataWrapper.java  |  2 +-
 .../iceberg/flink/data/FlinkOrcWriters.java   |  3 +-
 .../flink/data/FlinkParquetWriters.java       |  3 +-
 .../iceberg/flink/data/FlinkValueWriters.java |  3 +-
 .../iceberg/flink/data/RowDataProjection.java |  7 +--
 .../source/reader/RowDataRecordFactory.java   |  3 +-
 .../org/apache/iceberg/flink/TestHelpers.java |  4 +-
 .../AbstractTestFlinkAvroReaderWriter.java    | 16 +++----
 .../flink/data/TestFlinkOrcReaderWriter.java  |  7 ++-
 .../flink/data/TestFlinkParquetReader.java    |  5 +++
 .../flink/data/TestFlinkParquetWriter.java    | 18 ++++++++
 .../iceberg/flink/sink/TestIcebergSink.java   | 43 +++++++++++++++++++
 20 files changed, 204 insertions(+), 21 deletions(-)
 create mode 100644 flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java

diff --git a/data/src/test/java/org/apache/iceberg/data/DataTest.java b/data/src/test/java/org/apache/iceberg/data/DataTest.java
index 657fa805e5a6..578b61138311 100644
--- a/data/src/test/java/org/apache/iceberg/data/DataTest.java
+++ b/data/src/test/java/org/apache/iceberg/data/DataTest.java
@@ -27,10 +27,12 @@
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Stream;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
@@ -50,6 +52,8 @@ public abstract class DataTest {
 
   protected abstract void writeAndValidate(Schema schema) throws IOException;
 
+  protected abstract void writeAndValidate(Schema schema, List<Record> data) throws IOException;
+
   protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
     throw new UnsupportedEncodingException(
         "Cannot run test, writeAndValidate(Schema, Schema) is not implemented");
@@ -486,4 +490,22 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Object defau
     writeAndValidate(writeSchema, readSchema);
   }
+
+  @Test
+  public void testWriteNullValueForRequiredType() {
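+    // Writing a null into a required field must fail in the writer instead of the
+    // null flag being ignored and random bytes being resurrected as data.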
Assumptions.assumeThat(supportsDefaultValues()).isTrue(); + + Schema schema = + new Schema( + required(0, "id", LongType.get()), required(1, "string", Types.StringType.get())); + + GenericRecord genericRecord = GenericRecord.create(schema); + genericRecord.set(0, 42L); + genericRecord.set(1, null); + + assertThatThrownBy( + // The actual exception depends on the implementation, e.g. + // NullPointerException or IllegalArgumentException. + () -> writeAndValidate(schema, ImmutableList.of(genericRecord))); + } } diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java index bf5160fd18dc..808fb3317339 100644 --- a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java @@ -35,14 +35,26 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; public class TestGenericData extends DataTest { + @Override protected void writeAndValidate(Schema schema) throws IOException { writeAndValidate(schema, schema); } + @Override + protected void writeAndValidate(Schema writeSchema, List expectedData) + throws IOException { + writeAndValidate(writeSchema, writeSchema, expectedData); + } + @Override protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { - List expected = RandomGenericData.generate(writeSchema, 100, 0L); + List data = RandomGenericData.generate(writeSchema, 100, 0L); + writeAndValidate(writeSchema, expectedSchema, data); + } + + private void writeAndValidate(Schema writeSchema, Schema expectedSchema, List expected) + throws IOException { File testFile = File.createTempFile("junit", null, temp.toFile()); assertThat(testFile.delete()).isTrue(); diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java index 5147fd377c62..f406bde43c50 100644 --- a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java @@ -61,6 +61,11 @@ protected void writeAndValidate(Schema schema) throws IOException { writeAndValidateRecords(schema, expected); } + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + writeAndValidateRecords(schema, expectedData); + } + @Test public void writeAndValidateRepeatingRecords() throws IOException { Schema structSchema = diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java index 5a63f7a1fc9d..e0a02df97bc0 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java @@ -46,14 +46,25 @@ import org.junit.jupiter.api.Test; public class TestGenericData extends DataTest { + @Override protected void writeAndValidate(Schema schema) throws IOException { writeAndValidate(schema, schema); } + @Override + protected void writeAndValidate(Schema schema, List expected) throws IOException { + writeAndValidate(schema, schema, expected); + } + @Override protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { - List expected = RandomGenericData.generate(writeSchema, 100, 12228L); + List data = RandomGenericData.generate(writeSchema, 100, 12228L); + writeAndValidate(writeSchema, expectedSchema, data); + } 
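+  // Shared write/validate path used by the public writeAndValidate overloads above.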
+ + private void writeAndValidate(Schema writeSchema, Schema expectedSchema, List expected) + throws IOException { File testFile = File.createTempFile("junit", null, temp.toFile()); assertThat(testFile.delete()).isTrue(); diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java index 4b0a10830221..a401c326dea6 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestParquetEncryptionWithWriteSupport.java @@ -59,6 +59,11 @@ public class TestParquetEncryptionWithWriteSupport extends DataTest { @Override protected void writeAndValidate(Schema schema) throws IOException { List expected = RandomGenericData.generate(schema, 100, 0L); + writeAndValidate(schema, expected); + } + + @Override + protected void writeAndValidate(Schema schema, List expected) throws IOException { File testFile = File.createTempFile("junit", null, temp.toFile()); assertThat(testFile.delete()).isTrue(); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 6b1af0202c34..341b38793616 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -250,4 +250,9 @@ protected void writeAndValidate(Schema schema) throws IOException { protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { writeAndValidate(RandomGenericData.generate(writeSchema, 100, 0L), writeSchema, expectedSchema); } + + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + writeAndValidate(expectedData, schema, schema); + } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 6b1af0202c34..341b38793616 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -250,4 +250,9 @@ protected void writeAndValidate(Schema schema) throws IOException { protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { writeAndValidate(RandomGenericData.generate(writeSchema, 100, 0L), writeSchema, expectedSchema); } + + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + writeAndValidate(expectedData, schema, schema); + } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java new file mode 100644 index 000000000000..ad09079a727b --- /dev/null +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkRowData.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; + +public class FlinkRowData { + + private FlinkRowData() {} + + public static RowData.FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) { + RowData.FieldGetter flinkFieldGetter = RowData.createFieldGetter(fieldType, fieldPos); + return rowData -> { + // Be sure to check for null values, even if the field is required. Flink + // RowData.createFieldGetter(..) does not null-check optional / nullable types. Without this + // explicit null check, the null flag of BinaryRowData will be ignored and random bytes will + // be parsed as actual values. This will produce incorrect writes instead of failing with a + // NullPointerException. + if (!fieldType.isNullable() && rowData.isNullAt(fieldPos)) { + return null; + } + return flinkFieldGetter.getFieldOrNull(rowData); + }; + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java index d4cec7a3e80b..3ef611f2ded5 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java @@ -70,7 +70,7 @@ public T get(int pos, Class javaClass) { return javaClass.cast(getters[pos].get(rowData, pos)); } - Object value = RowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData); + Object value = FlinkRowData.createFieldGetter(types[pos], pos).getFieldOrNull(rowData); return javaClass.cast(value); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java index da2f95cf822f..afce2cda1db1 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java @@ -32,6 +32,7 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.data.orc.GenericOrcWriters; +import org.apache.iceberg.flink.FlinkRowData; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -298,7 +299,7 @@ static class RowDataWriter extends GenericOrcWriters.StructWriter { this.fieldGetters = Lists.newArrayListWithExpectedSize(types.size()); for (int i = 0; i < types.size(); i++) { - fieldGetters.add(RowData.createFieldGetter(types.get(i), i)); + fieldGetters.add(FlinkRowData.createFieldGetter(types.get(i), i)); } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java index db4f1730a134..30ec5069ae09 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java @@ -35,6 +35,7 @@ import org.apache.flink.table.types.logical.RowType.RowField; import org.apache.flink.table.types.logical.SmallIntType; import org.apache.flink.table.types.logical.TinyIntType; +import org.apache.iceberg.flink.FlinkRowData; import org.apache.iceberg.parquet.ParquetValueReaders; import org.apache.iceberg.parquet.ParquetValueWriter; import org.apache.iceberg.parquet.ParquetValueWriters; @@ -492,7 +493,7 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter> writers, List types) { this.getters = new RowData.FieldGetter[writers.size()]; for (int i = 0; i < writers.size(); i += 1) { this.writers[i] = writers.get(i); - this.getters[i] = RowData.createFieldGetter(types.get(i), i); + this.getters[i] = FlinkRowData.createFieldGetter(types.get(i), i); } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java index 33816c97ac29..9395b0e4810e 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataProjection.java @@ -32,6 +32,7 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.StringUtils; import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkRowData; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -135,7 +136,7 @@ private static RowData.FieldGetter createFieldGetter( projectField, rowField); - return RowData.createFieldGetter(rowType.getTypeAt(position), position); + return FlinkRowData.createFieldGetter(rowType.getTypeAt(position), position); case LIST: Types.ListType projectedList = projectField.type().asListType(); @@ -150,10 +151,10 @@ private static RowData.FieldGetter createFieldGetter( projectField, rowField); - return RowData.createFieldGetter(rowType.getTypeAt(position), position); + return FlinkRowData.createFieldGetter(rowType.getTypeAt(position), position); default: - return RowData.createFieldGetter(rowType.getTypeAt(position), position); + return FlinkRowData.createFieldGetter(rowType.getTypeAt(position), position); } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java index 40d5c28d7bc7..ef2eedcf3cdd 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataRecordFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalSerializers; import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.flink.FlinkRowData; import org.apache.iceberg.flink.data.RowDataUtil; class RowDataRecordFactory implements RecordFactory { @@ -45,7 +46,7 @@ static TypeSerializer[] createFieldSerializers(RowType rowType) { static RowData.FieldGetter[] 
createFieldGetters(RowType rowType) { RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()]; for (int i = 0; i < rowType.getFieldCount(); ++i) { - fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i); + fieldGetters[i] = FlinkRowData.createFieldGetter(rowType.getTypeAt(i), i); } return fieldGetters; diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java index 3cab89e1ac7d..9fbd5f25c2f4 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java @@ -89,7 +89,7 @@ public static RowData copyRowData(RowData from, RowType rowType) { .toArray(TypeSerializer[]::new); RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowType.getFieldCount()]; for (int i = 0; i < rowType.getFieldCount(); ++i) { - fieldGetters[i] = RowData.createFieldGetter(rowType.getTypeAt(i), i); + fieldGetters[i] = FlinkRowData.createFieldGetter(rowType.getTypeAt(i), i); } return RowDataUtil.clone(from, null, rowType, fieldSerializers, fieldGetters); @@ -200,7 +200,7 @@ public static void assertRowData( Object actual = actualRowData.isNullAt(i) ? null - : RowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData); + : FlinkRowData.createFieldGetter(logicalType, i).getFieldOrNull(actualRowData); assertEquals(types.get(i), logicalType, expected, actual); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java index 4a59dcfd1e09..fc5c688321c1 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java @@ -36,8 +36,8 @@ import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.avro.DataWriter; -import org.apache.iceberg.data.avro.PlannedDataReader; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.io.CloseableIterable; @@ -67,13 +67,13 @@ public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest { @Override protected void writeAndValidate(Schema schema) throws IOException { List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L); - writeAndValidate(schema, expectedRecords, NUM_RECORDS); + writeAndValidate(schema, expectedRecords); } protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema); - private void writeAndValidate(Schema schema, List expectedRecords, int numRecord) - throws IOException { + @Override + protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); @@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List expectedRecords, int n try (CloseableIterable reader = createAvroReadBuilder(recordsFile, schema).build()) { Iterator expected = expectedRecords.iterator(); Iterator rows = reader.iterator(); - for (int i = 0; i < numRecord; i++) { + for 
(int i = 0; i < expectedRecords.size(); i++) { assertThat(rows).hasNext(); TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next()); } @@ -116,11 +116,11 @@ private void writeAndValidate(Schema schema, List expectedRecords, int n try (CloseableIterable reader = Avro.read(Files.localInput(rowDataFile)) .project(schema) - .createResolvingReader(PlannedDataReader::create) + .createReaderFunc(DataReader::create) .build()) { Iterator expected = expectedRows.iterator(); Iterator records = reader.iterator(); - for (int i = 0; i < numRecord; i += 1) { + for (int i = 0; i < expectedRecords.size(); i += 1) { assertThat(records).hasNext(); TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next()); } @@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException { 1643811742000L, 10.24d)); - writeAndValidate(SCHEMA_NUM_TYPE, expected, 2); + writeAndValidate(SCHEMA_NUM_TYPE, expected); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java index 72f2ce4f4bce..9231caaf9647 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java @@ -45,8 +45,13 @@ public class TestFlinkOrcReaderWriter extends DataTest { @Override protected void writeAndValidate(Schema schema) throws IOException { - RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L); + writeAndValidate(schema, expectedRecords); + } + + @Override + protected void writeAndValidate(Schema schema, List expectedRecords) throws IOException { + RowType flinkSchema = FlinkSchemaUtil.convert(schema); List expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords)); File recordsFile = File.createTempFile("junit", null, temp.toFile()); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java index 6b1af0202c34..341b38793616 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java @@ -250,4 +250,9 @@ protected void writeAndValidate(Schema schema) throws IOException { protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { writeAndValidate(RandomGenericData.generate(writeSchema, 100, 0L), writeSchema, expectedSchema); } + + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + writeAndValidate(expectedData, schema, schema); + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java index b1e6f5aa00ff..657a2da542a5 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetWriter.java @@ -24,7 +24,10 @@ import java.io.IOException; import java.nio.file.Path; import java.util.Iterator; +import java.util.List; import org.apache.flink.table.data.RowData; +import 
org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.logical.LogicalType; import org.apache.iceberg.Files; import org.apache.iceberg.Schema; @@ -33,10 +36,12 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetReaders; import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.flink.TestHelpers; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.junit.jupiter.api.io.TempDir; public class TestFlinkParquetWriter extends DataTest { @@ -91,4 +96,17 @@ protected void writeAndValidate(Schema schema) throws IOException { schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)), schema); } + + @Override + protected void writeAndValidate(Schema schema, List expectedData) throws IOException { + RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema)); + List binaryRowList = Lists.newArrayList(); + for (Record record : expectedData) { + RowData rowData = RowDataConverter.convert(schema, record); + BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData); + binaryRowList.add(binaryRow); + } + + writeAndValidate(binaryRowList, schema); + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java index 7f355c1e8403..7bd82e685ea8 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSink.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.sink; +import static org.apache.iceberg.flink.TestFixtures.DATABASE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -38,9 +39,11 @@ import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.Parameters; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.FlinkWriteOptions; import org.apache.iceberg.flink.MiniFlinkClusterExtension; import org.apache.iceberg.flink.SimpleDataUtil; @@ -51,6 +54,8 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; +import org.junit.Assume; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -414,6 +419,44 @@ void testOperatorsUidNameWitUidSuffix() throws Exception { assertThat(secondTransformation.getName()).isEqualTo("data-ingestion"); } + @TestTemplate + void testErrorOnNullForRequiredField() throws Exception { + Assume.assumeFalse( + "ORC file format supports null values even for required fields.", format == FileFormat.ORC); + + Schema icebergSchema = + new Schema( + Types.NestedField.required(1, "id2", Types.IntegerType.get()), + Types.NestedField.required(2, "data2", 
Types.StringType.get())); + TableIdentifier tableIdentifier = TableIdentifier.of(DATABASE, "t2"); + Table table2 = + CATALOG_EXTENSION + .catalog() + .createTable( + tableIdentifier, + icebergSchema, + PartitionSpec.unpartitioned(), + ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name())); + + // Null out a required field + List rows = List.of(Row.of(42, null)); + + env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream dataStream = + env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId"); + + TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema); + IcebergSink.forRow(dataStream, flinkSchema) + .table(table2) + .tableLoader(TableLoader.fromCatalog(CATALOG_EXTENSION.catalogLoader(), tableIdentifier)) + .tableSchema(flinkSchema) + .writeParallelism(parallelism) + .append(); + + assertThatThrownBy(() -> env.execute()).hasRootCause(new NullPointerException()); + } + private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode) throws Exception { List rows = createRows("");