
[Improve][Connector-V2][Kafka] Support to specify multiple partition keys #3230

Merged 1 commit on Nov 17, 2022
35 changes: 19 additions & 16 deletions docs/en/connector-v2/sink/Kafka.md
@@ -15,17 +15,17 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on

## Options

| name | type | required | default value |
| ------------------ | ---------------------- | -------- | ------------- |
| topic | string | yes | - |
| bootstrap.servers | string | yes | - |
| kafka.* | kafka producer config | no | - |
| semantic | string | no | NON |
| partition_key | string | no | - |
| partition | int | no | - |
| assign_partitions | list | no | - |
| transaction_prefix | string | no | - |
| common-options | config | no | - |
| name | type | required | default value |
|----------------------|-----------------------| -------- | ------------- |
| topic | string | yes | - |
| bootstrap.servers | string | yes | - |
| kafka.* | kafka producer config | no | - |
| semantic | string | no | NON |
| partition_key_fields | array | no | - |
| partition | int | no | - |
| assign_partitions | array | no | - |
| transaction_prefix | string | no | - |
| common-options | config | no | - |

### topic [string]

@@ -51,11 +51,11 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b

NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.

### partition_key [string]
### partition_key_fields [array]

Configure which field is used as the key of the kafka message.
Configure which fields are used as the key of the kafka message.

For example, if you want to use value of a field from upstream data as key, you can assign it to the field name.
For example, if you want to use the values of fields from the upstream data as the message key, you can assign those field names to this property.

Upstream data is the following:

@@ -66,13 +66,13 @@

If name is set as the key, then the hash value of the name column will determine which partition the message is sent to.

If the field name does not exist in the upstream data, the configured parameter will be used as the key.
If the partition key fields are not set, a null message key will be sent.

### partition [int]

We can specify the partition; all messages will be sent to this partition.

### assign_partitions [list]
### assign_partitions [array]

We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information.

@@ -113,3 +113,6 @@ sink {
### 2.3.0-beta 2022-10-20

- Add Kafka Sink Connector
### next version

- [Feature] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
@@ -101,10 +101,10 @@ public class Config {
.withDescription("We can decide which partition to send based on the content of the message. " +
"The function of this parameter is to distribute information.");

public static final Option<String> PARTITION_KEY = Options.key("partition_key")
.stringType()
public static final Option<List<String>> PARTITION_KEY_FIELDS = Options.key("partition_key_fields")
.listType()
.noDefaultValue()
.withDescription("Configure which field is used as the key of the kafka message.");
.withDescription("Configure which fields are used as the key of the kafka message.");

public static final Option<StartMode> START_MODE = Options.key("start_mode")
.objectType(StartMode.class)
@@ -17,42 +17,78 @@

package org.apache.seatunnel.connectors.seatunnel.kafka.serialize;

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.format.json.JsonSerializationSchema;

import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.List;
import java.util.function.Function;

public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {
A reviewer (Member) left a suggested change for this class:
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {

    private Integer partition;
    private final String topic;
    private final SerializationSchema keySerialization;
    private final SerializationSchema valueSerialization;

    public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType seaTunnelRowType) {
        this(topic, element -> null, createSerializationSchema(seaTunnelRowType));
    }

    public DefaultSeaTunnelRowSerializer(String topic, Integer partition, SeaTunnelRowType seaTunnelRowType) {
        this(topic, seaTunnelRowType);
        this.partition = partition;
    }

    public DefaultSeaTunnelRowSerializer(String topic,
                                         List<String> keyFieldNames,
                                         SeaTunnelRowType seaTunnelRowType) {
        this(topic, createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
            createSerializationSchema(seaTunnelRowType));
    }

    public DefaultSeaTunnelRowSerializer(String topic,
                                         SerializationSchema keySerialization,
                                         SerializationSchema valueSerialization) {
        this.topic = topic;
        this.keySerialization = keySerialization;
        this.valueSerialization = valueSerialization;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
        return new ProducerRecord<>(topic, partition,
            keySerialization.serialize(row), valueSerialization.serialize(row));
    }

    private static SerializationSchema createSerializationSchema(SeaTunnelRowType rowType) {
        return new JsonSerializationSchema(rowType);
    }

    private static SerializationSchema createKeySerializationSchema(List<String> keyFieldNames,
                                                                    SeaTunnelRowType seaTunnelRowType) {
        int[] keyFieldIndexArr = new int[keyFieldNames.size()];
        SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()];
        for (int i = 0; i < keyFieldNames.size(); i++) {
            String keyFieldName = keyFieldNames.get(i);
            int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
            keyFieldIndexArr[i] = rowFieldIndex;
            keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
        }
        SeaTunnelRowType keyType = new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
        SerializationSchema keySerializationSchema = createSerializationSchema(keyType);

        Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor = row -> {
            Object[] keyFields = new Object[keyFieldIndexArr.length];
            for (int i = 0; i < keyFieldIndexArr.length; i++) {
                keyFields[i] = row.getField(keyFieldIndexArr[i]);
            }
            return new SeaTunnelRow(keyFields);
        };
        return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
    }
}


private int partation = -1;
private Integer partition;
private final String topic;
private final JsonSerializationSchema jsonSerializationSchema;
private final SerializationSchema keySerialization;
private final SerializationSchema valueSerialization;

public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType seaTunnelRowType) {
this.topic = topic;
this.jsonSerializationSchema = new JsonSerializationSchema(seaTunnelRowType);
this(topic, element -> null, createSerializationSchema(seaTunnelRowType));
}

public DefaultSeaTunnelRowSerializer(String topic, int partation, SeaTunnelRowType seaTunnelRowType) {
public DefaultSeaTunnelRowSerializer(String topic, Integer partition, SeaTunnelRowType seaTunnelRowType) {
this(topic, seaTunnelRowType);
this.partation = partation;
this.partition = partition;
}

public DefaultSeaTunnelRowSerializer(String topic,
List<String> keyFieldNames,
SeaTunnelRowType seaTunnelRowType) {
this(topic, createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
createSerializationSchema(seaTunnelRowType));
}

public DefaultSeaTunnelRowSerializer(String topic,
SerializationSchema keySerialization,
SerializationSchema valueSerialization) {
this.topic = topic;
this.keySerialization = keySerialization;
this.valueSerialization = valueSerialization;
}

@Override
public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
if (this.partation != -1) {
return new ProducerRecord<>(topic, this.partation, null, jsonSerializationSchema.serialize(row));
}
else {
return new ProducerRecord<>(topic, null, jsonSerializationSchema.serialize(row));
}
return new ProducerRecord<>(topic, partition,
keySerialization.serialize(row), valueSerialization.serialize(row));
}

@Override
public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
//if the key is null, kafka will send message to a random partition
return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));
private static SerializationSchema createSerializationSchema(SeaTunnelRowType rowType) {
return new JsonSerializationSchema(rowType);
}

private static SerializationSchema createKeySerializationSchema(List<String> keyFieldNames,
SeaTunnelRowType seaTunnelRowType) {
int[] keyFieldIndexArr = new int[keyFieldNames.size()];
SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()];
for (int i = 0; i < keyFieldNames.size(); i++) {
String keyFieldName = keyFieldNames.get(i);
int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
keyFieldIndexArr[i] = rowFieldIndex;
keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
}
SeaTunnelRowType keyType = new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
SerializationSchema keySerializationSchema = createSerializationSchema(keyType);

Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor = row -> {
Object[] keyFields = new Object[keyFieldIndexArr.length];
for (int i = 0; i < keyFieldIndexArr.length; i++) {
keyFields[i] = row.getField(keyFieldIndexArr[i]);
}
return new SeaTunnelRow(keyFields);
};
return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
}
}
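
To make the new key path concrete, here is a small usage sketch of the serializer above. It is illustrative only: the topic name, field names, types, and values are assumptions rather than code from this PR, and the exact JSON layout of the key depends on JsonSerializationSchema.

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.kafka.serialize.DefaultSeaTunnelRowSerializer;

import java.util.Collections;

public class PartitionKeySerializerExample {
    public static void main(String[] args) {
        // Row schema with two fields; names and types are assumed for illustration.
        SeaTunnelRowType rowType = new SeaTunnelRowType(
            new String[]{"name", "age"},
            new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.INT_TYPE});

        // Equivalent to configuring partition_key_fields = ["name"] in the sink.
        DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer(
            "test_topic", Collections.singletonList("name"), rowType);

        SeaTunnelRow row = new SeaTunnelRow(new Object[]{"Jack", 16});
        ProducerRecord<byte[], byte[]> record = serializer.serializeRow(row);

        // The key is the JSON of the projected key fields (roughly {"name":"Jack"}),
        // the value is the JSON of the whole row, and the partition is left null,
        // so Kafka chooses the partition from the key hash.
        System.out.println(new String(record.key()) + " / " + new String(record.value()));
    }
}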
@@ -30,13 +30,4 @@ public interface SeaTunnelRowSerializer<K, V> {
* @return kafka record.
*/
ProducerRecord<K, V> serializeRow(SeaTunnelRow row);

/**
* Use Key serialize the {@link SeaTunnelRow} to a Kafka {@link ProducerRecord}.
*
* @param key String
* @param row seatunnel row
* @return kafka record.
*/
ProducerRecord<K, V> serializeRowByKey(String key, SeaTunnelRow row);
}
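
With serializeRowByKey removed, the message key is now decided entirely inside serializeRow implementations. As a minimal sketch of what the trimmed contract allows, the hypothetical class below (the class name and fixed-key strategy are invented for illustration) always attaches the same key:

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

public class FixedKeyRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {

    private final String topic;
    private final byte[] key;
    private final SerializationSchema valueSerialization;

    public FixedKeyRowSerializer(String topic, byte[] key, SerializationSchema valueSerialization) {
        this.topic = topic;
        this.key = key;
        this.valueSerialization = valueSerialization;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
        // The key is chosen here, not passed in by the caller.
        return new ProducerRecord<>(topic, null, key, valueSerialization.serialize(row));
    }
}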
@@ -36,7 +36,7 @@ public OptionRule optionRule() {
return OptionRule.builder()
.required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
.optional(Config.KAFKA_CONFIG_PREFIX, Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
.exclusive(Config.PARTITION, Config.PARTITION_KEY)
.exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
.build();
}
}
@@ -18,8 +18,9 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;

import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;

@@ -40,56 +41,33 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.function.Function;

/**
* KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka.
*/
public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> {

private final SinkWriter.Context context;
private final Config pluginConfig;
private final Function<SeaTunnelRow, String> partitionExtractor;

private String transactionPrefix;
private long lastCheckpointId = 0;
private int partition;

private final KafkaProduceSender<byte[], byte[]> kafkaProducerSender;
private final SeaTunnelRowSerializer<byte[], byte[]> seaTunnelRowSerializer;

private static final int PREFIX_RANGE = 10000;

// check config
@Override
public void write(SeaTunnelRow element) {
ProducerRecord<byte[], byte[]> producerRecord = null;
//Determine the partition of the kafka send message based on the field name
if (pluginConfig.hasPath(PARTITION_KEY.key())){
String key = partitionExtractor.apply(element);
producerRecord = seaTunnelRowSerializer.serializeRowByKey(key, element);
}
else {
producerRecord = seaTunnelRowSerializer.serializeRow(element);
}
kafkaProducerSender.send(producerRecord);
}

public KafkaSinkWriter(
SinkWriter.Context context,
SeaTunnelRowType seaTunnelRowType,
Config pluginConfig,
List<KafkaSinkState> kafkaStates) {
this.context = context;
this.pluginConfig = pluginConfig;
this.partitionExtractor = createPartitionExtractor(pluginConfig, seaTunnelRowType);
if (pluginConfig.hasPath(PARTITION.key())) {
this.partition = pluginConfig.getInt(PARTITION.key());
}
if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
MessageContentPartitioner.setAssignPartitions(pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
}
@@ -116,6 +94,12 @@ public KafkaSinkWriter(
}
}

@Override
public void write(SeaTunnelRow element) {
ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
kafkaProducerSender.send(producerRecord);
}

@Override
public List<KafkaSinkState> snapshotState(long checkpointId) {
List<KafkaSinkState> states = kafkaProducerSender.snapshotState(checkpointId);
@@ -145,8 +129,7 @@ public void close() {
}

private Properties getKafkaProperties(Config pluginConfig) {
Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX.key(), false);
Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig, KAFKA_CONFIG_PREFIX.key(), false);
Properties kafkaProperties = new Properties();
kafkaConfig.entrySet().forEach(entry -> {
kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
@@ -160,13 +143,13 @@ private Properties getKafkaProperties(Config pluginConfig) {
return kafkaProperties;
}

// todo: parse the target field from config
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
A reviewer (Member) left a suggested change:
    private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
        if (pluginConfig.hasPath(PARTITION)){
            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC),
                pluginConfig.getInt(PARTITION), seaTunnelRowType);
        } else {
            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC),
                getPartitionKeyFields(pluginConfig, seaTunnelRowType), seaTunnelRowType);
        }
    }

if (pluginConfig.hasPath(PARTITION.key())){
return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()), this.partition, seaTunnelRowType);
}
else {
return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()), seaTunnelRowType);
if (pluginConfig.hasPath(PARTITION.key())) {
return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
pluginConfig.getInt(PARTITION.key()), seaTunnelRowType);
} else {
A reviewer (Member) suggested replacing the if / else-if / else chain with early returns (a sketch applying this appears after this file's diff):
if () {
   return xx;
} else if () {
  return xx;
} else {
  return xx;
}

replace to

if () {
    return xx;
}

if () {
  return xx;
}

return xx;

return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC.key()),
getPartitionKeyFields(pluginConfig, seaTunnelRowType), seaTunnelRowType);
}
}

@@ -188,23 +171,18 @@ private void restoreState(List<KafkaSinkState> states) {
}
}

private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
SeaTunnelRowType seaTunnelRowType) {
if (!pluginConfig.hasPath(PARTITION_KEY.key())){
return row -> null;
}
String partitionKey = pluginConfig.getString(PARTITION_KEY.key());
List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
if (!fieldNames.contains(partitionKey)) {
return row -> partitionKey;
}
int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
return row -> {
Object partitionFieldValue = row.getField(partitionFieldIndex);
if (partitionFieldValue != null) {
return partitionFieldValue.toString();
private List<String> getPartitionKeyFields(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
if (pluginConfig.hasPath(PARTITION_KEY_FIELDS.key())) {
List<String> partitionKeyFields = pluginConfig.getStringList(PARTITION_KEY_FIELDS.key());
List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
for (String partitionKeyField : partitionKeyFields) {
if (!rowTypeFieldNames.contains(partitionKeyField)) {
throw new IllegalArgumentException(String.format(
"Partition key field not found: %s, rowType: %s", partitionKeyField, rowTypeFieldNames));
}
}
return null;
};
return partitionKeyFields;
}
return Collections.emptyList();
}
}
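
As a rough sketch of how the reviewer's early-return suggestion could be applied to getSerializer (the option keys, method names, and constructors come from this diff, but the exact shape of the merged code may differ):

// Sketch only: early returns instead of an if/else chain.
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig,
                                                             SeaTunnelRowType seaTunnelRowType) {
    String topic = pluginConfig.getString(TOPIC.key());
    if (pluginConfig.hasPath(PARTITION.key())) {
        // A fixed partition wins; every record goes to that partition.
        return new DefaultSeaTunnelRowSerializer(topic, pluginConfig.getInt(PARTITION.key()), seaTunnelRowType);
    }
    // Otherwise build the key from partition_key_fields (an empty list yields a null key).
    return new DefaultSeaTunnelRowSerializer(topic,
        getPartitionKeyFields(pluginConfig, seaTunnelRowType), seaTunnelRowType);
}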
@@ -18,39 +18,36 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
<artifactId>seatunnel-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-kafka-flink-e2e</artifactId>
<artifactId>connector-kafka-e2e</artifactId>

<dependencies>
<!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-flink-e2e-base</artifactId>
<artifactId>connector-kafka</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- SeaTunnel connectors -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-kafka</artifactId>
<artifactId>connector-console</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-console</artifactId>
<artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-assert</artifactId>
<artifactId>connector-fake</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>