Skip to content

Commit

Permalink
KAFKA-16832; LeaveGroup API for upgrading ConsumerGroup (apache#16057)
Browse files Browse the repository at this point in the history
This patch implements the LeaveGroup API to the consumer groups that are in the mixed mode.

Reviewers: Jeff Kim <[email protected]>, David Jacot <[email protected]>
  • Loading branch information
dongnuo123 authored May 29, 2024
1 parent 9562143 commit eefd114
Show file tree
Hide file tree
Showing 5 changed files with 565 additions and 10 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
<suppress checks="JavaNCSS"
files="GroupMetadataManagerTest.java"/>
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>

<!-- storage -->
<suppress checks="CyclomaticComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2065,6 +2065,7 @@ private void removeMember(List<CoordinatorRecord> records, String groupId, Strin
private void cancelTimers(String groupId, String memberId) {
cancelConsumerGroupSessionTimeout(groupId, memberId);
cancelConsumerGroupRebalanceTimeout(groupId, memberId);
cancelConsumerGroupJoinTimeout(groupId, memberId);
cancelConsumerGroupSyncTimeout(groupId, memberId);
}

Expand Down Expand Up @@ -3985,10 +3986,17 @@ public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync(
RequestContext context,
SyncGroupRequestData request,
CompletableFuture<SyncGroupResponseData> responseFuture
) throws UnknownMemberIdException, GroupIdNotFoundException {
Group group = groups.get(request.groupId(), Long.MAX_VALUE);
) throws UnknownMemberIdException {
Group group;
try {
group = group(request.groupId());
} catch (GroupIdNotFoundException e) {
responseFuture.complete(new SyncGroupResponseData()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
return EMPTY_RESULT;
}

if (group == null || group.isEmpty()) {
if (group.isEmpty()) {
responseFuture.complete(new SyncGroupResponseData()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
return EMPTY_RESULT;
Expand Down Expand Up @@ -4250,9 +4258,10 @@ public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupH
RequestContext context,
HeartbeatRequestData request
) {
Group group = groups.get(request.groupId(), Long.MAX_VALUE);

if (group == null) {
Group group;
try {
group = group(request.groupId());
} catch (GroupIdNotFoundException e) {
throw new UnknownMemberIdException(
String.format("Group %s not found.", request.groupId())
);
Expand Down Expand Up @@ -4426,14 +4435,129 @@ private ConsumerGroupMember validateConsumerGroupMember(
* @param context The request context.
* @param request The actual LeaveGroup request.
*
* @return The LeaveGroup response and the records to append.
*/
public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave(
RequestContext context,
LeaveGroupRequestData request
) throws UnknownMemberIdException {
Group group;
try {
group = group(request.groupId());
} catch (GroupIdNotFoundException e) {
throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId()));
}

if (group.type() == CLASSIC) {
return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request);
} else {
return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, context, request);
}
}

/**
* Handle a classic LeaveGroupRequest to a ConsumerGroup.
*
* @param group The ConsumerGroup.
* @param context The request context.
* @param request The actual LeaveGroup request.
*
* @return The LeaveGroup response and the records to append.
*/
private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToConsumerGroup(
ConsumerGroup group,
RequestContext context,
LeaveGroupRequestData request
) throws UnknownMemberIdException {
String groupId = group.groupId();
List<MemberResponse> memberResponses = new ArrayList<>();
Set<ConsumerGroupMember> validLeaveGroupMembers = new HashSet<>();
List<CoordinatorRecord> records = new ArrayList<>();

for (MemberIdentity memberIdentity : request.members()) {
String memberId = memberIdentity.memberId();
String instanceId = memberIdentity.groupInstanceId();
String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided";

ConsumerGroupMember member;
try {
if (instanceId == null) {
member = group.getOrMaybeCreateMember(memberId, false);
throwIfMemberDoesNotUseClassicProtocol(member);

log.info("[Group {}] Dynamic Member {} has left group " +
"through explicit `LeaveGroup` request; client reason: {}",
groupId, memberId, reason);
} else {
member = group.staticMember(instanceId);
throwIfStaticMemberIsUnknown(member, instanceId);
// The LeaveGroup API allows administrative removal of members by GroupInstanceId
// in which case we expect the MemberId to be undefined.
if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
}
throwIfMemberDoesNotUseClassicProtocol(member);

memberId = member.memberId();
log.info("[Group {}] Static Member {} with instance id {} has left group " +
"through explicit `LeaveGroup` request; client reason: {}",
groupId, memberId, instanceId, reason);
}

removeMember(records, groupId, memberId);
cancelTimers(groupId, memberId);
memberResponses.add(
new MemberResponse()
.setMemberId(memberId)
.setGroupInstanceId(instanceId)
);
validLeaveGroupMembers.add(member);
} catch (KafkaException e) {
memberResponses.add(
new MemberResponse()
.setMemberId(memberId)
.setGroupInstanceId(instanceId)
.setErrorCode(Errors.forException(e).code())
);
}
}

if (!records.isEmpty()) {
// Maybe update the subscription metadata.
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
group.computeSubscribedTopicNames(validLeaveGroupMembers),
metadataImage.topics(),
metadataImage.cluster()
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId {}] Computed new subscription metadata: {}.",
group.groupId(), subscriptionMetadata);
records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
}

// Bump the group epoch.
records.add(newGroupEpochRecord(groupId, group.groupEpoch() + 1));
}

return new CoordinatorResult<>(records, new LeaveGroupResponseData().setMembers(memberResponses));
}

/**
* Handle a classic LeaveGroupRequest to a ClassicGroup.
*
* @param group The ClassicGroup.
* @param context The request context.
* @param request The actual LeaveGroup request.
*
* @return The LeaveGroup response and the GroupMetadata record to append if the group
* no longer has any members.
*/
public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave(
private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToClassicGroup(
ClassicGroup group,
RequestContext context,
LeaveGroupRequestData request
) throws UnknownMemberIdException, GroupIdNotFoundException {
ClassicGroup group = getOrMaybeCreateClassicGroup(request.groupId(), false);
) throws UnknownMemberIdException {
if (group.isInState(DEAD)) {
return new CoordinatorResult<>(
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,29 @@ public Map<String, Integer> computeSubscribedTopicNames(
return subscribedTopicNames;
}

/**
* Updates the subscription count with a set of members removed.
*
* @param removedMembers The set of removed members.
*
* @return Copy of the map of topics to the count of number of subscribers.
*/
public Map<String, Integer> computeSubscribedTopicNames(
Set<ConsumerGroupMember> removedMembers
) {
Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
if (removedMembers != null) {
removedMembers.forEach(removedMember ->
maybeUpdateSubscribedTopicNames(
subscribedTopicNames,
removedMember,
null
)
);
}
return subscribedTopicNames;
}

/**
* Compute the subscription type of the consumer group.
*
Expand Down
Loading

0 comments on commit eefd114

Please sign in to comment.