Skip to content

Commit

Permalink
MINOR: Add type check to classic group timeout operations (#15587)
Browse files Browse the repository at this point in the history
When implementing the group type conversion from a classic group to a consumer group, if the replay of conversion records fails, the group should be reverted back including its timeouts.

A possible solution is to keep all the classic group timeouts and add a type check to the timeout operations. If the group is successfully upgraded, it won't be able to pass the type check and its operations will be executed without actually doing anything; if the group upgrade fails, the group map will be reverted and the timeout operations will be executed as is.

We've already have group type check in consumer group timeout operations. This patch adds similar type check to those classic group timeout operations.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
dongnuo123 authored Apr 10, 2024
1 parent 5c855be commit 9bc48af
Showing 1 changed file with 54 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2286,7 +2286,7 @@ private CoordinatorResult<Void, Record> classicGroupJoinNewDynamicMember(
request.sessionTimeoutMs(),
TimeUnit.MILLISECONDS,
false,
() -> expireClassicGroupMemberHeartbeat(group, newMemberId)
() -> expireClassicGroupMemberHeartbeat(group.groupId(), newMemberId)
);

responseFuture.complete(new JoinGroupResponseData()
Expand Down Expand Up @@ -2457,6 +2457,22 @@ private CoordinatorResult<Void, Record> classicGroupJoinExistingMember(
return EMPTY_RESULT;
}

/**
* An overload of {@link GroupMetadataManager#completeClassicGroupJoin(ClassicGroup)} used as
* timeout operation. It additionally looks up the group by the id and checks the group type.
* completeClassicGroupJoin will only be called if the group is CLASSIC.
*/
private CoordinatorResult<Void, Record> completeClassicGroupJoin(String groupId) {
ClassicGroup group;
try {
group = getOrMaybeCreateClassicGroup(groupId, false);
} catch (UnknownMemberIdException | GroupIdNotFoundException exception) {
log.debug("Cannot find the group, skipping rebalance stage.", exception);
return EMPTY_RESULT;
}
return completeClassicGroupJoin(group);
}

/**
* Complete the join group phase. Remove all dynamic members that have not rejoined
* during this stage and proceed with the next generation for this group. The generation id
Expand Down Expand Up @@ -2504,7 +2520,7 @@ private CoordinatorResult<Void, Record> completeClassicGroupJoin(
group.rebalanceTimeoutMs(),
TimeUnit.MILLISECONDS,
false,
() -> completeClassicGroupJoin(group)
() -> completeClassicGroupJoin(group.groupId())
);

return EMPTY_RESULT;
Expand Down Expand Up @@ -2575,22 +2591,31 @@ private void schedulePendingSync(ClassicGroup group) {
group.rebalanceTimeoutMs(),
TimeUnit.MILLISECONDS,
false,
() -> expirePendingSync(group, group.generationId()));
() -> expirePendingSync(group.groupId(), group.generationId()));
}

/**
* Invoked when the heartbeat operation is expired from the timer. Possibly remove the member and
* try complete the join phase.
*
* @param group The group.
* @param groupId The group id.
* @param memberId The member id.
*
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, Record> expireClassicGroupMemberHeartbeat(
ClassicGroup group,
String groupId,
String memberId
) {
ClassicGroup group;
try {
group = getOrMaybeCreateClassicGroup(groupId, false);
} catch (UnknownMemberIdException | GroupIdNotFoundException exception) {
log.debug("Received notification of heartbeat expiration for member {} after group {} " +
"had already been deleted or upgraded.", memberId, groupId);
return EMPTY_RESULT;
}

if (group.isInState(DEAD)) {
log.info("Received notification of heartbeat expiration for member {} after group {} " +
"had already been unloaded or deleted.",
Expand Down Expand Up @@ -2805,7 +2830,7 @@ CoordinatorResult<Void, Record> prepareRebalance(
delayMs,
TimeUnit.MILLISECONDS,
false,
() -> tryCompleteInitialRebalanceElseSchedule(group, delayMs, remainingMs)
() -> tryCompleteInitialRebalanceElseSchedule(group.groupId(), delayMs, remainingMs)
);
}

Expand Down Expand Up @@ -2837,7 +2862,7 @@ private CoordinatorResult<Void, Record> maybeCompleteJoinElseSchedule(
group.rebalanceTimeoutMs(),
TimeUnit.MILLISECONDS,
false,
() -> completeClassicGroupJoin(group)
() -> completeClassicGroupJoin(group.groupId())
);
return EMPTY_RESULT;
}
Expand All @@ -2847,15 +2872,23 @@ private CoordinatorResult<Void, Record> maybeCompleteJoinElseSchedule(
* Try to complete the join phase of the initial rebalance.
* Otherwise, extend the rebalance.
*
* @param group The group under initial rebalance.
* @param groupId The group under initial rebalance.
*
* @return The coordinator result that will be appended to the log.
*/
private CoordinatorResult<Void, Record> tryCompleteInitialRebalanceElseSchedule(
ClassicGroup group,
String groupId,
int delayMs,
int remainingMs
) {
ClassicGroup group;
try {
group = getOrMaybeCreateClassicGroup(groupId, false);
} catch (UnknownMemberIdException | GroupIdNotFoundException exception) {
log.debug("Cannot find the group, skipping the initial rebalance stage.", exception);
return EMPTY_RESULT;
}

if (group.newMemberAdded() && remainingMs != 0) {
// A new member was added. Extend the delay.
group.setNewMemberAdded(false);
Expand All @@ -2867,7 +2900,7 @@ private CoordinatorResult<Void, Record> tryCompleteInitialRebalanceElseSchedule(
newDelayMs,
TimeUnit.MILLISECONDS,
false,
() -> tryCompleteInitialRebalanceElseSchedule(group, newDelayMs, newRemainingMs)
() -> tryCompleteInitialRebalanceElseSchedule(group.groupId(), newDelayMs, newRemainingMs)
);
} else {
// No more time remaining. Complete the join phase.
Expand Down Expand Up @@ -2979,7 +3012,7 @@ private void rescheduleClassicGroupMemberHeartbeat(
timeoutMs,
TimeUnit.MILLISECONDS,
false,
() -> expireClassicGroupMemberHeartbeat(group, member.memberId()));
() -> expireClassicGroupMemberHeartbeat(group.groupId(), member.memberId()));
}

/**
Expand All @@ -2996,15 +3029,23 @@ private void removeSyncExpiration(ClassicGroup group) {
/**
* Expire pending sync.
*
* @param group The group.
* @param groupId The group id.
* @param generationId The generation when the pending sync was originally scheduled.
*
* @return The coordinator result that will be appended to the log.
* */
private CoordinatorResult<Void, Record> expirePendingSync(
ClassicGroup group,
String groupId,
int generationId
) {
ClassicGroup group;
try {
group = getOrMaybeCreateClassicGroup(groupId, false);
} catch (UnknownMemberIdException | GroupIdNotFoundException exception) {
log.debug("Received notification of sync expiration for an unknown classic group {}.", groupId);
return EMPTY_RESULT;
}

if (generationId != group.generationId()) {
log.error("Received unexpected notification of sync expiration for {} with an old " +
"generation {} while the group has {}.", group.groupId(), generationId, group.generationId());
Expand All @@ -3027,7 +3068,6 @@ private CoordinatorResult<Void, Record> expirePendingSync(
}
}
}

return EMPTY_RESULT;
}

Expand Down

0 comments on commit 9bc48af

Please sign in to comment.