Make changes to get 1.15 compiling and tests mostly working
kbendick committed Apr 13, 2022
1 parent eb442eb commit d1929a4
Showing 5 changed files with 21 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flink-ci.yml
@@ -53,7 +53,7 @@ concurrency:
 
 jobs:
 
-  # Only test the latest flink version that with scala 2.11 for saving testing time.
+  # Only test the latest flink version with scala 2.11 for saving testing time.
   flink-scala-2-11-tests:
    runs-on: ubuntu-latest
    strategy:
@@ -63,7 +63,7 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        .map(UniqueConstraint::getColumns)
        .orElseGet(ImmutableList::of);
 
-    return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
+    return (DataStreamSinkProvider) (providerContext, dataStream) -> FlinkSink.forRowData(dataStream)
        .tableLoader(tableLoader)
        .tableSchema(tableSchema)
        .equalityFieldColumns(equalityColumns)
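
For readers tracking the 1.15 API change behind this sink edit: a minimal hedged sketch of the two-argument DataStreamSinkProvider lambda shape (the class name and print sink are illustrative only, not what the Iceberg code does):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.data.RowData;

class SinkProviderSketch {
  // The two-argument lambda matches the shape the cast in the diff above implies;
  // the providerContext parameter is accepted but left unused, as in the commit.
  static DataStreamSinkProvider printingProvider() {
    return (ProviderContext providerContext, DataStream<RowData> dataStream) ->
        dataStream.print(); // illustrative terminal sink; the real code builds a FlinkSink
  }
}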
@@ -28,6 +28,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -158,6 +159,16 @@ public ChangelogMode getChangelogMode() {
   @Override
   public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
     return new DataStreamScanProvider() {
+
+      // UPDATED - Needs to be added, as support for the other signature is entirely removed.
+      // This should probably be ported to 1.14 as well to make future changes
+      // easier to backport.
+      @Override
+      public DataStream<RowData> produceDataStream(
+          ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+        return createDataStream(execEnv);
+      }
+
       @Override
       public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
         return createDataStream(execEnv);
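
The new providerContext argument is unused here, but Flink 1.15 exposes it so connectors can assign stable operator uids. A hedged sketch of that pattern (the class, method, and uid name are illustrative, not from this commit):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.data.RowData;

class UidSketch {
  // ProviderContext.generateUid(String) returns an Optional<String>; setting the
  // uid needs the concrete SingleOutputStreamOperator, since DataStream has no uid().
  static DataStream<RowData> tagWithUid(
      ProviderContext providerContext, SingleOutputStreamOperator<RowData> stream) {
    providerContext.generateUid("iceberg-source").ifPresent(stream::uid);
    return stream;
  }
}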
@@ -272,8 +272,9 @@ public void testCatalogDatabaseConflictWithFlinkDatabase() {
     try {
       testCreateConnectorTable();
       // Ensure that the table was created under the specific database.
+      // UPDATED
       AssertHelpers.assertThrows("Table should already exists",
-          ValidationException.class,
+          org.apache.flink.table.api.TableException.class,
           "Could not execute CreateTable in path",
           () -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME));
     } finally {
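
For comparison, the same expectation written with plain JUnit 4's assertThrows rather than Iceberg's AssertHelpers; a hedged sketch in which the sql(...) helper is a hypothetical stand-in for the test harness:

import static org.junit.Assert.assertThrows;

import org.apache.flink.table.api.TableException;
import org.junit.Test;

public class CreateTableConflictSketch {
  @Test
  public void testCreateExistingTableFails() {
    // Per the diff above, Flink 1.15 surfaces the duplicate-table failure as
    // TableException ("Could not execute CreateTable in path ...") where the
    // old expectation was ValidationException.
    assertThrows(TableException.class, () -> sql("CREATE TABLE `default_catalog`.`db`.`t`"));
  }

  // Hypothetical stand-in for the harness's sql(...) helper used in the real test.
  private void sql(String query) {
    throw new TableException("Could not execute CreateTable in path `default_catalog`.`db`.`t`");
  }
}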
@@ -34,6 +34,7 @@
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
 import org.apache.flink.table.connector.source.DataStreamScanProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
@@ -133,17 +134,19 @@ public ChangelogMode getChangelogMode() {
   @Override
   public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
     return new DataStreamScanProvider() {
+      // UPDATED
       @Override
-      public DataStream<RowData> produceDataStream(StreamExecutionEnvironment env) {
-        boolean checkpointEnabled = env.getCheckpointConfig().isCheckpointingEnabled();
+      public DataStream<RowData> produceDataStream(
+          ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+        boolean checkpointEnabled = execEnv.getCheckpointConfig().isCheckpointingEnabled();
         SourceFunction<Row> source = new BoundedTestSource<>(elementsPerCheckpoint, checkpointEnabled);
 
         RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
         // Converter to convert the Row to RowData.
         DataFormatConverters.RowConverter rowConverter = new DataFormatConverters
             .RowConverter(tableSchema.getFieldDataTypes());
 
-        return env.addSource(source, new RowTypeInfo(tableSchema.getFieldTypes()))
+        return execEnv.addSource(source, new RowTypeInfo(tableSchema.getFieldTypes()))
             .map(rowConverter::toInternal, FlinkCompatibilityUtil.toTypeInfo(rowType));
       }
 
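
As an aside, the Row-to-RowData conversion this test relies on can be exercised standalone; a minimal hedged sketch with illustrative field types (the class name and values are not from this commit):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

class RowConverterSketch {
  public static void main(String[] args) {
    // Same converter shape the test builds from tableSchema.getFieldDataTypes().
    DataType[] fieldTypes = {DataTypes.INT(), DataTypes.STRING()};
    DataFormatConverters.RowConverter converter =
        new DataFormatConverters.RowConverter(fieldTypes);
    // toInternal converts the external Row into Flink's internal RowData format.
    RowData rowData = converter.toInternal(Row.of(1, "a"));
    System.out.println(rowData.getArity()); // prints 2
  }
}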
