
[Improve][Connector-V2][Kafka] Support to specify multiple partition keys #3230

Merged — 1 commit merged into apache:dev on Nov 17, 2022

Conversation

harveyyue (Contributor):

Purpose of this pull request

Check list

@@ -51,7 +51,7 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b

NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.

- ### partition_key [string]
+ ### partition_key [array]

Configure which field(s) are used as the key of the Kafka message.
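With `partition_key` now accepting an array, a sink block can name several row fields whose values are combined into the message key. A minimal illustrative configuration (the server address is a placeholder; the field names mirror the example later in this thread):

```hocon
sink {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "test_topic"
    # values of these row fields are combined to form the Kafka message key
    partition_key = ["c_map", "c_string"]
  }
}
```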

}
SeaTunnelRowType keySeaTunnelRowType = new SeaTunnelRowType(keyFieldNames, keyFieldTypes);
return new DefaultSeaTunnelRowSerializer(this.topic, keySeaTunnelRowType, seaTunnelRowType);
} else {
Member:

Suggestion: replace

if () {
   return xx;
} else if () {
  return xx;
} else {
  return xx;
}

with:

if () {
    return xx;
}

if () {
  return xx;
}

return xx;

.filter(f -> {
if (Arrays.asList(seaTunnelRowType.getFieldNames()).contains(f)) {
return true;
} else {
Member:

The else branch is redundant.

harveyyue (Contributor, Author):

@EricJoy2048 Done.

harveyyue force-pushed the dev-kafka-2 branch 2 times, most recently from 26d77a8 to 8cf9685 (October 30, 2022 07:01).
EricJoy2048 (Member) previously approved these changes on Oct 31, 2022 and left a comment:

LGTM @hailin0 PTAL

EricJoy2048 (Member) previously approved these changes on Nov 1, 2022 and left a comment:

LGTM

@TestTemplate
public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/kafkasink_fake_to_kafka.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Contributor:

Thank you for your contribution. In addition to checking that the job exits properly, you also need to check that the data is successfully written to Kafka.

harveyyue (Contributor, Author):

Added checks for the consumer data size and the key message schema.

.build();
generateTestData(row -> new ProducerRecord<>("test_topic_text", null, serializer.serialize(row)));
Container.ExecResult execResult = container.executeJob("/kafkasource_text_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
Contributor:

Ditto; it is suggested to use AssertSink or fileSink to check whether the output data is as expected.

harveyyue (Contributor, Author):

Already using AssertSink.

DefaultSeaTunnelRowSerializer serializer = new DefaultSeaTunnelRowSerializer("test_topic_json", SEATUNNEL_ROW_TYPE);
generateTestData(row -> serializer.serializeRow(row));
Container.ExecResult execResult = container.executeJob("/kafkasource_json_to_console.conf");
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
Contributor:

Ditto

harveyyue (Contributor, Author):

Already using AssertSink.

hailin0 (Member) commented on Nov 4, 2022:

@TaoZex, please help to review.

Kafka {
bootstrap.servers = "kafkaCluster:9092"
topic = "test_topic"
partition_key = ["c_map","c_string"]
Member:

How can a fixed string partition key value be used, e.g. partition_key = 'aaa'?

Member:

It is recommended to implement it this way, which is simpler for users:

partition_key = "${c_map},${c_string}"

Member:

The value of partition_key can be a plain string or an expression that extracts field content. We already supported single-field extraction before:

#3085

harveyyue (Contributor, Author):

@hailin0 You want to keep the logic of the following code snippet, right?
if (!fieldNames.contains(partitionKey)) {
    return row -> partitionKey;
}
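The fallback being discussed can be sketched standalone (plain Java with hypothetical names, not the actual SeaTunnel types): when the configured key does not match any row field, every record gets the fixed literal key; otherwise the key is read from the row.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class PartitionKeyFallback {
    // A row is modeled as an Object[] aligned with fieldNames (illustrative only).
    static Function<Object[], String> keyExtractor(String partitionKey, List<String> fieldNames) {
        if (!fieldNames.contains(partitionKey)) {
            // Fixed literal key: all records share it, so they land in one partition.
            return row -> partitionKey;
        }
        int idx = fieldNames.indexOf(partitionKey);
        return row -> String.valueOf(row[idx]);
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("c_string", "c_int");
        Object[] row = new Object[]{"hello", 42};
        System.out.println(keyExtractor("aaa", fields).apply(row));      // prints "aaa"
        System.out.println(keyExtractor("c_string", fields).apply(row)); // prints "hello"
    }
}
```

This is why the thread notes that a fixed key routes everything to a single partition: Kafka's default partitioner hashes the key bytes, and a constant key always hashes to the same partition.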

Member:

Yes.

harveyyue (Contributor, Author) commented on Nov 4, 2022:

By the way, a fixed string partition key causes all of the data to be written to a single partition of the topic; specifying field-based partition key(s), or omitting the key, is better practice.

Member:

I agree, but I'm worried about breaking the released feature.

Member:

Perhaps renaming the config key to partition_key_fields would be more appropriate.

hailin0 (Member) commented on Nov 4, 2022:

The examples and description in the docs also need to be updated.


harveyyue (Contributor, Author):

Done.

@@ -0,0 +1,22 @@
#
Member:

kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
.withNetwork(NETWORK)
.withNetworkAliases(KAFKA_HOST)
.withLogConsumer(new Slf4jLogConsumer(log));
Member:

Suggested change:
- .withLogConsumer(new Slf4jLogConsumer(log));
+ .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("confluentinc/cp-kafka:6.2.1")));

Comment on lines 230 to 187
private List<String> createPartitionKeys(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
if (pluginConfig.hasPath(PARTITION_KEY_FIELDS)) {
return pluginConfig.getStringList(PARTITION_KEY_FIELDS).stream()
.filter(f -> Arrays.asList(seaTunnelRowType.getFieldNames()).contains(f))
.collect(Collectors.toList());
}
return null;
}
Member:

Suggestion:

    private List<String> getPartitionKeyFields(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
        if (pluginConfig.hasPath(PARTITION_KEY_FIELDS)) {
            List<String> partitionKeyFields = pluginConfig.getStringList(PARTITION_KEY_FIELDS);
            List<String> rowTypeFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
            for (String partitionKeyField : partitionKeyFields) {
                if (!rowTypeFieldNames.contains(partitionKeyField)) {
                    throw new IllegalArgumentException(String.format(
                        "Partition key field not found: %s, rowType: %s", partitionKeyField, rowTypeFieldNames));
                }
            }
            return partitionKeyFields;
        }
        return Collections.emptyList();
    }

private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
SeaTunnelRowType seaTunnelRowType) {
if (!pluginConfig.hasPath(PARTITION_KEY)){
private Function<SeaTunnelRow, SeaTunnelRow> createPartitionExtractor(SeaTunnelRowType seaTunnelRowType) {
Member:

Suggestion: remove this method.

@@ -162,12 +168,25 @@ private Properties getKafkaProperties(Config pluginConfig) {

// todo: parse the target field from config
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
Member:

Suggestion:

    private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
        if (pluginConfig.hasPath(PARTITION)){
            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC),
                pluginConfig.getInt(PARTITION), seaTunnelRowType);
        } else {
            return new DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC),
                getPartitionKeyFields(pluginConfig, seaTunnelRowType), seaTunnelRowType);
        }
    }

* @param row seatunnel row
* @return kafka record.
*/
ProducerRecord<K, V> serializeRowByKey(String key, SeaTunnelRow row);
ProducerRecord<K, V> serializeRowByKey(SeaTunnelRow key, SeaTunnelRow row);
Member:

Suggestion: remove this method.

Suggested change:
- ProducerRecord<K, V> serializeRowByKey(SeaTunnelRow key, SeaTunnelRow row);

@@ -67,13 +72,12 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
// check config
@Override
public void write(SeaTunnelRow element) {
Member:

Suggestion:

    public void write(SeaTunnelRow element) {
        ProducerRecord<byte[], byte[]> producerRecord = seaTunnelRowSerializer.serializeRow(element);
        kafkaProducerSender.send(producerRecord);
    }

@@ -25,34 +25,42 @@

public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {
Member:

Suggestion:

public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer<byte[], byte[]> {

    private Integer partition;
    private final String topic;
    private final SerializationSchema keySerialization;
    private final SerializationSchema valueSerialization;

    public DefaultSeaTunnelRowSerializer(String topic, SeaTunnelRowType seaTunnelRowType) {
        this(topic, element -> null, createSerializationSchema(seaTunnelRowType));
    }

    public DefaultSeaTunnelRowSerializer(String topic, Integer partition, SeaTunnelRowType seaTunnelRowType) {
        this(topic, seaTunnelRowType);
        this.partition = partition;
    }

    public DefaultSeaTunnelRowSerializer(String topic,
                                         List<String> keyFieldNames,
                                         SeaTunnelRowType seaTunnelRowType) {
        this(topic, createKeySerializationSchema(keyFieldNames, seaTunnelRowType),
            createSerializationSchema(seaTunnelRowType));
    }

    public DefaultSeaTunnelRowSerializer(String topic,
                                         SerializationSchema keySerialization,
                                         SerializationSchema valueSerialization) {
        this.topic = topic;
        this.keySerialization = keySerialization;
        this.valueSerialization = valueSerialization;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serializeRow(SeaTunnelRow row) {
        return new ProducerRecord<>(topic, partition,
            keySerialization.serialize(row), valueSerialization.serialize(row));
    }

    private static SerializationSchema createSerializationSchema(SeaTunnelRowType rowType) {
        return new JsonSerializationSchema(rowType);
    }

    private static SerializationSchema createKeySerializationSchema(List<String> keyFieldNames,
                                                                    SeaTunnelRowType seaTunnelRowType) {
        int[] keyFieldIndexArr = new int[keyFieldNames.size()];
        SeaTunnelDataType[] keyFieldDataTypeArr = new SeaTunnelDataType[keyFieldNames.size()];
        for (int i = 0; i < keyFieldNames.size(); i++) {
            String keyFieldName = keyFieldNames.get(i);
            int rowFieldIndex = seaTunnelRowType.indexOf(keyFieldName);
            keyFieldIndexArr[i] = rowFieldIndex;
            keyFieldDataTypeArr[i] = seaTunnelRowType.getFieldType(rowFieldIndex);
        }
        SeaTunnelRowType keyType = new SeaTunnelRowType(keyFieldNames.toArray(new String[0]), keyFieldDataTypeArr);
        SerializationSchema keySerializationSchema = createSerializationSchema(keyType);

        Function<SeaTunnelRow, SeaTunnelRow> keyDataExtractor = row -> {
            Object[] keyFields = new Object[keyFieldIndexArr.length];
            for (int i = 0; i < keyFieldIndexArr.length; i++) {
                keyFields[i] = row.getField(keyFieldIndexArr[i]);
            }
            return new SeaTunnelRow(keyFields);
        };
        return row -> keySerializationSchema.serialize(keyDataExtractor.apply(row));
    }
}
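The key-extraction step in the suggested serializer can be illustrated standalone (plain Java with a hypothetical helper, not the real SeaTunnelRow/SerializationSchema API): the configured key fields are plucked from the full row, in order, to build the key row that is then serialized.

```java
import java.util.Arrays;
import java.util.List;

public class KeyRowExtractor {
    // Rows are modeled as Object[] aligned with rowFieldNames (illustrative only).
    static Object[] extractKey(List<String> keyFieldNames, List<String> rowFieldNames, Object[] row) {
        Object[] keyFields = new Object[keyFieldNames.size()];
        for (int i = 0; i < keyFieldNames.size(); i++) {
            // Look up each key field's position in the full row and copy its value.
            int rowIndex = rowFieldNames.indexOf(keyFieldNames.get(i));
            keyFields[i] = row[rowIndex];
        }
        return keyFields;
    }

    public static void main(String[] args) {
        List<String> rowFields = Arrays.asList("c_map", "c_string", "c_int");
        Object[] row = new Object[]{"{k=v}", "hello", 7};
        Object[] key = extractKey(Arrays.asList("c_map", "c_string"), rowFields, row);
        System.out.println(Arrays.toString(key)); // prints "[{k=v}, hello]"
    }
}
```

In the suggested code above, the index lookup happens once up front (the keyFieldIndexArr precomputation) rather than per row, which is the more efficient choice for a hot serialization path.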

harveyyue (Contributor, Author):

@hailin0 Great suggestions! Actually, you have refactored the sink serialization.

hailin0 (Member) previously approved these changes on Nov 5, 2022 and left a comment:

LGTM

ic4y (Contributor) previously approved these changes on Nov 7, 2022 and left a comment:

+1

Hisoka-X (Member) commented on Nov 9, 2022:

Please resolve the conflict, thanks!

EricJoy2048 (Member):

Hi @harveyyue, please resolve the conflicts.

harveyyue dismissed stale reviews from ic4y and hailin0 via c7eaae7 on November 11, 2022 16:12.
harveyyue force-pushed the dev-kafka-2 branch 2 times, most recently from c7eaae7 to 96ce728 (November 11, 2022 16:18).
harveyyue (Contributor, Author):

Sorry for the late response; the conflicts have been fixed.

EricJoy2048 (Member):

Please merge the dev code after #3401 is merged.

TyrantLucifer (Member) left a comment.

TaoZex (Contributor) left a comment:

+1

ic4y (Contributor) left a comment:

LGTM

ic4y merged commit f65f44f into apache:dev on Nov 17, 2022.

7 participants