diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
index 3dbc6af8992..b5f4ef300e5 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
 | bootstrap.servers | string | yes | - |
 | kafka.* | kafka producer config | no | - |
 | semantic | string | no | NON |
+| partition_key | string | no | - |
 | partition | int | no | - |
 | assign_partitions | list | no | - |
 | transaction_prefix | string | no | - |
@@ -50,6 +51,23 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b
 
 NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.
 
+### partition_key [string]
+
+Configure which field is used as the key of the kafka message.
+
+For example, if you want to use the value of a field from upstream data as the key, you can assign that field name to this property.
+
+Upstream data is the following:
+
+| name | age | data |
+| ---- | ---- | ------------- |
+| Jack | 16 | data-example1 |
+| Mary | 23 | data-example2 |
+
+If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.
+
+If the field name does not exist in the upstream data, the configured parameter value will be used as the key of every message.
+
 ### partition [int]
 
 We can specify the partition, all messages will be sent to this partition.
@@ -93,7 +111,9 @@ sink {
 
 ### change log
 
 #### next version
-
+- Add kafka sink doc
 - New feature : Kafka specified partition to send
-- New feature : Determine the partition that kafka send based on the message content
+- New feature : Determine the partition to which Kafka sends the message based on the message content
+- New feature : Configure which field is used as the key of the kafka message
+
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 0502afda330..d577b2badfa 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -59,4 +59,9 @@ public class Config {
      * Determine the partition to send based on the content of the message.
*/ public static final String ASSIGN_PARTITIONS = "assign_partitions"; + + /** + * Determine the key of the kafka send partition + */ + public static final String PARTITION_KEY = "partition_key"; } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java index 29599bbdbe7..24a2b242f69 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java @@ -48,4 +48,11 @@ public ProducerRecord serializeRow(SeaTunnelRow row) { return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row)); } } + + @Override + public ProducerRecord serializeRowByKey(String key, SeaTunnelRow row) { + //if the key is null, kafka will send message to a random partition + return new ProducerRecord<>(topic, key == null ? 
null : key.getBytes(), jsonSerializationSchema.serialize(row)); + } + } diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java index 9f12591ea2d..d96753155d5 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/SeaTunnelRowSerializer.java @@ -30,4 +30,13 @@ public interface SeaTunnelRowSerializer { * @return kafka record. */ ProducerRecord serializeRow(SeaTunnelRow row); + + /** + * Use Key serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord}. + * + * @param key String + * @param row seatunnel row + * @return kafka record. 
+     */
+    ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row);
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 1f61482e785..9712ba33162 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -19,6 +19,7 @@
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
 
@@ -38,10 +39,12 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
+import java.util.function.Function;
 
 /**
  * KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka.
@@ -50,6 +53,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> {
+    private Function<SeaTunnelRow, String> partitionExtractor;
     private String transactionPrefix;
     private long lastCheckpointId = 0;
@@ -63,7 +67,14 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> {
 
     @Override
     public void write(SeaTunnelRow element) {
-        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        ProducerRecord<byte[], byte[]> producerRecord;
+        // When partition_key is configured, route the message by that key; otherwise keep the default behaviour.
+        if (pluginConfig.hasPath(PARTITION_KEY)) {
+            String key = partitionExtractor.apply(element);
+            producerRecord = seaTunnelRowSerializer.serializeRowByKey(key, element);
+        } else {
+            producerRecord = seaTunnelRowSerializer.serializeRow(element);
+        }
         kafkaProducerSender.send(producerRecord);
     }
 
@@ -74,6 +85,7 @@ public KafkaSinkWriter(
         List<KafkaSinkState> kafkaStates) {
         this.context = context;
         this.pluginConfig = pluginConfig;
+        this.partitionExtractor = createPartitionExtractor(pluginConfig, seaTunnelRowType);
         if (pluginConfig.hasPath(PARTITION)) {
             this.partition = pluginConfig.getInt(PARTITION);
         }
@@ -175,4 +187,23 @@ private void restoreState(List<KafkaSinkState> states) {
         }
     }
 
+    /**
+     * Builds the function that extracts the Kafka message key from a row.
+     *
+     * <p>If partition_key is not configured, returns a constant null-key extractor (guarding here
+     * avoids ConfigException.Missing from getString when the option is absent). If the configured
+     * name is not an upstream field, the configured value itself is used as the key; otherwise the
+     * key is the row's value in that field (null when the field value is null).
+     */
+    private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
+                                                                   SeaTunnelRowType seaTunnelRowType) {
+        if (!pluginConfig.hasPath(PARTITION_KEY)) {
+            return row -> null;
+        }
+        String partitionKey = pluginConfig.getString(PARTITION_KEY);
+        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
+        if (!fieldNames.contains(partitionKey)) {
+            return row -> partitionKey;
+        }
+        int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
+        return row -> {
+            Object partitionFieldValue = row.getField(partitionFieldIndex);
+            return partitionFieldValue == null ? null : partitionFieldValue.toString();
+        };
+    }
 }