Skip to content

Commit

Permalink
KAFKA-16626; Lazily convert subscribed topic names to topic ids (apac…
Browse files Browse the repository at this point in the history
…he#15970)

This patch aims to remove the data structure that stores the conversion from topic names to topic ids which was taking time similar to the actual assignment computation. Instead, we reuse the already existing ConsumerGroupMember.subscribedTopicNames() and do the conversion to topic ids when the iterator is requested.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
jeffkbkim authored May 24, 2024
1 parent 6941598 commit 520aa86
Show file tree
Hide file tree
Showing 15 changed files with 549 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public static CoordinatorRecord newMemberSubscriptionRecord(
String groupId,
ConsumerGroupMember member
) {
List<String> topicNames = new ArrayList<>(member.subscribedTopicNames());
Collections.sort(topicNames);
return new CoordinatorRecord(
new ApiMessageAndVersion(
new ConsumerGroupMemberMetadataKey()
Expand All @@ -78,7 +80,7 @@ public static CoordinatorRecord newMemberSubscriptionRecord(
.setInstanceId(member.instanceId())
.setClientId(member.clientId())
.setClientHost(member.clientHost())
.setSubscribedTopicNames(member.subscribedTopicNames())
.setSubscribedTopicNames(topicNames)
.setSubscribedTopicRegex(member.subscribedTopicRegex())
.setServerAssignor(member.serverAssignorName().orElse(null))
.setRebalanceTimeoutMs(member.rebalanceTimeoutMs())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1901,6 +1901,7 @@ private Assignment updateTargetAssignment(
.withSubscriptionMetadata(subscriptionMetadata)
.withSubscriptionType(subscriptionType)
.withTargetAssignment(group.targetAssignment())
.withTopicsImage(metadataImage.topics())
.addOrUpdateMember(updatedMember.memberId(), updatedMember);
TargetAssignmentBuilder.TargetAssignmentResult assignmentResult;
// A new static member is replacing an older one with the same subscriptions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.kafka.common.Uuid;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -41,7 +40,7 @@ public class AssignmentMemberSpec {
/**
* Topics Ids that the member is subscribed to.
*/
private final Collection<Uuid> subscribedTopicIds;
private final Set<Uuid> subscribedTopicIds;

/**
* Partitions assigned keyed by topicId.
Expand All @@ -63,9 +62,9 @@ public Optional<String> rackId() {
}

/**
* @return Collection of subscribed topic Ids.
* @return Set of subscribed topic Ids.
*/
public Collection<Uuid> subscribedTopicIds() {
public Set<Uuid> subscribedTopicIds() {
return subscribedTopicIds;
}

Expand All @@ -79,7 +78,7 @@ public Map<Uuid, Set<Integer>> assignedPartitions() {
public AssignmentMemberSpec(
Optional<String> instanceId,
Optional<String> rackId,
Collection<Uuid> subscribedTopicIds,
Set<Uuid> subscribedTopicIds,
Map<Uuid, Set<Integer>> assignedPartitions
) {
Objects.requireNonNull(instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -60,7 +59,7 @@ public static class Builder {
private int rebalanceTimeoutMs = -1;
private String clientId = "";
private String clientHost = "";
private List<String> subscribedTopicNames = Collections.emptyList();
private Set<String> subscribedTopicNames = Collections.emptySet();
private String subscribedTopicRegex = "";
private String serverAssignorName = null;
private Map<Uuid, Set<Integer>> assignedPartitions = Collections.emptyMap();
Expand Down Expand Up @@ -148,15 +147,13 @@ public Builder setClientHost(String clientHost) {
return this;
}

public Builder setSubscribedTopicNames(List<String> subscribedTopicNames) {
this.subscribedTopicNames = subscribedTopicNames;
this.subscribedTopicNames.sort(Comparator.naturalOrder());
public Builder setSubscribedTopicNames(List<String> subscribedTopicList) {
if (subscribedTopicNames != null) this.subscribedTopicNames = new HashSet<>(subscribedTopicList);
return this;
}

public Builder maybeUpdateSubscribedTopicNames(Optional<List<String>> subscribedTopicNames) {
this.subscribedTopicNames = subscribedTopicNames.orElse(this.subscribedTopicNames);
this.subscribedTopicNames.sort(Comparator.naturalOrder());
public Builder maybeUpdateSubscribedTopicNames(Optional<List<String>> subscribedTopicList) {
subscribedTopicList.ifPresent(list -> this.subscribedTopicNames = new HashSet<>(list));
return this;
}

Expand Down Expand Up @@ -299,7 +296,7 @@ public ConsumerGroupMember build() {
/**
* The list of subscriptions (topic names) configured by the member.
*/
private final List<String> subscribedTopicNames;
private final Set<String> subscribedTopicNames;

/**
* The subscription pattern configured by the member.
Expand Down Expand Up @@ -335,7 +332,7 @@ private ConsumerGroupMember(
int rebalanceTimeoutMs,
String clientId,
String clientHost,
List<String> subscribedTopicNames,
Set<String> subscribedTopicNames,
String subscribedTopicRegex,
String serverAssignorName,
MemberState state,
Expand Down Expand Up @@ -419,7 +416,7 @@ public String clientHost() {
/**
* @return The list of subscribed topic names.
*/
public List<String> subscribedTopicNames() {
public Set<String> subscribedTopicNames() {
return subscribedTopicNames;
}

Expand Down Expand Up @@ -533,7 +530,7 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(
.setClientId(clientId)
.setInstanceId(instanceId)
.setRackId(rackId)
.setSubscribedTopicNames(subscribedTopicNames)
.setSubscribedTopicNames(subscribedTopicNames == null ? null : new ArrayList<>(subscribedTopicNames))
.setSubscribedTopicRegex(subscribedTopicRegex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,15 @@
import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.image.TopicsImage;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentEpochRecord;
import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentRecord;
Expand Down Expand Up @@ -127,6 +126,11 @@ public Map<String, Assignment> targetAssignment() {
*/
private Map<String, Assignment> targetAssignment = Collections.emptyMap();

/**
* The topics image.
*/
private TopicsImage topicsImage = TopicsImage.EMPTY;

/**
* The members which have been updated or deleted. Deleted members
* are signaled by a null value.
Expand Down Expand Up @@ -220,6 +224,19 @@ public TargetAssignmentBuilder withTargetAssignment(
return this;
}

/**
* Adds the topics image.
*
* @param topicsImage The topics image.
* @return This object.
*/
public TargetAssignmentBuilder withTopicsImage(
TopicsImage topicsImage
) {
this.topicsImage = topicsImage;
return this;
}

/**
* Adds or updates a member. This is useful when the updated member is
* not yet materialized in memory.
Expand Down Expand Up @@ -263,7 +280,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException {
members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec(
member,
targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
subscriptionMetadata
topicsImage
)));

// Update the member spec if updated or deleted members.
Expand All @@ -284,7 +301,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException {
memberSpecs.put(memberId, createAssignmentMemberSpec(
updatedMemberOrNull,
assignment,
subscriptionMetadata
topicsImage
));
}
});
Expand Down Expand Up @@ -353,23 +370,15 @@ private Assignment newMemberAssignment(
}
}

public static AssignmentMemberSpec createAssignmentMemberSpec(
static AssignmentMemberSpec createAssignmentMemberSpec(
ConsumerGroupMember member,
Assignment targetAssignment,
Map<String, TopicMetadata> subscriptionMetadata
TopicsImage topicsImage
) {
Set<Uuid> subscribedTopics = new HashSet<>();
member.subscribedTopicNames().forEach(topicName -> {
TopicMetadata topicMetadata = subscriptionMetadata.get(topicName);
if (topicMetadata != null) {
subscribedTopics.add(topicMetadata.id());
}
});

return new AssignmentMemberSpec(
Optional.ofNullable(member.instanceId()),
Optional.ofNullable(member.rackId()),
subscribedTopics,
new TopicIds(member.subscribedTopicNames(), topicsImage),
targetAssignment.partitions()
);
}
Expand Down
Loading

0 comments on commit 520aa86

Please sign in to comment.