From 719ab93e9413c438db780c23208ddeae4547b11e Mon Sep 17 00:00:00 2001 From: Kyle Phelps Date: Thu, 14 Mar 2024 13:32:58 -0400 Subject: [PATCH] Fix commit metadata too large partial failure --- .../group/GroupMetadataManager.scala | 4 +- .../group/GroupMetadataManagerTest.scala | 59 +++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 6012722332d56..80493eddcbb4e 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -494,9 +494,9 @@ class GroupMetadataManager(brokerId: Int, if (isTxnOffsetCommit) { addProducerGroup(producerId, group.groupId) - group.prepareTxnOffsetCommit(producerId, offsetMetadata) + group.prepareTxnOffsetCommit(producerId, filteredOffsetMetadata) } else { - group.prepareOffsetCommit(offsetMetadata) + group.prepareOffsetCommit(filteredOffsetMetadata) } appendForGroup(group, records, requestLocal, putCacheCallback, verificationGuards) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index f67899eea2b95..4df4199326172 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -1661,6 +1661,65 @@ class GroupMetadataManagerTest { assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count")) } + @Test + def testOffsetMetadataTooLargePartialFailure(): Unit = { + val memberId = "" + val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo") + val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo") + val offset = 37 + val requireStable = true; + + groupMetadataManager.addOwnedPartition(groupPartitionId) + val group = new GroupMetadata(groupId, Empty, time) + groupMetadataManager.addGroup(group) + + val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId)) + val offsets = immutable.Map( + topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds()), + validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds())) + + expectAppendMessage(Errors.NONE) + + var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None + def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = { + commitErrors = Some(errors) + } + + assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count")) + groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None) + assertTrue(group.hasOffsets) + + assertFalse(commitErrors.isEmpty) + assertEquals( + Some(Errors.OFFSET_METADATA_TOO_LARGE), + commitErrors.get.get(topicIdPartition) + ) + assertEquals( + Some(Errors.NONE), + commitErrors.get.get(validTopicIdPartition) + ) + + val cachedOffsets = groupMetadataManager.getOffsets( + groupId, + requireStable, + Some(Seq(topicIdPartition.topicPartition, validTopicIdPartition.topicPartition)) + ) + assertEquals( + Some(OffsetFetchResponse.INVALID_OFFSET), + cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset) + ) + assertEquals( + Some(Errors.NONE), + cachedOffsets.get(topicIdPartition.topicPartition).map(_.error) + ) + assertEquals( + Some(offset), + cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset) + ) + + assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count")) + } + @Test def testExpireOffset(): Unit = { val memberId = ""