Skip to content

Commit

Permalink
KAFKA-17593; [11/11] Update subscription type (apache#18020)
Browse files Browse the repository at this point in the history
This is the last patch in the series which introduces regular expressions in the new consumer group protocol. The patch ensures that the subscription type of the group takes into account the regular expressions. Please refer to the code to see how they are included.

Reviewers: Sean Quah <[email protected]>, Jeff Kim <[email protected]>
  • Loading branch information
dajac authored Dec 6, 2024
1 parent 0bbed82 commit b7294d9
Show file tree
Hide file tree
Showing 8 changed files with 538 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
public enum SubscriptionType {
/**
* A homogeneous subscription type means that all the members
* of the group are subscribed to the same set of topics.
* of the group use the same subscription.
*/
HOMOGENEOUS("Homogeneous"),
/**
* A heterogeneous subscription type means that not all the members
* of the group are subscribed to the same set of topics.
* of the group use the same subscription.
*/
HETEROGENEOUS("Heterogeneous");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.ModernGroup;
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
import org.apache.kafka.coordinator.group.modern.TopicMetadata;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
Expand Down Expand Up @@ -199,6 +200,22 @@
public class GroupMetadataManager {
private static final int METADATA_REFRESH_INTERVAL_MS = Integer.MAX_VALUE;

private static class UpdateSubscriptionMetadataResult {
private final int groupEpoch;
private final Map<String, TopicMetadata> subscriptionMetadata;
private final SubscriptionType subscriptionType;

UpdateSubscriptionMetadataResult(
int groupEpoch,
Map<String, TopicMetadata> subscriptionMetadata,
SubscriptionType subscriptionType
) {
this.groupEpoch = groupEpoch;
this.subscriptionMetadata = Objects.requireNonNull(subscriptionMetadata);
this.subscriptionType = Objects.requireNonNull(subscriptionType);
}
}

public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
Expand Down Expand Up @@ -1698,43 +1715,17 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
// The subscription metadata is updated in two cases:
// 1) The member has updated its subscriptions;
// 2) The refresh deadline has been reached.
Map<String, Integer> subscribedTopicNamesMap = group.computeSubscribedTopicNames(
UpdateSubscriptionMetadataResult result = updateSubscriptionMetadata(
group,
bumpGroupEpoch,
member,
updatedMember
);
subscriptionMetadata = group.computeSubscriptionMetadata(
subscribedTopicNamesMap,
metadataImage.topics(),
metadataImage.cluster()
);

int numMembers = group.numMembers();
if (!group.hasMember(updatedMember.memberId()) && !group.hasStaticMember(updatedMember.instanceId())) {
numMembers++;
}

subscriptionType = ModernGroup.subscriptionType(
subscribedTopicNamesMap,
numMembers
updatedMember,
records
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
if (log.isDebugEnabled()) {
log.debug("[GroupId {}] Computed new subscription metadata: {}.",
groupId, subscriptionMetadata);
}
bumpGroupEpoch = true;
records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}

if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}

group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
groupEpoch = result.groupEpoch;
subscriptionMetadata = result.subscriptionMetadata;
subscriptionType = result.subscriptionType;
}

// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
Expand Down Expand Up @@ -1860,7 +1851,6 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro

int groupEpoch = group.groupEpoch();
Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
Map<String, Integer> subscribedTopicNamesMap = group.subscribedTopicNames();
SubscriptionType subscriptionType = group.subscriptionType();
final ConsumerProtocolSubscription subscription = deserializeSubscription(protocols);

Expand Down Expand Up @@ -1894,40 +1884,17 @@ private CoordinatorResult<Void, CoordinatorRecord> classicGroupJoinToConsumerGro
// The subscription metadata is updated in two cases:
// 1) The member has updated its subscriptions;
// 2) The refresh deadline has been reached.
subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
subscriptionMetadata = group.computeSubscriptionMetadata(
subscribedTopicNamesMap,
metadataImage.topics(),
metadataImage.cluster()
);

int numMembers = group.numMembers();
if (!group.hasMember(updatedMember.memberId()) && !group.hasStaticMember(updatedMember.instanceId())) {
numMembers++;
}

subscriptionType = ConsumerGroup.subscriptionType(
subscribedTopicNamesMap,
numMembers
UpdateSubscriptionMetadataResult result = updateSubscriptionMetadata(
group,
bumpGroupEpoch,
member,
updatedMember,
records
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
if (log.isDebugEnabled()) {
log.debug("[GroupId {}] Computed new subscription metadata: {}.",
groupId, subscriptionMetadata);
}
bumpGroupEpoch = true;
records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}

if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}

group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
groupEpoch = result.groupEpoch;
subscriptionMetadata = result.subscriptionMetadata;
subscriptionType = result.subscriptionType;
}

// 2. Update the target assignment if the group epoch is larger than the target assignment epoch. The delta between
Expand Down Expand Up @@ -2073,7 +2040,7 @@ private CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> sh
// The subscription metadata is updated in two cases:
// 1) The member has updated its subscriptions;
// 2) The refresh deadline has been reached.
Map<String, Integer> subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
Map<String, SubscriptionCount> subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
subscriptionMetadata = group.computeSubscriptionMetadata(
subscribedTopicNamesMap,
metadataImage.topics(),
Expand Down Expand Up @@ -2508,7 +2475,7 @@ private CoordinatorResult<Void, CoordinatorRecord> handleRegularExpressionsResul
List<CoordinatorRecord> records = new ArrayList<>();
try {
ConsumerGroup group = consumerGroup(groupId);
Map<String, Integer> subscribedTopicNames = new HashMap<>(group.subscribedTopicNames());
Map<String, SubscriptionCount> subscribedTopicNames = new HashMap<>(group.subscribedTopicNames());

boolean bumpGroupEpoch = false;
for (Map.Entry<String, ResolvedRegularExpression> entry : resolvedRegularExpressions.entrySet()) {
Expand All @@ -2527,11 +2494,11 @@ private CoordinatorResult<Void, CoordinatorRecord> handleRegularExpressionsResul
bumpGroupEpoch = true;

oldResolvedRegularExpression.topics.forEach(topicName ->
subscribedTopicNames.compute(topicName, Utils::decValue)
subscribedTopicNames.compute(topicName, SubscriptionCount::decRegexCount)
);

newResolvedRegularExpression.topics.forEach(topicName ->
subscribedTopicNames.compute(topicName, Utils::incValue)
subscribedTopicNames.compute(topicName, SubscriptionCount::incRegexCount)
);
}

Expand Down Expand Up @@ -2709,6 +2676,77 @@ private ShareGroupMember maybeReconcile(
return updatedMember;
}

/**
* Updates the subscription metadata and bumps the group epoch if needed.
*
* @param group The consumer group.
* @param bumpGroupEpoch Whether the group epoch must be bumped.
* @param member The old member.
* @param updatedMember The new member.
* @param records The record accumulator.
* @return The result of the update.
*/
private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(
ConsumerGroup group,
boolean bumpGroupEpoch,
ConsumerGroupMember member,
ConsumerGroupMember updatedMember,
List<CoordinatorRecord> records
) {
final long currentTimeMs = time.milliseconds();
final String groupId = group.groupId();
int groupEpoch = group.groupEpoch();

Map<String, Integer> subscribedRegularExpressions = group.computeSubscribedRegularExpressions(
member,
updatedMember
);
Map<String, SubscriptionCount> subscribedTopicNamesMap = group.computeSubscribedTopicNames(
member,
updatedMember
);
Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
subscribedTopicNamesMap,
metadataImage.topics(),
metadataImage.cluster()
);

int numMembers = group.numMembers();
if (!group.hasMember(updatedMember.memberId()) && !group.hasStaticMember(updatedMember.instanceId())) {
numMembers++;
}

SubscriptionType subscriptionType = ConsumerGroup.subscriptionType(
subscribedRegularExpressions,
subscribedTopicNamesMap,
numMembers
);

if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
if (log.isDebugEnabled()) {
log.debug("[GroupId {}] Computed new subscription metadata: {}.",
groupId, subscriptionMetadata);
}
bumpGroupEpoch = true;
records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
}

if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}

group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);

