From d1929a404b6e450e3f2aaffd6e70407ac00dfb02 Mon Sep 17 00:00:00 2001
From: Kyle Bendickson
Date: Wed, 13 Apr 2022 10:06:47 -0700
Subject: [PATCH] Make changes to get 1.15 compiling and tests mostly working

---
 .github/workflows/flink-ci.yml                        |  2 +-
 .../org/apache/iceberg/flink/IcebergTableSink.java    |  2 +-
 .../org/apache/iceberg/flink/IcebergTableSource.java  | 11 +++++++++++
 .../apache/iceberg/flink/TestIcebergConnector.java    |  3 ++-
 .../iceberg/flink/source/BoundedTableFactory.java     |  9 ++++++---
 5 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/flink-ci.yml b/.github/workflows/flink-ci.yml
index b563a4b3e317..711a54291e69 100644
--- a/.github/workflows/flink-ci.yml
+++ b/.github/workflows/flink-ci.yml
@@ -53,7 +53,7 @@ concurrency:
 
 jobs:
-  # Only test the latest flink version that with scala 2.11 for saving testing time.
+  # Only test the latest flink version with scala 2.11 for saving testing time.
   flink-scala-2-11-tests:
     runs-on: ubuntu-latest
     strategy:
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
index f2eecb9b6646..855e9e73cba1 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java
@@ -63,7 +63,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         .map(UniqueConstraint::getColumns)
         .orElseGet(ImmutableList::of);
 
-    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
+    return (DataStreamSinkProvider) (providerContext, dataStream) -> FlinkSink.forRowData(dataStream)
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
         .equalityFieldColumns(equalityColumns)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
index dd8f6454ebc4..ec99bb691c93 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSource.java
@@ -28,6 +28,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -158,6 +159,16 @@ public ChangelogMode getChangelogMode() {
   @Override
   public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
     return new DataStreamScanProvider() {
+
+      // UPDATED - Needs to be added, as support for the other signature is entirely removed.
+      // This should probably be ported to 1.14 as well to make future changes
+      // easier to backport.
+      @Override
+      public DataStream<RowData> produceDataStream(
+          ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+        return createDataStream(execEnv);
+      }
+
       @Override
       public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
         return createDataStream(execEnv);
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
index 272f7a716530..b46e489db7b1 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java
@@ -272,8 +272,9 @@ public void testCatalogDatabaseConflictWithFlinkDatabase() {
     try {
       testCreateConnectorTable();
       // Ensure that the table was created under the specific database.
+      // UPDATED
       AssertHelpers.assertThrows("Table should already exists",
-          ValidationException.class,
+          org.apache.flink.table.api.TableException.class,
           "Could not execute CreateTable in path",
           () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME));
     } finally {
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
index b0041c3bc04d..a373d21b084d 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTableFactory.java
@@ -34,6 +34,7 @@
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -133,9 +134,11 @@ public ChangelogMode getChangelogMode() {
   @Override
   public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
     return new DataStreamScanProvider() {
+      // UPDATED
       @Override
-      public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
-        boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled();
+      public DataStream<RowData> produceDataStream(
+          ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+        boolean checkpointEnabled = execEnv.getCheckpointConfig().isCheckpointingEnabled();
         SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled);
 
         RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
@@ -143,7 +146,7 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
 
         DataFormatConverters.RowConverter rowConverter = new DataFormatConverters
             .RowConverter(tableSchema.getFieldDataTypes());
 
-        return env.addSource(source, new RowTypeInfo(tableSchema.getFieldTypes()))
+        return execEnv.addSource(source, new RowTypeInfo(tableSchema.getFieldTypes()))
             .map(rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType));
       }
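
Note (not part of the patch): the sketch below shows the DataStreamScanProvider shape that the changes above target, i.e. implementing the new ProviderContext-aware produceDataStream signature introduced in Flink 1.15 while keeping the pre-1.15 signature delegating to the same construction logic. The class name ExampleScanProvider and its placeholder body are hypothetical; only the two produceDataStream signatures, isBounded(), and ProviderContext come from the Flink 1.15 API used in the diff.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

// Hypothetical standalone example (not from the Iceberg codebase): a scan
// provider that compiles against Flink 1.15, where the planner invokes the
// ProviderContext-aware produceDataStream variant.
public class ExampleScanProvider implements DataStreamScanProvider {

  // New in Flink 1.15: the planner passes a ProviderContext along with the environment.
  @Override
  public DataStream<RowData> produceDataStream(
      ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
    return createDataStream(execEnv);
  }

  // Pre-1.15 signature, kept so both call paths share the same construction logic.
  @Override
  public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
    return createDataStream(execEnv);
  }

  @Override
  public boolean isBounded() {
    return true;
  }

  private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv) {
    // Placeholder: a real connector would assemble its source here,
    // as IcebergTableSource#createDataStream does in the patch above.
    throw new UnsupportedOperationException("Source construction omitted in this sketch");
  }
}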