Skip to content

Commit

Permalink
[Improve][Connector-V2][Kafka] Improve the sink assertion of KafkaIT
Browse files Browse the repository at this point in the history
  • Loading branch information
harveyyue committed Nov 4, 2022
1 parent c1742e8 commit 22cfbbb
Showing 1 changed file with 50 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,20 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.format.text.TextSerializationSchema;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
Expand All @@ -50,9 +59,13 @@

import java.io.IOException;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
Expand Down Expand Up @@ -99,7 +112,33 @@ public void tearDown() throws Exception {
@TestTemplate
public void testSinkKafka(TestContainer container) throws IOException, InterruptedException {
Container.ExecResult execResult = container.executeJob("/kafkasink_fake_to_kafka.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());

String topicName = "test_topic";
Map<String, String> data = new HashMap<>();
ObjectMapper objectMapper = new ObjectMapper();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConsumerConfig())) {
consumer.subscribe(Arrays.asList(topicName));
Map<TopicPartition, Long> offsets = consumer.endOffsets(Arrays.asList(new TopicPartition(topicName, 0)));
Long endOffset = offsets.entrySet().iterator().next().getValue();
Long lastProcessedOffset = -1L;

do {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (lastProcessedOffset < record.offset()) {

data.put(record.key(), record.value());
}
lastProcessedOffset = record.offset();
}
} while (lastProcessedOffset < endOffset - 1);
}
String key = data.keySet().iterator().next();
ObjectNode objectNode = objectMapper.readValue(key, ObjectNode.class);
Assertions.assertTrue(objectNode.has("c_map"));
Assertions.assertTrue(objectNode.has("c_string"));
Assertions.assertEquals(10, data.size());
}

@TestTemplate
Expand Down Expand Up @@ -130,6 +169,16 @@ private void initKafkaProducer() {
producer = new KafkaProducer<>(props);
}

private Properties kafkaConsumerConfig() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "seatunnel-kafka-sink-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

@SuppressWarnings("checkstyle:Indentation")
private void generateTestData(ProducerRecordConverter converter) {
for (int i = 0; i < 100; i++) {
Expand Down

0 comments on commit 22cfbbb

Please sign in to comment.