return new UpdateSubscriptionMetadataResult(
groupEpoch,
subscriptionMetadata,
subscriptionType
);
}

/**
* Updates the target assignment according to the updated member and subscription metadata.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.api.assignor.SubscriptionType;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicImage;
Expand Down Expand Up @@ -82,7 +81,7 @@ public static class DeadlineAndEpoch {
/**
* The number of subscribers or regular expressions per topic.
*/
protected final TimelineHashMap<String, Integer> subscribedTopicNames;
protected final TimelineHashMap<String, SubscriptionCount> subscribedTopicNames;

/**
* The metadata associated with each subscribed topic name.
Expand Down Expand Up @@ -221,7 +220,7 @@ public Map<String, T> members() {
* @return An immutable map containing all the subscribed topic names
* with the subscribers counts per topic.
*/
public Map<String, Integer> subscribedTopicNames() {
public Map<String, SubscriptionCount> subscribedTopicNames() {
return Collections.unmodifiableMap(subscribedTopicNames);
}

Expand Down Expand Up @@ -378,7 +377,7 @@ public void setSubscriptionMetadata(
* @return An immutable map of subscription metadata for each topic that the consumer group is subscribed to.
*/
public Map<String, TopicMetadata> computeSubscriptionMetadata(
Map<String, Integer> subscribedTopicNames,
Map<String, SubscriptionCount> subscribedTopicNames,
TopicsImage topicsImage,
ClusterImage clusterImage
) {
Expand Down Expand Up @@ -440,19 +439,24 @@ public DeadlineAndEpoch metadataRefreshDeadline() {
return metadataRefreshDeadline;
}

/**
* Updates the subscription type.
*/
protected void maybeUpdateGroupSubscriptionType() {
subscriptionType.set(subscriptionType(subscribedTopicNames, members.size()));
}

/**
* Updates the subscribed topic names count.
* The subscription type is updated as a consequence.
*
* @param oldMember The old member.
* @param newMember The new member.
*/
protected void maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(
protected void maybeUpdateSubscribedTopicNames(
ModernGroupMember oldMember,
ModernGroupMember newMember
) {
maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember);
subscriptionType.set(subscriptionType(subscribedTopicNames, members.size()));
}

/**
Expand All @@ -463,19 +467,19 @@ protected void maybeUpdateSubscribedTopicNamesAndGroupSubscriptionType(
* @param newMember The new member.
*/
private static void maybeUpdateSubscribedTopicNames(
Map<String, Integer> subscribedTopicCount,
Map<String, SubscriptionCount> subscribedTopicCount,
ModernGroupMember oldMember,
ModernGroupMember newMember
) {
if (oldMember != null) {
oldMember.subscribedTopicNames().forEach(topicName ->
subscribedTopicCount.compute(topicName, Utils::decValue)
subscribedTopicCount.compute(topicName, SubscriptionCount::decNameCount)
);
}

if (newMember != null) {
newMember.subscribedTopicNames().forEach(topicName ->
subscribedTopicCount.compute(topicName, Utils::incValue)
subscribedTopicCount.compute(topicName, SubscriptionCount::incNameCount)
);
}
}
Expand All @@ -488,11 +492,11 @@ private static void maybeUpdateSubscribedTopicNames(
*
* @return Copy of the map of topics to the count of number of subscribers.
*/
public Map<String, Integer> computeSubscribedTopicNames(
public Map<String, SubscriptionCount> computeSubscribedTopicNames(
ModernGroupMember oldMember,
ModernGroupMember newMember
) {
Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
Map<String, SubscriptionCount> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
maybeUpdateSubscribedTopicNames(
subscribedTopicNames,
oldMember,
Expand All @@ -508,10 +512,10 @@ public Map<String, Integer> computeSubscribedTopicNames(
*
* @return Copy of the map of topics to the count of number of subscribers.
*/
public Map<String, Integer> computeSubscribedTopicNames(
public Map<String, SubscriptionCount> computeSubscribedTopicNames(
Set<? extends ModernGroupMember> removedMembers
) {
Map<String, Integer> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
Map<String, SubscriptionCount> subscribedTopicNames = new HashMap<>(this.subscribedTopicNames);
if (removedMembers != null) {
removedMembers.forEach(removedMember ->
maybeUpdateSubscribedTopicNames(
Expand All @@ -533,15 +537,15 @@ public Map<String, Integer> computeSubscribedTopicNames(
* otherwise, {@link SubscriptionType#HETEROGENEOUS}.
*/
public static SubscriptionType subscriptionType(
Map<String, Integer> subscribedTopicNames,
Map<String, SubscriptionCount> subscribedTopicNames,
int numberOfMembers
) {
if (subscribedTopicNames.isEmpty()) {
return HOMOGENEOUS;
}

for (int subscriberCount : subscribedTopicNames.values()) {
if (subscriberCount != numberOfMembers) {
for (SubscriptionCount subscriberCount : subscribedTopicNames.values()) {
if (subscriberCount.byNameCount != numberOfMembers) {
return HETEROGENEOUS;
}
}
Expand Down
Loading

0 comments on commit b7294d9

Please sign in to comment.