diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java index 3e493ee9ad8b..5518ef8c2844 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java @@ -32,11 +32,20 @@ import org.apache.seatunnel.e2e.common.container.TestContainer; import org.apache.seatunnel.format.text.TextSerializationSchema; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -50,9 +59,13 @@ import java.io.IOException; import java.math.BigDecimal; +import java.time.Duration; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -99,7 +112,33 @@ public void tearDown() throws Exception { @TestTemplate public void testSinkKafka(TestContainer container) throws IOException, InterruptedException { Container.ExecResult execResult = container.executeJob("/kafkasink_fake_to_kafka.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + + String topicName = "test_topic"; + Map data = new HashMap<>(); + ObjectMapper objectMapper = new ObjectMapper(); + try (KafkaConsumer consumer = new KafkaConsumer<>(kafkaConsumerConfig())) { + consumer.subscribe(Arrays.asList(topicName)); + Map offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0))); + Long endOffset = offsets.entrySet().iterator().next().getValue(); + Long lastProcessedOffset = -1L; + + do { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + if (lastProcessedOffset < record.offset()) { + + data.put(record.key(), record.value()); + } + lastProcessedOffset = record.offset(); + } + } while (lastProcessedOffset < endOffset - 1); + } + String key = data.keySet().iterator().next(); + ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class); + Assertions.assertTrue(objectNode.has("c_map")); + Assertions.assertTrue(objectNode.has("c_string")); + Assertions.assertEquals(10, data.size()); } @TestTemplate @@ -130,6 +169,16 @@ private void initKafkaProducer() { producer = new KafkaProducer<>(props); } + private Properties kafkaConsumerConfig() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-kafka-sink-group"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + return props; + } + @SuppressWarnings("checkstyle:Indentation") private void generateTestData(ProducerRecordConverter converter) { for (int i = 0; i < 100; i++) {