Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][Connector-V2] Support extract partition from SeaTunnelRow fields #3085

Merged
merged 12 commits into from
Oct 18, 2022
24 changes: 22 additions & 2 deletions docs/en/connector-v2/sink/Kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
| bootstrap.servers | string | yes | - |
| kafka.* | kafka producer config | no | - |
| semantic | string | no | NON |
| partition_key | string | no | - |
| partition | int | no | - |
| assign_partitions | list | no | - |
| transaction_prefix | string | no | - |
Expand Down Expand Up @@ -50,6 +51,23 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b

NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.

### partition_key [string]

Configure which field is used as the key of the kafka message.

For example, if you want to use the value of a field from the upstream data as the key, set `partition_key` to that field's name.

Upstream data is the following:

| name | age | data |
| ---- | ---- | ------------- |
| Jack | 16 | data-example1 |
| Mary | 23 | data-example2 |

If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.

If no field with the configured name exists in the upstream data, the configured `partition_key` value itself is used as the key for every message.

### partition [int]

We can specify the partition, all messages will be sent to this partition.
Expand Down Expand Up @@ -93,7 +111,9 @@ sink {

### change log
#### next version

- Add kafka sink doc
- New feature : Kafka specified partition to send
- New feature : Determine the partition that kafka send based on the message content
- New feature : Determine the partition that kafka send message based on the message content
- New feature : Configure which field is used as the key of the kafka message

Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ public class Config {
* Determine the partition to send based on the content of the message.
*/
public static final String ASSIGN_PARTITIONS = "assign_partitions";

/**
 * Config option naming the upstream field whose value is used as the key of the kafka message.
 * If no upstream field has that name, the configured string itself is used as the key.
 */
public static final String PARTITION_KEY = "partition_key";
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,11 @@ public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
}
}

/**
 * Serializes the row with {@code jsonSerializationSchema} and attaches the given key to the record.
 *
 * @param key message key; may be {@code null}, in which case the record is created without a key
 * @param row seatunnel row to serialize as the record value
 * @return kafka record for {@code topic} carrying the encoded key and the serialized row
 */
@Override
public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
    // Encode with an explicit charset: the no-arg getBytes() uses the JVM default charset,
    // so the same key could yield different bytes (and a different partition) on different hosts.
    // Fully qualified because this file's import block is not touched by this change.
    byte[] keyBytes = key == null ? null : key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
    // With a null key the partition choice is left to the kafka producer.
    return new ProducerRecord<>(topic, keyBytes, jsonSerializationSchema.serialize(row));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,13 @@ public interface SeaTunnelRowSerializer<K, V> {
* @return kafka record.
*/
ProducerRecord<K, V> serializeRow(SeaTunnelRow row);

/**
 * Serializes the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord} that carries the given message key.
 *
 * @param key message key for the record.
 *            NOTE(review): the visible implementation accepts {@code null} and then emits a
 *            record without a key; confirm whether that is part of this contract.
 * @param row seatunnel row
 * @return kafka record.
 */
ProducerRecord<K, V> serializeRowByKey(String key, SeaTunnelRow row);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;

Expand All @@ -38,10 +39,12 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.function.Function;

/**
* KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka.
Expand All @@ -50,6 +53,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo

private final SinkWriter.Context context;
private final Config pluginConfig;
private final Function<SeaTunnelRow, String> partitionExtractor;

private String transactionPrefix;
private long lastCheckpointId = 0;
Expand All @@ -63,7 +67,15 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
// check config
/**
 * Serializes one row and hands the resulting record to the kafka producer sender.
 *
 * <p>When {@code partition_key} is configured, the key is extracted from the row and the
 * record is built with that key; otherwise the row is serialized without a key.
 *
 * @param element the row to send
 */
@Override
public void write(SeaTunnelRow element) {
    // Single declaration, assigned exactly once on each branch (no null placeholder).
    final ProducerRecord<byte[], byte[]> producerRecord;
    if (pluginConfig.hasPath(PARTITION_KEY)) {
        // Determine the key of the kafka message based on the configured field name
        String key = partitionExtractor.apply(element);
        producerRecord = seaTunnelRowSerializer.serializeRowByKey(key, element);
    } else {
        producerRecord = seaTunnelRowSerializer.serializeRow(element);
    }
    kafkaProducerSender.send(producerRecord);
}

Expand All @@ -74,6 +86,7 @@ public KafkaSinkWriter(
List<KafkaSinkState> kafkaStates) {
this.context = context;
this.pluginConfig = pluginConfig;
this.partitionExtractor = createPartitionExtractor(pluginConfig, seaTunnelRowType);
if (pluginConfig.hasPath(PARTITION)) {
this.partition = pluginConfig.getInt(PARTITION);
}
Expand Down Expand Up @@ -175,4 +188,20 @@ private void restoreState(List<KafkaSinkState> states) {
}
}

/**
 * Builds the function that derives the kafka message key from a row.
 *
 * <ul>
 *   <li>{@code partition_key} not configured: the extractor always returns {@code null}.</li>
 *   <li>{@code partition_key} is not a field of the row type: the configured string itself
 *       is returned for every row.</li>
 *   <li>otherwise: the {@code toString()} of that field's value, or {@code null} when the
 *       value is {@code null}.</li>
 * </ul>
 *
 * @param pluginConfig sink configuration
 * @param seaTunnelRowType row type used to resolve the field index once, up front
 * @return key extractor; never {@code null}
 */
private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
                                                                SeaTunnelRowType seaTunnelRowType) {
    // partition_key is optional and this method is called unconditionally from the
    // constructor; reading a missing path with getString would throw, so guard it first.
    if (!pluginConfig.hasPath(PARTITION_KEY)) {
        return row -> null;
    }
    String partitionKey = pluginConfig.getString(PARTITION_KEY);
    List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
    if (!fieldNames.contains(partitionKey)) {
        // Not a field name: use the configured value as a constant key.
        return row -> partitionKey;
    }
    int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
    return row -> {
        Object partitionFieldValue = row.getField(partitionFieldIndex);
        return partitionFieldValue == null ? null : partitionFieldValue.toString();
    };
}
}