From 9bc48af1c119a951f398010dbfc416754cc76fe1 Mon Sep 17 00:00:00 2001 From: Dongnuo Lyu <139248811+dongnuo123@users.noreply.github.com> Date: Wed, 10 Apr 2024 03:36:49 -0400 Subject: [PATCH] MINOR: Add type check to classic group timeout operations (#15587) When implementing the group type conversion from a classic group to a consumer group, if the replay of conversion records fails, the group should be reverted back including its timeouts. A possible solution is to keep all the classic group timeouts and add a type check to the timeout operations. If the group is successfully upgraded, it won't be able to pass the type check and its operations will be executed without actually doing anything; if the group upgrade fails, the group map will be reverted and the timeout operations will be executed as is. We've already have group type check in consumer group timeout operations. This patch adds similar type check to those classic group timeout operations. Reviewers: David Jacot --- .../group/GroupMetadataManager.java | 68 +++++++++++++++---- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 9cfe8f617e630..d12ed6a9dead2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2286,7 +2286,7 @@ private CoordinatorResult classicGroupJoinNewDynamicMember( request.sessionTimeoutMs(), TimeUnit.MILLISECONDS, false, - () -> expireClassicGroupMemberHeartbeat(group, newMemberId) + () -> expireClassicGroupMemberHeartbeat(group.groupId(), newMemberId) ); responseFuture.complete(new JoinGroupResponseData() @@ -2457,6 +2457,22 @@ private CoordinatorResult classicGroupJoinExistingMember( return EMPTY_RESULT; } + /** + * An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as + * timeout operation. It additionally looks up the group by the id and checks the group type. + * completeClassicGroupJoin will only be called if the group is CLASSIC. + */ + private CoordinatorResult completeClassicGroupJoin(String groupId) { + ClassicGroup group; + try { + group = getOrMaybeCreateClassicGroup(groupId, false); + } catch (UnknownMemberIdException | GroupIdNotFoundException exception) { + log.debug("Cannot find the group, skipping rebalance stage.", exception); + return EMPTY_RESULT; + } + return completeClassicGroupJoin(group); + } + /** * Complete the join group phase. Remove all dynamic members that have not rejoined * during this stage and proceed with the next generation for this group. The generation id @@ -2504,7 +2520,7 @@ private CoordinatorResult completeClassicGroupJoin( group.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, - () -> completeClassicGroupJoin(group) + () -> completeClassicGroupJoin(group.groupId()) ); return EMPTY_RESULT; @@ -2575,22 +2591,31 @@ private void schedulePendingSync(ClassicGroup group) { group.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, - () -> expirePendingSync(group, group.generationId())); + () -> expirePendingSync(group.groupId(), group.generationId())); } /** * Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and * try complete the join phase. * - * @param group The group. + * @param groupId The group id. * @param memberId The member id. * * @return The coordinator result that will be appended to the log. */ private CoordinatorResult expireClassicGroupMemberHeartbeat( - ClassicGroup group, + String groupId, String memberId ) { + ClassicGroup group; + try { + group = getOrMaybeCreateClassicGroup(groupId, false); + } catch (UnknownMemberIdException | GroupIdNotFoundException exception) { + log.debug("Received notification of heartbeat expiration for member {} after group {} " + + "had already been deleted or upgraded.", memberId, groupId); + return EMPTY_RESULT; + } + if (group.isInState(DEAD)) { log.info("Received notification of heartbeat expiration for member {} after group {} " + "had already been unloaded or deleted.", @@ -2805,7 +2830,7 @@ CoordinatorResult prepareRebalance( delayMs, TimeUnit.MILLISECONDS, false, - () -> tryCompleteInitialRebalanceElseSchedule(group, delayMs, remainingMs) + () -> tryCompleteInitialRebalanceElseSchedule(group.groupId(), delayMs, remainingMs) ); } @@ -2837,7 +2862,7 @@ private CoordinatorResult maybeCompleteJoinElseSchedule( group.rebalanceTimeoutMs(), TimeUnit.MILLISECONDS, false, - () -> completeClassicGroupJoin(group) + () -> completeClassicGroupJoin(group.groupId()) ); return EMPTY_RESULT; } @@ -2847,15 +2872,23 @@ private CoordinatorResult maybeCompleteJoinElseSchedule( * Try to complete the join phase of the initial rebalance. * Otherwise, extend the rebalance. * - * @param group The group under initial rebalance. + * @param groupId The group under initial rebalance. * * @return The coordinator result that will be appended to the log. */ private CoordinatorResult tryCompleteInitialRebalanceElseSchedule( - ClassicGroup group, + String groupId, int delayMs, int remainingMs ) { + ClassicGroup group; + try { + group = getOrMaybeCreateClassicGroup(groupId, false); + } catch (UnknownMemberIdException | GroupIdNotFoundException exception) { + log.debug("Cannot find the group, skipping the initial rebalance stage.", exception); + return EMPTY_RESULT; + } + if (group.newMemberAdded() && remainingMs != 0) { // A new member was added. Extend the delay. group.setNewMemberAdded(false); @@ -2867,7 +2900,7 @@ private CoordinatorResult tryCompleteInitialRebalanceElseSchedule( newDelayMs, TimeUnit.MILLISECONDS, false, - () -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs) + () -> tryCompleteInitialRebalanceElseSchedule(group.groupId(), newDelayMs, newRemainingMs) ); } else { // No more time remaining. Complete the join phase. @@ -2979,7 +3012,7 @@ private void rescheduleClassicGroupMemberHeartbeat( timeoutMs, TimeUnit.MILLISECONDS, false, - () -> expireClassicGroupMemberHeartbeat(group, member.memberId())); + () -> expireClassicGroupMemberHeartbeat(group.groupId(), member.memberId())); } /** @@ -2996,15 +3029,23 @@ private void removeSyncExpiration(ClassicGroup group) { /** * Expire pending sync. * - * @param group The group. + * @param groupId The group id. * @param generationId The generation when the pending sync was originally scheduled. * * @return The coordinator result that will be appended to the log. * */ private CoordinatorResult expirePendingSync( - ClassicGroup group, + String groupId, int generationId ) { + ClassicGroup group; + try { + group = getOrMaybeCreateClassicGroup(groupId, false); + } catch (UnknownMemberIdException | GroupIdNotFoundException exception) { + log.debug("Received notification of sync expiration for an unknown classic group {}.", groupId); + return EMPTY_RESULT; + } + if (generationId != group.generationId()) { log.error("Received unexpected notification of sync expiration for {} with an old " + "generation {} while the group has {}.", group.groupId(), generationId, group.generationId()); @@ -3027,7 +3068,6 @@ private CoordinatorResult expirePendingSync( } } } - return EMPTY_RESULT; }