Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16832: LeaveGroup API for upgrading ConsumerGroup #16057

Merged
merged 19 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@
<suppress checks="ClassDataAbstractionCouplingCheck"
files="(RecordHelpersTest|GroupMetadataManager|GroupMetadataManagerTest|OffsetMetadataManagerTest|GroupCoordinatorServiceTest|GroupCoordinatorShardTest).java"/>
<suppress checks="JavaNCSS"
files="GroupMetadataManagerTest.java"/>
files="(GroupMetadataManager|GroupMetadataManagerTest).java"/>

<!-- storage -->
<suppress checks="CyclomaticComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2065,6 +2065,7 @@ private void removeMember(List<CoordinatorRecord> records, String groupId, Strin
private void cancelTimers(String groupId, String memberId) {
cancelConsumerGroupSessionTimeout(groupId, memberId);
cancelConsumerGroupRebalanceTimeout(groupId, memberId);
cancelConsumerGroupJoinTimeout(groupId, memberId);
cancelConsumerGroupSyncTimeout(groupId, memberId);
}

Expand Down Expand Up @@ -3985,10 +3986,17 @@ public CoordinatorResult<Void, CoordinatorRecord> classicGroupSync(
RequestContext context,
SyncGroupRequestData request,
CompletableFuture<SyncGroupResponseData> responseFuture
) throws UnknownMemberIdException, GroupIdNotFoundException {
Group group = groups.get(request.groupId(), Long.MAX_VALUE);
) throws UnknownMemberIdException {
Group group;
try {
group = group(request.groupId());
} catch (GroupIdNotFoundException e) {
responseFuture.complete(new SyncGroupResponseData()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
return EMPTY_RESULT;
}

if (group == null || group.isEmpty()) {
if (group.isEmpty()) {
responseFuture.complete(new SyncGroupResponseData()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
return EMPTY_RESULT;
Expand Down Expand Up @@ -4250,9 +4258,10 @@ public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> classicGroupH
RequestContext context,
HeartbeatRequestData request
) {
Group group = groups.get(request.groupId(), Long.MAX_VALUE);

if (group == null) {
Group group;
try {
group = group(request.groupId());
} catch (GroupIdNotFoundException e) {
throw new UnknownMemberIdException(
String.format("Group %s not found.", request.groupId())
);
Expand Down Expand Up @@ -4426,14 +4435,129 @@ private ConsumerGroupMember validateConsumerGroupMember(
* @param context The request context.
* @param request The actual LeaveGroup request.
*
* @return The LeaveGroup response and the records to append.
*/
public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave(
RequestContext context,
LeaveGroupRequestData request
) throws UnknownMemberIdException {
Group group;
try {
group = group(request.groupId());
} catch (GroupIdNotFoundException e) {
throw new UnknownMemberIdException(String.format("Group %s not found.", request.groupId()));
}

if (group.type() == CLASSIC) {
return classicGroupLeaveToClassicGroup((ClassicGroup) group, context, request);
} else {
return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, context, request);
}
}

/**
* Handle a classic LeaveGroupRequest to a ConsumerGroup.
*
* @param group The ConsumerGroup.
* @param context The request context.
* @param request The actual LeaveGroup request.
*
* @return The LeaveGroup response and the records to append.
*/
private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToConsumerGroup(
ConsumerGroup group,
RequestContext context,
LeaveGroupRequestData request
) throws UnknownMemberIdException {
String groupId = group.groupId();
List<MemberResponse> memberResponses = new ArrayList<>();
Set<ConsumerGroupMember> validLeaveGroupMembers = new HashSet<>();
List<CoordinatorRecord> records = new ArrayList<>();

for (MemberIdentity memberIdentity : request.members()) {
String memberId = memberIdentity.memberId();
String instanceId = memberIdentity.groupInstanceId();
String reason = memberIdentity.reason() != null ? memberIdentity.reason() : "not provided";

ConsumerGroupMember member;
try {
if (instanceId == null) {
member = group.getOrMaybeCreateMember(memberId, false);
throwIfMemberDoesNotUseClassicProtocol(member);

log.info("[Group {}] Dynamic Member {} has left group " +
"through explicit `LeaveGroup` request; client reason: {}",
groupId, memberId, reason);
} else {
member = group.staticMember(instanceId);
throwIfStaticMemberIsUnknown(member, instanceId);
// The LeaveGroup API allows administrative removal of members by GroupInstanceId
// in which case we expect the MemberId to be undefined.
if (!UNKNOWN_MEMBER_ID.equals(memberId)) {
throwIfInstanceIdIsFenced(member, groupId, memberId, instanceId);
}
throwIfMemberDoesNotUseClassicProtocol(member);

memberId = member.memberId();
log.info("[Group {}] Static Member {} with instance id {} has left group " +
"through explicit `LeaveGroup` request; client reason: {}",
groupId, memberId, instanceId, reason);
}

removeMember(records, groupId, memberId);
cancelTimers(groupId, memberId);
memberResponses.add(
new MemberResponse()
.setMemberId(memberId)
.setGroupInstanceId(instanceId)
);
validLeaveGroupMembers.add(member);
} catch (KafkaException e) {
memberResponses.add(
new MemberResponse()
.setMemberId(memberId)
.setGroupInstanceId(instanceId)
.setErrorCode(Errors.forException(e).code())
);
}
}

if (!records.isEmpty()) {
// Maybe update the subscription metadata.
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
group.computeSubscribedTopicNames(validLeaveGroupMembers),
metadataImage.topics(),
metadataImage.cluster()
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
log.info("[GroupId {}] Computed new subscription metadata: {}.",
group.groupId(), subscriptionMetadata);
records.add(newGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
}

// Bump the group epoch.
records.add(newGroupEpochRecord(groupId, group.groupEpoch() + 1));
}

return new CoordinatorResult<>(records, new LeaveGroupResponseData().setMembers(memberResponses));
}

/**
* Handle a classic LeaveGroupRequest to a ClassicGroup.
*
* @param group The ClassicGroup.
* @param context The request context.
* @param request The actual LeaveGroup request.
*
* @return The LeaveGroup response and the GroupMetadata record to append if the group
* no longer has any members.
*/
public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeave(
private CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> classicGroupLeaveToClassicGroup(
ClassicGroup group,
RequestContext context,
LeaveGroupRequestData request
) throws UnknownMemberIdException, GroupIdNotFoundException {
ClassicGroup group = getOrMaybeCreateClassicGroup(request.groupId(), false);
) throws UnknownMemberIdException {
if (group.isInState(DEAD)) {
return new CoordinatorResult<>(
Collections.emptyList(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,29 @@ public Map<String, Integer> computeSubscribedTopicNames(
return subscribedTopicNames;
}

/**
* Updates the subscription count with a set of members removed.
*
* @param removedMembers The set of removed members.
*
* @return Copy of the map of topics to the count of number of subscribers.
*/
public Map<String, Integer> computeSubscribedTopicNames(
Set<ConsumerGroupMember> removedMembers
) {
Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
if (removedMembers != null) {
removedMembers.forEach(removedMember ->
maybeUpdateSubscribedTopicNames(
subscribedTopicNames,
removedMember,
null
)
);
}
return subscribedTopicNames;
}

/**
* Compute the subscription type of the consumer group.
*
Expand Down
Loading