Commit 38f1903

[Feature][API] Add options check before create source and sink and transform in FactoryUtil (#4424)
EricJoy2048 authored Mar 28, 2023
1 parent 9ce220b commit 38f1903
Showing 10 changed files with 69 additions and 75 deletions.
File 1/10: FactoryUtil.java

@@ -21,6 +21,7 @@
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -114,6 +115,7 @@ SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
             ReadonlyConfig options,
             ClassLoader classLoader) {
         TableFactoryContext context = new TableFactoryContext(acceptedTables, options, classLoader);
+        ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
         TableSource<T, SplitT, StateT> tableSource = factory.createSource(context);
         validateAndApplyMetadata(acceptedTables, tableSource);
         return tableSource.createSource();
@@ -136,6 +138,7 @@ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSink(
             TableFactoryContext context =
                     new TableFactoryContext(
                             Collections.singletonList(catalogTable), options, classLoader);
+            ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
             return factory.createSink(context).createSink();
         } catch (Throwable t) {
             throw new FactoryException(
@@ -321,6 +324,7 @@ public static SeaTunnelTransform<?> createAndPrepareTransform(
         TableFactoryContext context =
                 new TableFactoryContext(
                         Collections.singletonList(catalogTable), options, classLoader);
+        ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
         return factory.createTransform(context).createTransform();
     }
 }
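The hunks above are the heart of the change: every table factory already publishes an OptionRule, and FactoryUtil now runs the user's options through ConfigValidator against that rule before any source, sink, or transform is instantiated, so bad configs fail fast with a uniform error. A minimal sketch of the flow, assuming the usual seatunnel-api builder methods (OptionRule.builder()/required(), Options.key(...).stringType(), ReadonlyConfig.fromMap(...)); the HOST option and sketch class are illustrative, not part of this commit:

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;

import java.util.Collections;

public class OptionCheckSketch {
    // Illustrative option definition; not part of this commit.
    static final Option<String> HOST =
            Options.key("host").stringType().noDefaultValue().withDescription("server host");

    public static void main(String[] args) {
        // An empty config is missing the required "host" option.
        ReadonlyConfig options = ReadonlyConfig.fromMap(Collections.emptyMap());
        OptionRule rule = OptionRule.builder().required(HOST).build();
        // This is the check FactoryUtil now performs before createSource/createSink/
        // createTransform: it throws a validation exception here instead of letting
        // the connector fail later with a connector-specific error.
        ConfigValidator.of(options).validate(rule);
    }
}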
File 2/10: Config.java (Kafka connector options)

@@ -164,4 +164,11 @@ public class Config {
                             "The processing method of data format error. The default value is fail, and the optional value is (fail, skip). "
                                     + "When fail is selected, data format error will block and an exception will be thrown. "
                                     + "When skip is selected, data format error will skip this line data.");
+
+    public static final Option<KafkaSemantics> SEMANTICS =
+            Options.key("semantics")
+                    .enumType(KafkaSemantics.class)
+                    .defaultValue(KafkaSemantics.NON)
+                    .withDescription(
+                            "Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
 }
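With SEMANTICS registered here and exposed through KafkaSinkFactory's option rule (later in this diff), a job can choose the delivery guarantee in its sink block. A hedged example in the same HOCON style as the e2e configs at the end of this diff; the unquoted enum value follows the option's description, though the exact casing accepted by the enum parser is an assumption:

sink {
  Kafka {
    bootstrap.servers = "kafkaCluster:9092"
    topic = "test_topic"
    format = json
    # one of EXACTLY_ONCE / AT_LEAST_ONCE / NON (default NON)
    semantics = EXACTLY_ONCE
  }
}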
File 3/10: KafkaSink.java

@@ -20,7 +20,8 @@
 import org.apache.seatunnel.shade.com.typesafe.config.Config;

 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -29,60 +30,40 @@
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;

 import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;

 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;

-import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-
 /**
  * Kafka Sink implementation by using SeaTunnel sink API. This class contains the method to create
  * {@link KafkaSinkWriter} and {@link KafkaSinkCommitter}.
  */
 @AutoService(SeaTunnelSink.class)
+@NoArgsConstructor
 public class KafkaSink
         implements SeaTunnelSink<
                 SeaTunnelRow, KafkaSinkState, KafkaCommitInfo, KafkaAggregatedCommitInfo> {

-    private Config pluginConfig;
+    private ReadonlyConfig pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;

-    public KafkaSink() {}
-
-    public KafkaSink(Config pluginConfig, SeaTunnelRowType rowType) {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, BOOTSTRAP_SERVERS.key());
-        if (!result.isSuccess()) {
-            throw new KafkaConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, result.getMsg()));
-        }
+    public KafkaSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
         this.pluginConfig = pluginConfig;
         this.seaTunnelRowType = rowType;
     }

     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, BOOTSTRAP_SERVERS.key());
-        if (!result.isSuccess()) {
-            throw new KafkaConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, result.getMsg()));
-        }
-        this.pluginConfig = pluginConfig;
+        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+                .validate(new KafkaSinkFactory().optionRule());
+        this.pluginConfig = ReadonlyConfig.fromConfig(pluginConfig);
     }

     @Override
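The remaining Kafka files repeat one mechanical migration: untyped typesafe-Config hasPath()/getString() lookups become typed ReadonlyConfig.get(Option) calls, so each option's type and default live on its Option definition. A small self-contained sketch of the new access pattern, assuming ReadonlyConfig.fromMap(...) from seatunnel-api; the PREFIX option mirrors TRANSACTION_PREFIX but is illustrative:

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;

import java.util.Collections;

class ReadonlyConfigSketch {
    // Illustrative option; compare TRANSACTION_PREFIX in the Kafka Config class.
    static final Option<String> PREFIX =
            Options.key("transaction_prefix").stringType().noDefaultValue()
                    .withDescription("prefix of the Kafka transaction id");

    public static void main(String[] args) {
        ReadonlyConfig config = ReadonlyConfig.fromMap(Collections.emptyMap());
        // Typed access replaces hasPath()/getString(): an absent option without a
        // default yields null, which is the null-check style used in the hunks below.
        String prefix = config.get(PREFIX) != null ? config.get(PREFIX) : "SeaTunnel";
        System.out.println(prefix); // prints the fallback "SeaTunnel"
    }
}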
File 4/10: KafkaSinkCommitter.java

@@ -17,8 +17,7 @@

 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;

@@ -33,11 +32,11 @@
 @Slf4j
 public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {

-    private final Config pluginConfig;
+    private final ReadonlyConfig pluginConfig;

     private KafkaInternalProducer<?, ?> kafkaProducer;

-    public KafkaSinkCommitter(Config pluginConfig) {
+    public KafkaSinkCommitter(ReadonlyConfig pluginConfig) {
         this.pluginConfig = pluginConfig;
     }

File 5/10: KafkaSinkFactory.java

@@ -17,8 +17,6 @@

 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -47,7 +45,11 @@ public OptionRule optionRule() {
                         Arrays.asList(
                                 MessageFormat.JSON, MessageFormat.CANAL_JSON, MessageFormat.TEXT),
                         Config.TOPIC)
-                .optional(Config.KAFKA_CONFIG, Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
+                .optional(
+                        Config.KAFKA_CONFIG,
+                        Config.ASSIGN_PARTITIONS,
+                        Config.TRANSACTION_PREFIX,
+                        Config.SEMANTICS)
                 .exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
                 .build();
     }
@@ -56,7 +58,7 @@ public OptionRule optionRule() {
     public TableSink createSink(TableFactoryContext context) {
         return () ->
                 new KafkaSink(
-                        ConfigFactory.parseMap(context.getOptions().toMap()),
+                        context.getOptions(),
                         context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
     }
 }
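Note the shape of the rule: the partially shown clause above ties Config.TOPIC to the chosen MessageFormat, and exclusive() forbids setting partition and partition_key_fields together, which is why the e2e configs at the end of this diff now declare format (and, in the canal job, a single partition) explicitly. A hedged reconstruction of how the full rule might read; the required() and conditional() calls are assumptions about the part of the rule this hunk does not show:

import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;

import java.util.Arrays;

class KafkaRuleSketch {
    static OptionRule rule() {
        return OptionRule.builder()
                .required(Config.BOOTSTRAP_SERVERS) // assumed; replaces the old checkAllExists guard
                .conditional( // assumed spelling of the clause truncated above
                        Config.FORMAT,
                        Arrays.asList(
                                MessageFormat.JSON, MessageFormat.CANAL_JSON, MessageFormat.TEXT),
                        Config.TOPIC) // topic is required for these formats
                .optional(
                        Config.KAFKA_CONFIG,
                        Config.ASSIGN_PARTITIONS,
                        Config.TRANSACTION_PREFIX,
                        Config.SEMANTICS)
                .exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS) // at most one of the two
                .build();
    }
}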
File 6/10: KafkaSinkWriter.java

@@ -17,13 +17,10 @@

 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
@@ -33,6 +30,7 @@
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;

+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -45,12 +43,14 @@
 import java.util.Random;

 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SEMANTICS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;

@@ -71,20 +71,22 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> {
     public KafkaSinkWriter(
             SinkWriter.Context context,
             SeaTunnelRowType seaTunnelRowType,
-            Config pluginConfig,
+            ReadonlyConfig pluginConfig,
             List<KafkaSinkState> kafkaStates) {
         this.context = context;
         this.seaTunnelRowType = seaTunnelRowType;
-        if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
-            MessageContentPartitioner.setAssignPartitions(
-                    pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
+        if (pluginConfig.get(ASSIGN_PARTITIONS) != null
+                && !CollectionUtils.isEmpty(pluginConfig.get(ASSIGN_PARTITIONS))) {
+            MessageContentPartitioner.setAssignPartitions(pluginConfig.get(ASSIGN_PARTITIONS));
         }
-        if (pluginConfig.hasPath(TRANSACTION_PREFIX.key())) {
-            this.transactionPrefix = pluginConfig.getString(TRANSACTION_PREFIX.key());
+
+        if (pluginConfig.get(TRANSACTION_PREFIX) != null) {
+            this.transactionPrefix = pluginConfig.get(TRANSACTION_PREFIX);
         } else {
             Random random = new Random();
             this.transactionPrefix = String.format("SeaTunnel%04d", random.nextInt(PREFIX_RANGE));
         }
+
         restoreState(kafkaStates);
         this.seaTunnelRowSerializer = getSerializer(pluginConfig, seaTunnelRowType);
         if (KafkaSemantics.EXACTLY_ONCE.equals(getKafkaSemantics(pluginConfig))) {
@@ -141,21 +143,20 @@ public void close() {
         }
     }

-    private Properties getKafkaProperties(Config pluginConfig) {
+    private Properties getKafkaProperties(ReadonlyConfig pluginConfig) {
         Properties kafkaProperties = new Properties();
-        if (CheckConfigUtil.isValidParam(pluginConfig, KAFKA_CONFIG.key())) {
-            pluginConfig
-                    .getObject(KAFKA_CONFIG.key())
-                    .forEach((key, value) -> kafkaProperties.put(key, value.unwrapped()));
+        if (pluginConfig.get(KAFKA_CONFIG) != null) {
+            pluginConfig.get(KAFKA_CONFIG).forEach((key, value) -> kafkaProperties.put(key, value));
         }
-        if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
+
+        if (pluginConfig.get(ASSIGN_PARTITIONS) != null) {
             kafkaProperties.put(
                     ProducerConfig.PARTITIONER_CLASS_CONFIG,
                     "org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner");
         }

         kafkaProperties.put(
-                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                pluginConfig.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, pluginConfig.get(BOOTSTRAP_SERVERS));
         kafkaProperties.put(
                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(
@@ -164,24 +165,18 @@ private Properties getKafkaProperties(Config pluginConfig) {
     }

     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
-            Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
-        MessageFormat messageFormat = readonlyConfig.get(FORMAT);
+            ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        MessageFormat messageFormat = pluginConfig.get(FORMAT);
         String delimiter = DEFAULT_FIELD_DELIMITER;
-        if (pluginConfig.hasPath(FIELD_DELIMITER.key())) {
-            delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
+
+        if (pluginConfig.get(FIELD_DELIMITER) != null) {
+            delimiter = pluginConfig.get(FIELD_DELIMITER);
         }
-        String topic = null;
-        if (pluginConfig.hasPath(TOPIC.key())) {
-            topic = pluginConfig.getString(TOPIC.key());
-        }
-        if (pluginConfig.hasPath(PARTITION.key())) {
+
+        String topic = pluginConfig.get(TOPIC);
+        if (pluginConfig.get(PARTITION) != null) {
             return DefaultSeaTunnelRowSerializer.create(
-                    topic,
-                    pluginConfig.getInt(PARTITION.key()),
-                    seaTunnelRowType,
-                    messageFormat,
-                    delimiter);
+                    topic, pluginConfig.get(PARTITION), seaTunnelRowType, messageFormat, delimiter);
         } else {
             return DefaultSeaTunnelRowSerializer.create(
                     topic,
@@ -192,9 +187,9 @@ private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
         }
     }

-    private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
-        if (pluginConfig.hasPath("semantics")) {
-            return pluginConfig.getEnum(KafkaSemantics.class, "semantics");
+    private KafkaSemantics getKafkaSemantics(ReadonlyConfig pluginConfig) {
+        if (pluginConfig.get(SEMANTICS) != null) {
+            return pluginConfig.get(SEMANTICS);
         }
         return KafkaSemantics.NON;
     }
@@ -211,10 +206,10 @@ private void restoreState(List<KafkaSinkState> states) {
     }

     private List<String> getPartitionKeyFields(
-            Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        if (pluginConfig.hasPath(PARTITION_KEY_FIELDS.key())) {
-            List<String> partitionKeyFields =
-                    pluginConfig.getStringList(PARTITION_KEY_FIELDS.key());
+            ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+
+        if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
+            List<String> partitionKeyFields = pluginConfig.get(PARTITION_KEY_FIELDS);
             List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
             for (String partitionKeyField : partitionKeyFields) {
                 if (!rowTypeFieldNames.contains(partitionKeyField)) {
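With the writer migrated, validation now happens once, up front. A JUnit-style sketch of the resulting fail-fast behavior, assuming the factory's rule marks bootstrap.servers as required (as the removed checkAllExists guard did); the test class and the broad Exception match are illustrative:

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkFactory;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Collections;

class KafkaOptionCheckSketch {
    @Test
    void missingBootstrapServersFailsBeforeSinkCreation() {
        // Empty options: bootstrap.servers is absent.
        ReadonlyConfig options = ReadonlyConfig.fromMap(Collections.emptyMap());
        // Before this commit KafkaSink.prepare() threw its own KafkaConnectorException;
        // now the shared ConfigValidator enforces the factory's rule for every connector.
        Assertions.assertThrows(
                Exception.class,
                () -> ConfigValidator.of(options).validate(new KafkaSinkFactory().optionRule()));
    }
}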
File 7/10: Cassandra e2e test

@@ -68,6 +68,7 @@
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.util.ArrayList;
@@ -141,10 +142,12 @@ private void initializeCassandraTable() {
             session.execute(
                     SimpleStatement.builder(config.getString(SOURCE_TABLE))
                             .setKeyspace(KEYSPACE)
+                            .setTimeout(Duration.ofSeconds(10))
                             .build());
             session.execute(
                     SimpleStatement.builder(config.getString(SINK_TABLE))
                             .setKeyspace(KEYSPACE)
+                            .setTimeout(Duration.ofSeconds(10))
                             .build());
         } catch (Exception e) {
             throw new RuntimeException("Initializing Cassandra table failed!", e);
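The added setTimeout(Duration.ofSeconds(10)) presumably lifts these DDL statements above the DataStax driver's default per-request timeout (2 seconds in driver v4), so slow schema changes in CI no longer abort table initialization.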
File 8/10: Kafka e2e test config

@@ -73,6 +73,7 @@ sink {
     source_table_name = "fake1"
     bootstrap.servers = "kafkaCluster:9092"
     topic = "${c_string}"
+    format = json
     partition_key_fields = ["c_map","c_string"]
   }
 }
File 9/10: Kafka e2e test config

@@ -63,6 +63,7 @@ sink {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_topic"
+    format = json
     partition_key_fields = ["c_map","c_string"]
   }
 }
File 10/10: Kafka e2e test config (canal_json)

@@ -53,5 +53,6 @@ sink {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test-canal-sink"
     format = canal_json
+    partition = 0
   }
 }
