From 2489f6446bae5b971a26e4cd3094f7d7af9d0208 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Mon, 12 Aug 2024 13:36:15 +0800 Subject: [PATCH] [Improve][Connector] Add multi-table sink option check (#7360) * [Improve][Connector] Add multi-table sink option check * fix --- .../seatunnel/api/sink/SinkCommonOptions.java | 2 +- .../assertion/sink/AssertSinkFactory.java | 6 +++++- .../console/sink/ConsoleSinkFactory.java | 6 +++++- .../connectors/druid/sink/DruidSinkFactory.java | 6 +++++- .../sink/ElasticsearchSinkFactory.java | 4 +++- .../file/local/sink/LocalFileSinkFactory.java | 2 ++ .../file/oss/sink/OssFileSinkFactory.java | 2 ++ .../file/s3/sink/S3FileSinkFactory.java | 2 ++ .../seatunnel/http/sink/HttpSinkFactory.java | 2 ++ .../seatunnel/hudi/sink/HudiSinkFactory.java | 4 +++- .../iceberg/sink/IcebergSinkFactory.java | 4 +++- .../influxdb/sink/InfluxDBSinkFactory.java | 4 +++- .../seatunnel/kudu/sink/KuduSinkFactory.java | 2 ++ .../seatunnel/paimon/sink/PaimonSinkFactory.java | 4 +++- .../seatunnel/redis/sink/RedisSinkFactory.java | 4 +++- .../ConnectorSpecificationCheckTest.java | 16 ++++++++++++++-- 16 files changed, 58 insertions(+), 12 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java index 598193d695f..9c6538ac87c 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java @@ -28,5 +28,5 @@ public class SinkCommonOptions { Options.key("multi_table_sink_replica") .intType() .defaultValue(1) - .withDescription("The replica number of multi table sink"); + .withDescription("The replica number of multi table sink writer"); } diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java index 376863dc184..ae174d9857f 100644 --- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java +++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/sink/AssertSinkFactory.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.assertion.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -37,7 +38,10 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { - return OptionRule.builder().required(RULES).build(); + return OptionRule.builder() + .required(RULES) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) + .build(); } @Override diff --git a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java index 169a281fc19..fa5c7deae9e 100644 --- a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java +++ b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; @@ -52,7 +53,10 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { - return OptionRule.builder().optional(LOG_PRINT_DATA, LOG_PRINT_DELAY).build(); + return OptionRule.builder() + .optional( + LOG_PRINT_DATA, LOG_PRINT_DELAY, SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) + .build(); } @Override diff --git a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java index 0c6824b521e..3199d3d66f4 100644 --- a/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java +++ b/seatunnel-connectors-v2/connector-druid/src/main/java/org/apache/seatunnel/connectors/druid/sink/DruidSinkFactory.java @@ -20,6 +20,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -40,7 +41,10 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { - return OptionRule.builder().required(COORDINATOR_URL, DATASOURCE).build(); + return OptionRule.builder() + .required(COORDINATOR_URL, DATASOURCE) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) + .build(); } @Override diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java index 56ec1d0ab7b..b290a63c444 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; @@ -69,7 +70,8 @@ public OptionRule optionRule() { TLS_KEY_STORE_PATH, TLS_KEY_STORE_PASSWORD, TLS_TRUST_STORE_PATH, - TLS_TRUST_STORE_PASSWORD) + TLS_TRUST_STORE_PASSWORD, + SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .build(); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java index e8ee8e436d1..1a9bcc1734f 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -48,6 +49,7 @@ public OptionRule optionRule() { .optional(BaseSinkConfig.FILE_FORMAT_TYPE) .optional(BaseSinkConfig.SCHEMA_SAVE_MODE) .optional(BaseSinkConfig.DATA_SAVE_MODE) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .conditional( BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java index 5d6cb649f20..6fd3088ddc9 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -102,6 +103,7 @@ public OptionRule optionRule() { .optional(BaseSinkConfig.DATE_FORMAT) .optional(BaseSinkConfig.DATETIME_FORMAT) .optional(BaseSinkConfig.TIME_FORMAT) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .build(); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java index 4ac9f45915e..5c231443e99 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -103,6 +104,7 @@ public OptionRule optionRule() { .optional(BaseSinkConfig.DATETIME_FORMAT) .optional(BaseSinkConfig.TIME_FORMAT) .optional(BaseSinkConfig.TMP_PATH) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .build(); } diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java index 539563ecb62..313d26dd3f7 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkFactory.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.http.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -49,6 +50,7 @@ public OptionRule optionRule() { .optional(HttpConfig.RETRY) .optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS) .optional(HttpConfig.RETRY_BACKOFF_MAX_MS) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .build(); } } diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java index d38785de02d..7697842f826 100644 --- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java @@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.hudi.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -61,7 +62,8 @@ public OptionRule optionRule() { INSERT_SHUFFLE_PARALLELISM, UPSERT_SHUFFLE_PARALLELISM, MIN_COMMITS_TO_KEEP, - MAX_COMMITS_TO_KEEP) + MAX_COMMITS_TO_KEEP, + SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .build(); } diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java index b32430b3197..212bb6371d3 100644 --- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java +++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; @@ -57,7 +58,8 @@ public OptionRule optionRule() { SinkConfig.TABLE_DEFAULT_PARTITION_KEYS, SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP, SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP, - SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH) + SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH, + SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .build(); } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java index 81a294e95bc..a8c13cdbff6 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -65,7 +66,8 @@ public OptionRule optionRule() { KEY_TIME, BATCH_SIZE, MAX_RETRIES, - RETRY_BACKOFF_MULTIPLIER_MS) + RETRY_BACKOFF_MULTIPLIER_MS, + SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .build(); } diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java index 3917d1cd62a..beff65521d8 100644 --- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java +++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -56,6 +57,7 @@ public OptionRule optionRule() { .optional(KuduSinkConfig.IGNORE_DUPLICATE) .optional(KuduSinkConfig.ENABLE_KERBEROS) .optional(KuduSinkConfig.KERBEROS_KRB5_CONF) + .optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .conditional( KuduSinkConfig.FLUSH_MODE, Arrays.asList(AUTO_FLUSH_BACKGROUND.name(), MANUAL_FLUSH.name()), diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java index 83976d84f94..bbc74df3ce9 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.connector.TableSink; @@ -54,7 +55,8 @@ public OptionRule optionRule() { PaimonSinkConfig.DATA_SAVE_MODE, PaimonSinkConfig.PRIMARY_KEYS, PaimonSinkConfig.PARTITION_KEYS, - PaimonSinkConfig.WRITE_PROPS) + PaimonSinkConfig.WRITE_PROPS, + SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .conditional( PaimonConfig.CATALOG_TYPE, PaimonCatalogEnum.HIVE, PaimonConfig.CATALOG_URI) .build(); diff --git a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java index c4768c0618b..49c2644d707 100644 --- a/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java +++ b/seatunnel-connectors-v2/connector-redis/src/main/java/org/apache/seatunnel/connectors/seatunnel/redis/sink/RedisSinkFactory.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.redis.sink; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; @@ -51,7 +52,8 @@ public OptionRule optionRule() { RedisConfig.USER, RedisConfig.KEY_PATTERN, RedisConfig.FORMAT, - RedisConfig.EXPIRE) + RedisConfig.EXPIRE, + SinkCommonOptions.MULTI_TABLE_SINK_REPLICA) .conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES) .build(); } diff --git a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java index 62a037a6f65..3628a5dce6d 100644 --- a/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java +++ b/seatunnel-dist/src/test/java/org/apache/seatunnel/api/connector/ConnectorSpecificationCheckTest.java @@ -17,7 +17,9 @@ package org.apache.seatunnel.api.connector; +import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkCommonOptions; import org.apache.seatunnel.api.sink.SinkWriter; import org.apache.seatunnel.api.sink.SupportMultiTableSink; import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; @@ -152,16 +154,26 @@ public void testAllConnectorImplementFactoryWithUpToDateMethod() throws ClassNot log.info( "Check sink connector {} successfully", factory.getClass().getSimpleName()); - checkSupportMultiTableSink(sinkClass); + checkSupportMultiTableSink(factory, sinkClass); } } } - private void checkSupportMultiTableSink(Class sinkClass) { + private void checkSupportMultiTableSink( + TableSinkFactory sinkFactory, Class sinkClass) { if (!SupportMultiTableSink.class.isAssignableFrom(sinkClass)) { return; } + OptionRule sinkOptionRule = sinkFactory.optionRule(); + Assertions.assertTrue( + sinkOptionRule + .getOptionalOptions() + .contains(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA), + "Please add `SinkCommonOptions.MULTI_TABLE_SINK_REPLICA` optional into the `optionRule` method optional of `" + + sinkFactory.getClass().getSimpleName() + + "`"); + // Validate the `createWriter` method return type Optional createWriter = ReflectionUtils.getDeclaredMethod(