Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16626: Lazily convert subscribed topic names to topic ids #15970

Merged
merged 5 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public static CoordinatorRecord newMemberSubscriptionRecord(
String groupId,
ConsumerGroupMember member
) {
List<String> topicNames = new ArrayList<>(member.subscribedTopicNames());
Collections.sort(topicNames);
return new CoordinatorRecord(
new ApiMessageAndVersion(
new ConsumerGroupMemberMetadataKey()
Expand All @@ -78,7 +80,7 @@ public static CoordinatorRecord newMemberSubscriptionRecord(
.setInstanceId(member.instanceId())
.setClientId(member.clientId())
.setClientHost(member.clientHost())
.setSubscribedTopicNames(member.subscribedTopicNames())
.setSubscribedTopicNames(topicNames)
.setSubscribedTopicRegex(member.subscribedTopicRegex())
.setServerAssignor(member.serverAssignorName().orElse(null))
.setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1899,6 +1899,7 @@ private Assignment updateTargetAssignment(
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
// A new static member is replacing an older one with the same subscriptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.common.Uuid;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -41,7 +40,7 @@ public class AssignmentMemberSpec {
/**
* Topics Ids that the member is subscribed to.
*/
private final Collection<Uuid> subscribedTopicIds;
private final Set<Uuid> subscribedTopicIds;

/**
* Partitions assigned keyed by topicId.
Expand All @@ -63,9 +62,9 @@ public Optional<String> rackId() {
}

/**
* @return Collection of subscribed topic Ids.
* @return Set of subscribed topic Ids.
*/
public Collection<Uuid> subscribedTopicIds() {
public Set<Uuid> subscribedTopicIds() {
return subscribedTopicIds;
}

Expand All @@ -79,7 +78,7 @@ public Map<Uuid, Set<Integer>> assignedPartitions() {
public AssignmentMemberSpec(
Optional<String> instanceId,
Optional<String> rackId,
Collection<Uuid> subscribedTopicIds,
Set<Uuid> subscribedTopicIds,
Map<Uuid, Set<Integer>> assignedPartitions
) {
Objects.requireNonNull(instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -60,7 +59,7 @@ public static class Builder {
private int rebalanceTimeoutMs = -1;
private String clientId = "";
private String clientHost = "";
private List<String> subscribedTopicNames = Collections.emptyList();
private Set<String> subscribedTopicNames = Collections.emptySet();
private String subscribedTopicRegex = "";
private String serverAssignorName = null;
private Map<Uuid, Set<Integer>> assignedPartitions = Collections.emptyMap();
Expand Down Expand Up @@ -148,15 +147,13 @@ public Builder setClientHost(String clientHost) {
return this;
}

public Builder setSubscribedTopicNames(List<String> subscribedTopicNames) {
this.subscribedTopicNames = subscribedTopicNames;
this.subscribedTopicNames.sort(Comparator.naturalOrder());
public Builder setSubscribedTopicNames(List<String> subscribedTopicList) {
if (subscribedTopicNames != null) this.subscribedTopicNames = new HashSet<>(subscribedTopicList);
return this;
}

public Builder maybeUpdateSubscribedTopicNames(Optional<List<String>> subscribedTopicNames) {
this.subscribedTopicNames = subscribedTopicNames.orElse(this.subscribedTopicNames);
this.subscribedTopicNames.sort(Comparator.naturalOrder());
public Builder maybeUpdateSubscribedTopicNames(Optional<List<String>> subscribedTopicList) {
subscribedTopicList.ifPresent(list -> this.subscribedTopicNames = new HashSet<>(list));
return this;
}

Expand Down Expand Up @@ -299,7 +296,7 @@ public ConsumerGroupMember build() {
/**
* The list of subscriptions (topic names) configured by the member.
*/
private final List<String> subscribedTopicNames;
private final Set<String> subscribedTopicNames;

/**
* The subscription pattern configured by the member.
Expand Down Expand Up @@ -335,7 +332,7 @@ private ConsumerGroupMember(
int rebalanceTimeoutMs,
String clientId,
String clientHost,
List<String> subscribedTopicNames,
Set<String> subscribedTopicNames,
String subscribedTopicRegex,
String serverAssignorName,
MemberState state,
Expand Down Expand Up @@ -419,7 +416,7 @@ public String clientHost() {
/**
* @return The list of subscribed topic names.
*/
public List<String> subscribedTopicNames() {
public Set<String> subscribedTopicNames() {
return subscribedTopicNames;
}

Expand Down Expand Up @@ -533,7 +530,7 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(
.setClientId(clientId)
.setInstanceId(instanceId)
.setRackId(rackId)
.setSubscribedTopicNames(subscribedTopicNames)
.setSubscribedTopicNames(subscribedTopicNames == null ? null : new ArrayList<>(subscribedTopicNames))
.setSubscribedTopicRegex(subscribedTopicRegex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.image.TopicsImage;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentEpochRecord;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentRecord;
Expand Down Expand Up @@ -127,6 +126,11 @@ public Map<String, Assignment> targetAssignment() {
*/
private Map<String, Assignment> targetAssignment = Collections.emptyMap();

/**
* The topics image.
*/
private TopicsImage topicsImage = TopicsImage.EMPTY;

/**
* The members which have been updated or deleted. Deleted members
* are signaled by a null value.
Expand Down Expand Up @@ -220,6 +224,19 @@ public TargetAssignmentBuilder withTargetAssignment(
return this;
}

/**
* Adds the topics image.
*
* @param topicsImage The topics image.
* @return This object.
*/
public TargetAssignmentBuilder withTopicsImage(
TopicsImage topicsImage
) {
this.topicsImage = topicsImage;
return this;
}

/**
* Adds or updates a member. This is useful when the updated member is
* not yet materialized in memory.
Expand Down Expand Up @@ -263,7 +280,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException {
members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
subscriptionMetadata
topicsImage
)));

// Update the member spec if updated or deleted members.
Expand All @@ -284,7 +301,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException {
memberSpecs.put(memberId, createAssignmentMemberSpec(
updatedMemberOrNull,
assignment,
subscriptionMetadata
topicsImage
));
}
});
Expand Down Expand Up @@ -353,23 +370,15 @@ private Assignment newMemberAssignment(
}
}

public static AssignmentMemberSpec createAssignmentMemberSpec(
static AssignmentMemberSpec createAssignmentMemberSpec(
ConsumerGroupMember member,
Assignment targetAssignment,
Map<String, TopicMetadata> subscriptionMetadata
TopicsImage topicsImage
) {
Set<Uuid> subscribedTopics = new HashSet<>();
member.subscribedTopicNames().forEach(topicName -> {
TopicMetadata topicMetadata = subscriptionMetadata.get(topicName);
if (topicMetadata != null) {
subscribedTopics.add(topicMetadata.id());
}
});

return new AssignmentMemberSpec(
Optional.ofNullable(member.instanceId()),
Optional.ofNullable(member.rackId()),
subscribedTopics,
new TopicIds(member.subscribedTopicNames(), topicsImage),
targetAssignment.partitions()
);
}
Expand Down
Loading