Flink: Add null check to writers to prevent resurrecting null values #12049
Conversation
// This will produce incorrect writes instead of failing with a NullPointerException.
if (struct.isNullAt(index)) {
  return null;
}
Actual fix is here.
Do we need to fix the FlinkOrcWriters?
You are right. I revised the approach because it was too easy to miss instances. Instead, I'm wrapping RowData#createFieldGetter to make sure null checks also happen for required / non-null types. I'll raise an issue on the Flink side as well.
Also see #12049 (comment).
@mxm: Please remove the 1.18 and 1.19 changes from the PR. It is much easier to review this way and to apply changes requested by reviewers. Once the PR has been merged, we can backport the changes to the other Flink versions. QQ: What happens when we have a type discrepancy between the Iceberg type and the RowData type? Could we have issues with other conversions? Do we have a way to prevent those?
Force-pushed from b8700c9 to 734b7bb.
Makes sense! Done.
Type discrepancies between Iceberg and Flink types will cause an error in Flink's TypeSerializer for the affected field. For example, an int field uses IntSerializer, which only accepts Integer. A mismatch will raise a NoSuchMethodError during serialization. As long as we use the same serializer for deserialization, we should be fine, and that is the case here.
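A minimal, purely illustrative sketch of the kind of mismatch described above, using Flink's IntSerializer directly (the exact error surfaced in practice can differ from the one mentioned above):

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class SerializerMismatchSketch {
  public static void main(String[] args) throws Exception {
    DataOutputSerializer out = new DataOutputSerializer(64);

    // Matching types serialize fine.
    IntSerializer.INSTANCE.serialize(42, out);

    // With a type discrepancy, the value reaching the serializer is not an
    // Integer. The raw type mimics such an erased call site; this fails at
    // runtime instead of producing a valid record.
    TypeSerializer rawSerializer = IntSerializer.INSTANCE;
    rawSerializer.serialize("not an int", out); // throws at runtime
  }
}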
@pvary I had to re-add the 1.18 and 1.19 changes, but they are in a separate commit. The reason is that I modified a test base class which also affects 1.18 and 1.19; we can't build otherwise.
Force-pushed from f4893cc to 59dacfb.
Tests are green.
CC @stevenzwu
Rebased.
Force-pushed from def98b6 to 46f8c23.
Flaky Spark test, otherwise passing.
Force-pushed from bbc627a to 8f90712.
Force-pushed from 6b69d52 to ead98ae.
public static RowData.FieldGetter createFieldGetter(LogicalType fieldType, int fieldPos) {
  RowData.FieldGetter flinkFieldGetter = RowData.createFieldGetter(fieldType, fieldPos);
  return rowData -> {
    // Be sure to check for null values, even if the field is required. Flink's
    // RowData.createFieldGetter(..) only null-checks optional / nullable types. Without this
    // explicit null check, the null flag of BinaryRowData will be ignored and random bytes
    // will be parsed as actual values. This will produce incorrect writes instead of failing
    // with a NullPointerException.
    if (!fieldType.isNullable() && rowData.isNullAt(fieldPos)) {
      return null;
    }
    return flinkFieldGetter.getFieldOrNull(rowData);
  };
}
Moved the core change here to replace all the RowData#createFieldGetter calls. The idea is to also perform a null check for non-null types, to prevent interpreting nulled fields in BinaryRowData as actual values. Unfortunately, Flink itself only adds the null check for nullable types and defers additional null checks to the caller. I'll report this in upstream Flink as well.
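A quick usage sketch, assuming the wrapper above lives in a helper class (here called FlinkRowData; the actual class name may differ): getters built this way return null for any field whose null flag is set, regardless of declared nullability.

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

public class FieldGetterUsageSketch {
  public static Object[] readFields(RowData row, RowType rowType) {
    Object[] values = new Object[rowType.getFieldCount()];
    for (int i = 0; i < rowType.getFieldCount(); i++) {
      // FlinkRowData.createFieldGetter is the wrapper shown above (name assumed).
      RowData.FieldGetter getter = FlinkRowData.createFieldGetter(rowType.getTypeAt(i), i);
      values[i] = getter.getFieldOrNull(row); // null-safe even for NOT NULL fields
    }
    return values;
  }
}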
I like this new approach.
I am also wondering if this is a bug in Flink, where RowData.createFieldGetter doesn't handle null values in BinaryRowData properly.
Flink's BinaryRowData uses a magic byte to indicate null values in the backing byte arrays. Flink's internal RowData#createFieldGetter method, which Iceberg uses, only adds a null check when a type is nullable. We map Iceberg's optional attribute to nullable, but Iceberg's required attribute to non-nullable. The latter creates an issue when the user, by mistake, nulls a field: the resulting RowData field is then interpreted as actual data because the null flag is not checked. This yields random values for fields which should have been null and should have produced an error in the writer. The solution is to always check whether a field is null before attempting to read data from it.
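A minimal sketch of the failure mode, assuming Flink's table runtime classes (BinaryRowData, BinaryRowWriter): the field is declared NOT NULL, so the stock getter skips the null check and reads the raw bytes of the null-flagged slot.

import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.types.logical.IntType;

public class NullResurrectionSketch {
  public static void main(String[] args) {
    // Build a one-field binary row and null out the field (sets the null flag).
    BinaryRowData row = new BinaryRowData(1);
    BinaryRowWriter writer = new BinaryRowWriter(row);
    writer.setNullAt(0);
    writer.complete();

    // NOT NULL type: Flink's stock getter omits the isNullAt check entirely.
    RowData.FieldGetter getter = RowData.createFieldGetter(new IntType(false), 0);

    // Reads the bytes of the null-flagged slot instead of returning null,
    // "resurrecting" a value that should not exist.
    System.out.println(getter.getFieldOrNull(row)); // prints a number, not null
    System.out.println(row.isNullAt(0));            // true: the null flag is set

    // The wrapped getter from this PR checks isNullAt first and returns null,
    // so downstream required-field validation can fail loudly instead.
  }
}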
@Test
public void testWriteNullValueForRequiredType() {
  Assumptions.assumeThat(supportsDefaultValues()).isTrue();
Where is supportsDefaultValues() defined?