Skip to content

Commit

Permalink
[Improve][Connector] Add multi-table sink option check (apache#7360)
Browse files Browse the repository at this point in the history
* [Improve][Connector] Add multi-table sink option check

* fix
  • Loading branch information
hailin0 authored Aug 12, 2024
1 parent fa34ac9 commit 2489f64
Show file tree
Hide file tree
Showing 16 changed files with 58 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ public class SinkCommonOptions {
Options.key("multi_table_sink_replica")
.intType()
.defaultValue(1)
.withDescription("The replica number of multi table sink");
.withDescription("The replica number of multi table sink writer");
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.assertion.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
Expand All @@ -37,7 +38,10 @@ public String factoryIdentifier() {

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(RULES).build();
return OptionRule.builder()
.required(RULES)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
Expand Down Expand Up @@ -52,7 +53,10 @@ public String factoryIdentifier() {

@Override
public OptionRule optionRule() {
return OptionRule.builder().optional(LOG_PRINT_DATA, LOG_PRINT_DELAY).build();
return OptionRule.builder()
.optional(
LOG_PRINT_DATA, LOG_PRINT_DELAY, SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand All @@ -40,7 +41,10 @@ public String factoryIdentifier() {

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(COORDINATOR_URL, DATASOURCE).build();
return OptionRule.builder()
.required(COORDINATOR_URL, DATASOURCE)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
Expand Down Expand Up @@ -69,7 +70,8 @@ public OptionRule optionRule() {
TLS_KEY_STORE_PATH,
TLS_KEY_STORE_PASSWORD,
TLS_TRUST_STORE_PATH,
TLS_TRUST_STORE_PASSWORD)
TLS_TRUST_STORE_PASSWORD,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down Expand Up @@ -48,6 +49,7 @@ public OptionRule optionRule() {
.optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.optional(BaseSinkConfig.SCHEMA_SAVE_MODE)
.optional(BaseSinkConfig.DATA_SAVE_MODE)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down Expand Up @@ -102,6 +103,7 @@ public OptionRule optionRule() {
.optional(BaseSinkConfig.DATE_FORMAT)
.optional(BaseSinkConfig.DATETIME_FORMAT)
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down Expand Up @@ -103,6 +104,7 @@ public OptionRule optionRule() {
.optional(BaseSinkConfig.DATETIME_FORMAT)
.optional(BaseSinkConfig.TIME_FORMAT)
.optional(BaseSinkConfig.TMP_PATH)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.http.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down Expand Up @@ -49,6 +50,7 @@ public OptionRule optionRule() {
.optional(HttpConfig.RETRY)
.optional(HttpConfig.RETRY_BACKOFF_MULTIPLIER_MS)
.optional(HttpConfig.RETRY_BACKOFF_MAX_MS)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.seatunnel.connectors.seatunnel.hudi.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down Expand Up @@ -61,7 +62,8 @@ public OptionRule optionRule() {
INSERT_SHUFFLE_PARALLELISM,
UPSERT_SHUFFLE_PARALLELISM,
MIN_COMMITS_TO_KEEP,
MAX_COMMITS_TO_KEEP)
MAX_COMMITS_TO_KEEP,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
Expand Down Expand Up @@ -57,7 +58,8 @@ public OptionRule optionRule() {
SinkConfig.TABLE_DEFAULT_PARTITION_KEYS,
SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH)
SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down Expand Up @@ -65,7 +66,8 @@ public OptionRule optionRule() {
KEY_TIME,
BATCH_SIZE,
MAX_RETRIES,
RETRY_BACKOFF_MULTIPLIER_MS)
RETRY_BACKOFF_MULTIPLIER_MS,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down Expand Up @@ -56,6 +57,7 @@ public OptionRule optionRule() {
.optional(KuduSinkConfig.IGNORE_DUPLICATE)
.optional(KuduSinkConfig.ENABLE_KERBEROS)
.optional(KuduSinkConfig.KERBEROS_KRB5_CONF)
.optional(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
KuduSinkConfig.FLUSH_MODE,
Arrays.asList(AUTO_FLUSH_BACKGROUND.name(), MANUAL_FLUSH.name()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
Expand Down Expand Up @@ -54,7 +55,8 @@ public OptionRule optionRule() {
PaimonSinkConfig.DATA_SAVE_MODE,
PaimonSinkConfig.PRIMARY_KEYS,
PaimonSinkConfig.PARTITION_KEYS,
PaimonSinkConfig.WRITE_PROPS)
PaimonSinkConfig.WRITE_PROPS,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(
PaimonConfig.CATALOG_TYPE, PaimonCatalogEnum.HIVE, PaimonConfig.CATALOG_URI)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.redis.sink;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
Expand Down Expand Up @@ -51,7 +52,8 @@ public OptionRule optionRule() {
RedisConfig.USER,
RedisConfig.KEY_PATTERN,
RedisConfig.FORMAT,
RedisConfig.EXPIRE)
RedisConfig.EXPIRE,
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
.conditional(RedisConfig.MODE, RedisConfig.RedisMode.CLUSTER, RedisConfig.NODES)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.seatunnel.api.connector;

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
Expand Down Expand Up @@ -152,16 +154,26 @@ public void testAllConnectorImplementFactoryWithUpToDateMethod() throws ClassNot
log.info(
"Check sink connector {} successfully", factory.getClass().getSimpleName());

checkSupportMultiTableSink(sinkClass);
checkSupportMultiTableSink(factory, sinkClass);
}
}
}

private void checkSupportMultiTableSink(Class<? extends SeaTunnelSink> sinkClass) {
private void checkSupportMultiTableSink(
TableSinkFactory sinkFactory, Class<? extends SeaTunnelSink> sinkClass) {
if (!SupportMultiTableSink.class.isAssignableFrom(sinkClass)) {
return;
}

OptionRule sinkOptionRule = sinkFactory.optionRule();
Assertions.assertTrue(
sinkOptionRule
.getOptionalOptions()
.contains(SinkCommonOptions.MULTI_TABLE_SINK_REPLICA),
"Please add `SinkCommonOptions.MULTI_TABLE_SINK_REPLICA` optional into the `optionRule` method optional of `"
+ sinkFactory.getClass().getSimpleName()
+ "`");

// Validate the `createWriter` method return type
Optional<Method> createWriter =
ReflectionUtils.getDeclaredMethod(
Expand Down

0 comments on commit 2489f64

Please sign in to comment.