From 55d48ba3d80acc1458c31bb9f1fd7ff8b2799ee2 Mon Sep 17 00:00:00 2001 From: Sridevi Nagaraj Date: Wed, 2 Feb 2022 15:14:42 -0800 Subject: [PATCH] Adding comments for kafka 2.4 functionality changes --- .../connectors/kafka/AbstractKafkaBasedConnectorTask.java | 2 ++ .../kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java | 2 ++ 2 files changed, 4 insertions(+) diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java index f1d5ab4aa..63f0c2122 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/AbstractKafkaBasedConnectorTask.java @@ -769,6 +769,8 @@ protected void updateConsumerAssignment(Collection partitions) { public void onPartitionsRevoked(Collection topicPartitions) { _logger.info("Partition ownership revoked for {}, checkpointing.", topicPartitions); _kafkaTopicPartitionTracker.onPartitionsRevoked(topicPartitions); + //_failure flag is used to address 2.4 kafka version behavior changes for onPartitionRevoked calls. + //Prior to this version, consumer close was not calling onPartitionsRevoked. if (!_shutdown && !topicPartitions.isEmpty() && !_failure) { // there is a commit at the end of the run method, skip extra commit in shouldDie mode. try { maybeCommitOffsets(_consumer, true); // happens inline as part of poll diff --git a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java index 4d7c92e04..753e6134e 100644 --- a/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java +++ b/datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnectorTask.java @@ -370,6 +370,8 @@ public void stop() { @Override public void onPartitionsRevoked(Collection partitions) { + //Do not remove super.onPartitionsRevoked call or refactor the code to take care of the kafka 2.4 changes. + //Prior to this version, consumer close was not calling onPartitionsRevoked. super.onPartitionsRevoked(partitions); _topicManager.onPartitionsRevoked(partitions); }