
Add Support For Flink 1.15 #4553

Merged
merged 1 commit into apache:master on May 3, 2022

Conversation

kbendick
Contributor

@kbendick kbendick commented Apr 13, 2022

The newest Flink release, Flink 1.15, is being voted on right now. Currently, the latest release candidate is Flink 1.15-rc4, which is what's being used in this PR.

As we are deprecating Flink 1.12 in #4551, I've begun the process of migrating the 1.14 code to Flink 1.15.

We are using some deprecated APIs, so this includes the minimum changes needed to make things work.

The (current) diff between 1.14 and 1.15 is located here: https://gist.github.com/kbendick/8265987b7386dc2738b09d7977abe41b

@kbendick kbendick force-pushed the add-flink-1.15-support branch 3 times, most recently from d1929a4 to 40f0992 Compare April 13, 2022 17:55
@kbendick
Contributor Author

The following test, org.apache.iceberg.flink.TestChangeLogTable#testChangeLogOnDataKey, is failing (both partitioned and unpartitioned). It relates to updated parts of the code (marked by the comment // UPDATED).

In this test, the primary key of the changelog table is the same as the partition schema (vs a subset).

org.apache.iceberg.flink.TestChangeLogTable > testChangeLogOnDataKey[PartitionedTable=true] FAILED
    java.lang.AssertionError: Should have the expected records for the checkpoint#2 expected:<[Record(1, aaa), Record(2, ccc), Record(2, aaa), Record(1, ccc)]> but was:<[Record(1, aaa), Record(2, bbb), Record(2, ccc), Record(2, aaa), Record(1, bbb, 1, false), Record(1, ccc)]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.apache.iceberg.flink.TestChangeLogTable.testSqlChangeLog(TestChangeLogTable.java:285)
        at org.apache.iceberg.flink.TestChangeLogTable.testChangeLogOnDataKey(TestChangeLogTable.java:167)

org.apache.iceberg.flink.TestChangeLogTable > testChangeLogOnDataKey[PartitionedTable=false] FAILED
    java.lang.AssertionError: Should have the expected records for the checkpoint#2 expected:<[Record(1, aaa), Record(2, ccc), Record(2, aaa), Record(1, ccc)]> but was:<[Record(1, aaa), Record(2, bbb), Record(2, ccc), Record(2, aaa), Record(1, bbb, 2, false), Record(1, ccc)]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.apache.iceberg.flink.TestChangeLogTable.testSqlChangeLog(TestChangeLogTable.java:285)
        at org.apache.iceberg.flink.TestChangeLogTable.testChangeLogOnDataKey(TestChangeLogTable.java:167)

cc @yittg as you recently spent time digging into the TestFlinkUpsert tests, which are related.

@kbendick
Contributor Author

Oh also cc @openinx @stevenzwu @Reo-LEI @hililiwei if you're interested in taking a look at this test case that fails when updating the codebase to Flink 1.15.

I had to make a few additions that are marked by // UPDATED in the code.

@Override
public DataStream<RowData> produceDataStream(
    ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
  return createDataStream(execEnv);
}
Contributor Author

@kbendick kbendick Apr 14, 2022

Updated the function signature to include ProviderContext. The old signature, without ProviderContext, has been deprecated and can no longer be used.

We likely want to backport this change to 1.14, where the ProviderContext-based signature is already available, so that future changes people make in this file are easier to apply to both versions. Associated issue: #4634

Comment on lines +66 to +73
return (DataStreamSinkProvider) (providerContext, dataStream) -> FlinkSink.forRowData(dataStream)
    .tableLoader(tableLoader)
    .tableSchema(tableSchema)
    .equalityFieldColumns(equalityColumns)
    .overwrite(overwrite)
    .flinkConf(readableConfig)
    .append();
}
Contributor Author

@kbendick kbendick Apr 14, 2022

Updated the function signature to include ProviderContext. The old signature, without ProviderContext, has been deprecated and can no longer be used.

We likely want to backport this change to 1.14, where the ProviderContext-based signature is already available, so that future changes people make in this file are easier to apply to both versions. Associated issue: #4634

@kbendick kbendick force-pushed the add-flink-1.15-support branch from fc85634 to f5d7601 Compare April 14, 2022 21:31
@yittg
Contributor

yittg commented Apr 15, 2022

In this test, the primary key of the changelog table is the same as the partition schema (vs a subset).

org.apache.iceberg.flink.TestChangeLogTable > testChangeLogOnDataKey[PartitionedTable=true] FAILED
    java.lang.AssertionError: Should have the expected records for the checkpoint#2 expected:<[Record(1, aaa), Record(2, ccc), Record(2, aaa), Record(1, ccc)]> but was:<[Record(1, aaa), Record(2, bbb), Record(2, ccc), Record(2, aaa), Record(1, bbb, 1, false), Record(1, ccc)]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.apache.iceberg.flink.TestChangeLogTable.testSqlChangeLog(TestChangeLogTable.java:285)
        at org.apache.iceberg.flink.TestChangeLogTable.testChangeLogOnDataKey(TestChangeLogTable.java:167)

@kbendick After some rough investigation, my guess is that the condition for applying upsert materialization has changed. Once SinkUpsertMaterializer is applied, the records the Iceberg writer receives are not the original input records, which causes the failing cases. In 1.14.0, the condition returned false. We can disable it temporarily by setting table.exec.sink.upsert-materialize to NONE, and then the cases pass.

The test cases may be invalid; we can decide whether and how to fix them after more investigation.

FYI:
apache/flink#17699
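
For reference, a minimal sketch of the temporary workaround, assuming the Flink 1.15 Table API (the class and variable names here are illustrative, not from the PR):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class DisableUpsertMaterialize {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Disable the planner's SinkUpsertMaterializer so the sink receives
        // records in the same order the source operator emits them.
        // table.exec.sink.upsert-materialize accepts AUTO (the default), FORCE, and NONE.
        tEnv.getConfig().set("table.exec.sink.upsert-materialize", "NONE");
      }
    }

The same option can also be set per session in SQL with SET 'table.exec.sink.upsert-materialize' = 'NONE'.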

@kbendick
Contributor Author

In this test, the primary key of the changelog table is the same as the partition schema (vs a subset).

org.apache.iceberg.flink.TestChangeLogTable > testChangeLogOnDataKey[PartitionedTable=true] FAILED
    java.lang.AssertionError: Should have the expected records for the checkpoint#2 expected:<[Record(1, aaa), Record(2, ccc), Record(2, aaa), Record(1, ccc)]> but was:<[Record(1, aaa), Record(2, bbb), Record(2, ccc), Record(2, aaa), Record(1, bbb, 1, false), Record(1, ccc)]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.apache.iceberg.flink.TestChangeLogTable.testSqlChangeLog(TestChangeLogTable.java:285)
        at org.apache.iceberg.flink.TestChangeLogTable.testChangeLogOnDataKey(TestChangeLogTable.java:167)

@kbendick After some rough investigation, my guess is that the condition for applying upsert materialization has changed. Once SinkUpsertMaterializer is applied, the records the Iceberg writer receives are not the original input records, which causes the failing cases. In 1.14.0, the condition returned false. We can disable it temporarily by setting table.exec.sink.upsert-materialize to NONE, and then the cases pass.

The test cases may be invalid; we can decide whether and how to fix them after more investigation.

FYI: apache/flink#17699

Thank you @yittg! In the interests of keeping the code the same for the port, let’s add the config value and then open an issue to update the tests if need be. 👍

@kbendick kbendick force-pushed the add-flink-1.15-support branch 2 times, most recently from cc88b9a to 0dea16e Compare April 15, 2022 07:30
@kbendick kbendick changed the title [WIP] Add Support For Flink 1.15 Add Support For Flink 1.15 Apr 15, 2022
@kbendick kbendick marked this pull request as ready for review April 15, 2022 16:28
@openinx
Member

openinx commented Apr 19, 2022

@kbendick I think we've just merged a few Iceberg PRs for the old Flink 1.14. Would you mind updating this PR to include those latest changes in Flink 1.15?

@kbendick kbendick force-pushed the add-flink-1.15-support branch from 0dea16e to 9054f71 Compare April 21, 2022 20:27
@kbendick
Contributor Author

@kbendick I think we've just merged a few Iceberg PRs for the old Flink 1.14. Would you mind updating this PR to include those latest changes in Flink 1.15?

Yeah good catch @openinx. I will keep the two in sync. I have merged the changes from 1.14 into 1.15.

Here's the diff between the two

$ diff -qr v1.14/ v1.15/
Files v1.14/build.gradle and v1.15/build.gradle differ
Files v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java and v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java differ
Files v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java and v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java differ
Files v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java and v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java differ
Files v1.14/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java and v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java differ
Files v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java and v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java differ

I checked those files individually and the diff seems correct. I will post the full diff as well before we merge this.

@kbendick
Contributor Author

kbendick commented Apr 21, 2022

This gist contains the difference between the 1.14 and 1.15 Flink directories as of right now: https://gist.github.com/kbendick/e6634fd3010322914d4b49927d2e7d30

Generated by running git diff --no-index ./flink/v1.14 ./flink/v1.15 >> flink_115_diff_20220421.txt

@openinx
Member

openinx commented Apr 24, 2022

There is a broken unit test here:

org.apache.iceberg.flink.TestFlinkTableSource > testFilterPushDownBetween FAILED
    java.lang.AssertionError: Should produce the expected record expected:<[+I[1, iceberg, 10.0], +I[2, b, 20.0]]> but was:<[+I[2, b, 20.0], +I[1, iceberg, 10.0]]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.apache.iceberg.flink.TestFlinkTableSource.testFilterPushDownBetween(TestFlinkTableSource.java:496)

@openinx
Member

openinx commented Apr 24, 2022

+    compileOnly "org.apache.flink:flink-connector-base:${flinkVersion}"
+    compileOnly "org.apache.flink:flink-connector-files:${flinkVersion}"
+    // This should be connectors-base, plus some other things that are needed
+    // compileOnly "org.apache.flink:flink-connectors:${flinkVersion}"

Since we've already added the specific child artifacts, I think org.apache.flink:flink-connectors is not required now; we can just remove the comments.

@openinx
Member

openinx commented Apr 24, 2022

   @Override
   public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
     return new DataStreamScanProvider() {
+
+      // UPDATED - Needs to be added as support for other signature is entirely removed.
+      // TODO -    This should probably be ported to 1.14 as well to make future changes
+      //           easier to backport.
       @Override
-      public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+      public DataStream<RowData> produceDataStream(
+          ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
         return createDataStream(execEnv);
       }

Let's just open a separate GitHub issue to track this; it doesn't seem to make sense to add a backport TODO for Flink 1.14 in the Flink 1.15 source code.

@openinx
Member

openinx commented Apr 24, 2022

+    // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-sink-upsert-materialize
+    // UPDATED - Needed to make testChangeLogOnDataKey work.
+    // TODO - Add tests with all configuration values as follow up - and possibly remove our own injected shuffle
+    //        as Flink can now do it.
+    getTableEnv().getConfig().set("table.exec.sink.upsert-materialize", "NONE");

This is interesting. Let's just replace those comments with the following?

+  // Set table.exec.sink.upsert-materialize=NONE so that downstream operators receive records in the same order as the source operator emits them.

@openinx
Member

openinx commented Apr 24, 2022

+      // UPDATED
       AssertHelpers.assertThrows("Table should already exists",
-          ValidationException.class,
+          org.apache.flink.table.api.TableException.class,
           "Could not execute CreateTable in path",
           () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME));

The UPDATED comment seems like it can be removed now.
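
With the comment dropped, the assertion above would read (a sketch of the result; nothing else changes):

    AssertHelpers.assertThrows("Table should already exists",
        org.apache.flink.table.api.TableException.class,
        "Could not execute CreateTable in path",
        () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME));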

@openinx
Member

openinx commented Apr 24, 2022

diff --git a/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java b/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
index b0041c3bc..d73363395 100644
--- a/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
+++ b/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
@@ -34,6 +34,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -133,8 +134,10 @@ public class BoundedTableFactory implements DynamicTableSourceFactory {
     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
       return new DataStreamScanProvider() {
+        // UPDATED
         @Override
-        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
+        public DataStream<RowData> produceDataStream(
+            ProviderContext providerContext, StreamExecutionEnvironment env) {
           boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled();
           SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled);

Ditto.

@openinx
Member

openinx commented Apr 24, 2022

The PR looks good to me overall, except for the above comments. Thanks @kbendick for the nice work!

Comment on lines +486 to +491
protected ArrayData buildList(ReusableArrayData list) {
  // Since ReusableArrayData is not accepted by Flink, use GenericArrayData temporarily to work around it.
  // Revert this to use ReusableArrayData once it is fixed in Flink.
  // For your reference, https://issues.apache.org/jira/browse/FLINK-25238.
  return new GenericArrayData(Arrays.copyOf(list.values, writePos));
}
Contributor

This issue has been fixed in 1.15.0, so I think we can return the ReusableArrayData now.
Can we fix it directly here, or should we do it separately in a follow-up PR?

Member

I think we can file a separate issue to address this :-)

Contributor

Sounds good to me; created issue #4624 to track it.
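
A hypothetical sketch of what the follow-up in #4624 might look like; whether ReusableArrayData exposes a setNumElements helper, and the final shape of the change, are assumptions here:

    // Hypothetical follow-up once FLINK-25238 is fixed in Flink 1.15.0:
    // return the reusable buffer directly instead of copying into GenericArrayData.
    protected ArrayData buildList(ReusableArrayData list) {
      list.setNumElements(writePos); // assumed helper that caps the logical size
      return list;
    }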

@kbendick
Contributor Author

   @Override
   public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
     return new DataStreamScanProvider() {
+
+      // UPDATED - Needs to be added as support for other signature is entirely removed.
+      // TODO -    This should probably be ported to 1.14 as well to make future changes
+      //           easier to backport.
       @Override
-      public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
+      public DataStream<RowData> produceDataStream(
+          ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
         return createDataStream(execEnv);
       }

Let's just open a separate GitHub issue to track this; it doesn't seem to make sense to add a backport TODO for Flink 1.14 in the Flink 1.15 source code.

I opened a new issue for this: #4634

@kbendick
Contributor Author

+    // https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-exec-sink-upsert-materialize
+    // UPDATED - Needed to make testChangeLogOnDataKey work.
+    // TODO - Add tests with all configuration values as follow up - and possibly remove our own injected shuffle
+    //        as Flink can now do it.
+    getTableEnv().getConfig().set("table.exec.sink.upsert-materialize", "NONE");

This is interesting. Let's just replace those comments with the following?

+  // Set table.exec.sink.upsert-materialize=NONE so that downstream operators receive records in the same order as the source operator emits them.

I updated it (and added a note about bypassing Flink's shuffle). We should open an issue to consider allowing Flink to handle the shuffling where necessary, or setting the necessary configuration by default for users, though I'm generally not in favor of changing defaults and would prefer to let Flink handle the shuffle if at all possible.

@kbendick kbendick force-pushed the add-flink-1.15-support branch from 6548287 to 38d61a8 Compare April 26, 2022 18:18
@kbendick
Contributor Author

There is a broken unit test here:

org.apache.iceberg.flink.TestFlinkTableSource > testFilterPushDownBetween FAILED
    java.lang.AssertionError: Should produce the expected record expected:<[+I[1, iceberg, 10.0], +I[2, b, 20.0]]> but was:<[+I[2, b, 20.0], +I[1, iceberg, 10.0]]>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:120)
        at org.apache.iceberg.flink.TestFlinkTableSource.testFilterPushDownBetween(TestFlinkTableSource.java:496)

Hmm, this seems to be a newer failing unit test. At least with RC2 there weren't any failing tests.

Now that I've made the updates suggested here, let me diff the two again, catch 1.15 up with 1.14, and then we can go from there.

@kbendick
Contributor Author

kbendick commented Apr 26, 2022

I rebased off of master and made the suggested changes. Here's the file diff:

$ diff -qr flink/v1.14/ flink/v1.15/
Files flink/v1.14/build.gradle and flink/v1.15/build.gradle differ
Files flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java and flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java differ
Files flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java and flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java differ
Files flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java and flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java differ
Files flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java and flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java differ
Files flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java and flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java differ

I've uploaded a gist with the new diff here:

https://gist.github.com/kbendick/8265987b7386dc2738b09d7977abe41b

The above diff was generated using git diff --no-index ./flink/v1.14 ./flink/v1.15 >> flink_115_diff_20220426.txt

@kbendick
Contributor Author

For the test that is failing, I took a look and it seems the test relies on the order of records returned from the query, which is flaky in general.

Given that we're not testing the order of records, just the actual records, I've opened a PR to update Flink 1.14's tests to use set semantics for comparison (adding size checks where needed) and will port that to 1.15 once it's merged: #4635
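
A rough sketch of the order-independent comparison (illustrative only; the helper name is an assumption, and the actual change lives in #4635):

    import java.util.List;
    import java.util.Set;

    import org.apache.flink.types.Row;
    import org.apache.iceberg.relocated.com.google.common.collect.Sets;
    import org.junit.Assert;

    public class OrderIndependentAssert {
      // Compare record counts first, then contents as sets, so row order no longer matters.
      static void assertSameRows(String message, List<Row> expected, List<Row> actual) {
        Assert.assertEquals(message + ": record count", expected.size(), actual.size());
        Set<Row> expectedRows = Sets.newHashSet(expected);
        Set<Row> actualRows = Sets.newHashSet(actual);
        Assert.assertEquals(message, expectedRows, actualRows);
      }
    }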

@kbendick kbendick force-pushed the add-flink-1.15-support branch from 83fad55 to 1342b50 Compare April 28, 2022 23:25
@kbendick
Contributor Author

1.15.0 was released today. There hasn't been a release announcement yet, but it's on Maven Central and the staged release candidate has been pulled.

I removed the release candidate maven repository from the build file.

The only remaining step is to merge PR #4635 to handle the flaky tests (I've noticed that different tests in the same file fail with different orderings on different runs), and then this can be merged.

@kbendick kbendick force-pushed the add-flink-1.15-support branch from 4d60e04 to 5d7cc57 Compare April 29, 2022 16:31
@kbendick kbendick force-pushed the add-flink-1.15-support branch from 6883bd7 to c8c0450 Compare April 29, 2022 16:38
@kbendick
Contributor Author

Now that #4635 has been merged, and 1.15.0 has been released, I rebased. The final diff between Flink 1.14 and Flink 1.15 can be found here: https://gist.github.com/kbendick/7564c455589b61a9ea1e7c0ff1003031

To generate the diff: git diff --no-index ./flink/v1.14 ./flink/v1.15

List of files that are different as output by diff -qr flink/v1.14/ flink/v1.15/:

$ diff -qr flink/v1.14/ flink/v1.15/
Files flink/v1.14/build.gradle and flink/v1.15/build.gradle differ
Files flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java and flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java differ
Files flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java and flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java differ
Files flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java and flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestChangeLogTable.java differ
Files flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java and flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java differ
Files flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java and flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java differ

@kbendick
Contributor Author

kbendick commented May 2, 2022

Confirming: I just verified that Flink 1.14 and the proposed Flink 1.15 still have the same diff as mentioned three days ago.

@rdblue rdblue merged commit 3584c79 into apache:master May 3, 2022
@rdblue
Contributor

rdblue commented May 3, 2022

Thanks, @kbendick! This looks great.

Should we also remove 1.12 from master?

@kbendick kbendick deleted the add-flink-1.15-support branch May 3, 2022 16:10
@kbendick
Contributor Author

kbendick commented May 3, 2022

Thanks, @kbendick! This looks great.

Should we also remove 1.12 from master?

1.12 has been removed from master already: e45c19e

@yittg
Contributor

yittg commented May 4, 2022

Great job @kbendick
