From 9582870dbf1237b053612aa5e83e01ffef711716 Mon Sep 17 00:00:00 2001 From: fanchengbo <664283422@qq.com> Date: Mon, 21 Oct 2024 11:20:07 +0800 Subject: [PATCH] [Hotfix][Connector-V2][kafka] code review --- .../kafka/sink/KafkaTransactionSender.java | 1 + .../e2e/connector/kafka/KafkaIT.java | 15 ++++++++++-- ....conf => kafka_to_kafka_exactly_once.conf} | 23 ++++++++----------- 3 files changed, 23 insertions(+), 16 deletions(-) rename seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/{fake_to_kafka_exactly_once.conf => kafka_to_kafka_exactly_once.conf} (78%) diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java index b9503c2ec803..7dc55bf20ed5 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java @@ -56,6 +56,7 @@ public KafkaTransactionSender(String transactionPrefix, Properties kafkaProperti @Override public void send(ProducerRecord producerRecord) { kafkaProducer.send(producerRecord); + recordNumInTransaction++; } @Override diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 086d8ed1eacf..b95da64f4e89 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -770,11 +770,22 @@ public void testKafkaProtobufToAssert(TestContainer container) } @TestTemplate - public void testKafkaExactlyOnce(TestContainer container) throws Exception { + public void testKafkaToKafkaExactlyOnce(TestContainer container) throws Exception { + TextSerializationSchema serializer = + TextSerializationSchema.builder() + .seaTunnelRowType(SEATUNNEL_ROW_TYPE) + .delimiter(",") + .build(); + generateTestData( + row -> + new ProducerRecord<>( + "kafka_topic_exactly_once", null, serializer.serialize(row)), + 0, + 10); container.executeJob("/kafka/fake_to_kafka_exactly_once.conf"); String topicName = "kafka_topic_exactly_once"; Map data = getKafkaConsumerData(topicName); - Assertions.assertEquals(4, data.size()); + Assertions.assertEquals(10, data.size()); } public static String getTestConfigFile(String configFile) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/fake_to_kafka_exactly_once.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once.conf similarity index 78% rename from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/fake_to_kafka_exactly_once.conf rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once.conf index e57aa89565c3..56f56fbdb805 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/fake_to_kafka_exactly_once.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka/kafka_to_kafka_exactly_once.conf @@ -16,25 +16,20 @@ # env { execution.parallelism = 1 - job.mode = "BATCH" + job.mode = "STREAMING" } source { - FakeSource { - parallelism = 1 - result_table_name = "fake" - split.read-interval = 30000 - split.num = 2 - row.num = 4 - schema = { - fields { - name = "string" - age = "int" - } - } - } + Kafka { + bootstrap.servers = "kafkaCluster:9092" + topic = "kafka_topic_exactly_once" + result_table_name = "kafka_topic_exactly_once" + # The default format is json, which is optional + format = json + start_mode = earliest } +} transform {}