diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpers.java index 4cc68229f042b..55766ac1a643a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpers.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpers.java @@ -65,6 +65,8 @@ public static CoordinatorRecord newMemberSubscriptionRecord( String groupId, ConsumerGroupMember member ) { + List topicNames = new ArrayList<>(member.subscribedTopicNames()); + Collections.sort(topicNames); return new CoordinatorRecord( new ApiMessageAndVersion( new ConsumerGroupMemberMetadataKey() @@ -78,7 +80,7 @@ public static CoordinatorRecord newMemberSubscriptionRecord( .setInstanceId(member.instanceId()) .setClientId(member.clientId()) .setClientHost(member.clientHost()) - .setSubscribedTopicNames(member.subscribedTopicNames()) + .setSubscribedTopicNames(topicNames) .setSubscribedTopicRegex(member.subscribedTopicRegex()) .setServerAssignor(member.serverAssignorName().orElse(null)) .setRebalanceTimeoutMs(member.rebalanceTimeoutMs()) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 5520676d21b1d..aa11c0ed844f5 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1899,6 +1899,7 @@ private Assignment updateTargetAssignment( .withSubscriptionMetadata(subscriptionMetadata) .withSubscriptionType(subscriptionType) .withTargetAssignment(group.targetAssignment()) + .withTopicsImage(metadataImage.topics()) .addOrUpdateMember(updatedMember.memberId(), updatedMember); TargetAssignmentBuilder.TargetAssignmentResult assignmentResult; // A new static member is replacing an older one with the same subscriptions. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java index 4a93fccae35d5..2a91a111aa27b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignmentMemberSpec.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.Uuid; -import java.util.Collection; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -41,7 +40,7 @@ public class AssignmentMemberSpec { /** * Topics Ids that the member is subscribed to. */ - private final Collection subscribedTopicIds; + private final Set subscribedTopicIds; /** * Partitions assigned keyed by topicId. @@ -63,9 +62,9 @@ public Optional rackId() { } /** - * @return Collection of subscribed topic Ids. + * @return Set of subscribed topic Ids. */ - public Collection subscribedTopicIds() { + public Set subscribedTopicIds() { return subscribedTopicIds; } @@ -79,7 +78,7 @@ public Map> assignedPartitions() { public AssignmentMemberSpec( Optional instanceId, Optional rackId, - Collection subscribedTopicIds, + Set subscribedTopicIds, Map> assignedPartitions ) { Objects.requireNonNull(instanceId); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java index 5f4ed7efc60fe..9846fe602680c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -60,7 +59,7 @@ public static class Builder { private int rebalanceTimeoutMs = -1; private String clientId = ""; private String clientHost = ""; - private List subscribedTopicNames = Collections.emptyList(); + private Set subscribedTopicNames = Collections.emptySet(); private String subscribedTopicRegex = ""; private String serverAssignorName = null; private Map> assignedPartitions = Collections.emptyMap(); @@ -148,15 +147,13 @@ public Builder setClientHost(String clientHost) { return this; } - public Builder setSubscribedTopicNames(List subscribedTopicNames) { - this.subscribedTopicNames = subscribedTopicNames; - this.subscribedTopicNames.sort(Comparator.naturalOrder()); + public Builder setSubscribedTopicNames(List subscribedTopicList) { + if (subscribedTopicNames != null) this.subscribedTopicNames = new HashSet<>(subscribedTopicList); return this; } - public Builder maybeUpdateSubscribedTopicNames(Optional> subscribedTopicNames) { - this.subscribedTopicNames = subscribedTopicNames.orElse(this.subscribedTopicNames); - this.subscribedTopicNames.sort(Comparator.naturalOrder()); + public Builder maybeUpdateSubscribedTopicNames(Optional> subscribedTopicList) { + subscribedTopicList.ifPresent(list -> this.subscribedTopicNames = new HashSet<>(list)); return this; } @@ -299,7 +296,7 @@ public ConsumerGroupMember build() { /** * The list of subscriptions (topic names) configured by the member. */ - private final List subscribedTopicNames; + private final Set subscribedTopicNames; /** * The subscription pattern configured by the member. @@ -335,7 +332,7 @@ private ConsumerGroupMember( int rebalanceTimeoutMs, String clientId, String clientHost, - List subscribedTopicNames, + Set subscribedTopicNames, String subscribedTopicRegex, String serverAssignorName, MemberState state, @@ -419,7 +416,7 @@ public String clientHost() { /** * @return The list of subscribed topic names. */ - public List subscribedTopicNames() { + public Set subscribedTopicNames() { return subscribedTopicNames; } @@ -533,7 +530,7 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember( .setClientId(clientId) .setInstanceId(instanceId) .setRackId(rackId) - .setSubscribedTopicNames(subscribedTopicNames) + .setSubscribedTopicNames(subscribedTopicNames == null ? null : new ArrayList<>(subscribedTopicNames)) .setSubscribedTopicRegex(subscribedTopicRegex); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java index 6cfe03b751629..09a44b17c834b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilder.java @@ -25,16 +25,15 @@ import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; +import org.apache.kafka.image.TopicsImage; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentEpochRecord; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentRecord; @@ -127,6 +126,11 @@ public Map targetAssignment() { */ private Map targetAssignment = Collections.emptyMap(); + /** + * The topics image. + */ + private TopicsImage topicsImage = TopicsImage.EMPTY; + /** * The members which have been updated or deleted. Deleted members * are signaled by a null value. @@ -220,6 +224,19 @@ public TargetAssignmentBuilder withTargetAssignment( return this; } + /** + * Adds the topics image. + * + * @param topicsImage The topics image. + * @return This object. + */ + public TargetAssignmentBuilder withTopicsImage( + TopicsImage topicsImage + ) { + this.topicsImage = topicsImage; + return this; + } + /** * Adds or updates a member. This is useful when the updated member is * not yet materialized in memory. @@ -263,7 +280,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException { members.forEach((memberId, member) -> memberSpecs.put(memberId, createAssignmentMemberSpec( member, targetAssignment.getOrDefault(memberId, Assignment.EMPTY), - subscriptionMetadata + topicsImage ))); // Update the member spec if updated or deleted members. @@ -284,7 +301,7 @@ public TargetAssignmentResult build() throws PartitionAssignorException { memberSpecs.put(memberId, createAssignmentMemberSpec( updatedMemberOrNull, assignment, - subscriptionMetadata + topicsImage )); } }); @@ -353,23 +370,15 @@ private Assignment newMemberAssignment( } } - public static AssignmentMemberSpec createAssignmentMemberSpec( + static AssignmentMemberSpec createAssignmentMemberSpec( ConsumerGroupMember member, Assignment targetAssignment, - Map subscriptionMetadata + TopicsImage topicsImage ) { - Set subscribedTopics = new HashSet<>(); - member.subscribedTopicNames().forEach(topicName -> { - TopicMetadata topicMetadata = subscriptionMetadata.get(topicName); - if (topicMetadata != null) { - subscribedTopics.add(topicMetadata.id()); - } - }); - return new AssignmentMemberSpec( Optional.ofNullable(member.instanceId()), Optional.ofNullable(member.rackId()), - subscribedTopics, + new TopicIds(member.subscribedTopicNames(), topicsImage), targetAssignment.partitions() ); } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicIds.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicIds.java new file mode 100644 index 0000000000000..8485c1f560887 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicIds.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.image.TopicImage; +import org.apache.kafka.image.TopicsImage; + +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Set; + +/** + * TopicIds is initialized with topic names (String) but exposes a Set of topic ids (Uuid) to the + * user and performs the conversion lazily with TopicsImage. + */ +public class TopicIds implements Set { + private final Set topicNames; + private final TopicsImage image; + + public TopicIds( + Set topicNames, + TopicsImage image + ) { + this.topicNames = Objects.requireNonNull(topicNames); + this.image = Objects.requireNonNull(image); + } + + @Override + public int size() { + return topicNames.size(); + } + + @Override + public boolean isEmpty() { + return topicNames.isEmpty(); + } + + @Override + public boolean contains(Object o) { + if (o instanceof Uuid) { + Uuid topicId = (Uuid) o; + TopicImage topicImage = image.getTopic(topicId); + if (topicImage == null) return false; + return topicNames.contains(topicImage.name()); + } + return false; + } + + private static class TopicIdIterator implements Iterator { + final Iterator iterator; + final TopicsImage image; + private Uuid next = null; + + private TopicIdIterator( + Iterator iterator, + TopicsImage image + ) { + this.iterator = Objects.requireNonNull(iterator); + this.image = Objects.requireNonNull(image); + } + + @Override + public boolean hasNext() { + if (next != null) return true; + Uuid result = null; + do { + if (!iterator.hasNext()) { + return false; + } + String next = iterator.next(); + TopicImage topicImage = image.getTopic(next); + if (topicImage != null) { + result = topicImage.id(); + } + } while (result == null); + next = result; + return true; + } + + @Override + public Uuid next() { + if (!hasNext()) throw new NoSuchElementException(); + Uuid result = next; + next = null; + return result; + } + } + + @Override + public Iterator iterator() { + return new TopicIdIterator(topicNames.iterator(), image); + } + + @Override + public Object[] toArray() { + throw new UnsupportedOperationException(); + } + + @Override + public T[] toArray(T[] a) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(Uuid o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + for (Object o : c) { + if (!contains(o)) return false; + } + return true; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TopicIds uuids = (TopicIds) o; + + if (!Objects.equals(topicNames, uuids.topicNames)) return false; + return Objects.equals(image, uuids.image); + } + + @Override + public int hashCode() { + int result = topicNames.hashCode(); + result = 31 * result + image.hashCode(); + return result; + } +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 058cdf206db60..18f1e985cc262 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -103,11 +103,11 @@ import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; -import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol; import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection; import static org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; import static org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals; import static org.apache.kafka.coordinator.group.Assertions.assertRecordsEquals; import static org.apache.kafka.coordinator.group.Assertions.assertResponseEquals; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java index 13f860209db1d..32f9618ff92d5 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilderTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.coordinator.group.consumer.TopicMetadata; import org.junit.jupiter.api.Test; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -29,6 +28,7 @@ import java.util.Set; import java.util.TreeMap; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; @@ -69,13 +69,13 @@ public void testTwoMembersNoTopicSubscription() { members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.emptyList(), + Collections.emptySet(), Collections.emptyMap() )); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.emptyList(), + Collections.emptySet(), Collections.emptyMap() )); @@ -103,13 +103,13 @@ public void testTwoMembersSubscribedToNonexistentTopics() { members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic3Uuid), + Collections.singleton(topic3Uuid), Collections.emptyMap() )); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic2Uuid), + Collections.singleton(topic2Uuid), Collections.emptyMap() )); @@ -139,13 +139,13 @@ public void testFirstAssignmentTwoMembersTwoTopics() { members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic3Uuid), + mkSet(topic1Uuid, topic3Uuid), Collections.emptyMap() )); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic3Uuid), + Collections.singleton(topic3Uuid), Collections.emptyMap() )); @@ -186,19 +186,19 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic3Uuid), + Collections.singleton(topic3Uuid), Collections.emptyMap() )); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic3Uuid), + Collections.singleton(topic3Uuid), Collections.emptyMap() )); members.put(memberC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic1Uuid), + Collections.singleton(topic1Uuid), Collections.emptyMap() )); @@ -254,7 +254,7 @@ public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.of("rack0"), - Collections.singletonList(topic1Uuid), + Collections.singleton(topic1Uuid), currentAssignmentForA )); @@ -267,7 +267,7 @@ public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.of("rack1"), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -281,7 +281,7 @@ public void testReassignmentForTwoMembersThreeTopicsGivenUnbalancedPrevAssignmen members.put(memberC, new AssignmentMemberSpec( Optional.empty(), Optional.of("rack2"), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), + mkSet(topic1Uuid, topic2Uuid, topic3Uuid), currentAssignmentForC )); @@ -345,7 +345,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembers() { members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic3Uuid), + mkSet(topic1Uuid, topic3Uuid), currentAssignmentForA )); @@ -358,7 +358,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembers() { members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid, topic4Uuid), + mkSet(topic1Uuid, topic2Uuid, topic3Uuid, topic4Uuid), currentAssignmentForB )); @@ -406,7 +406,7 @@ public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTop members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic1Uuid), + Collections.singleton(topic1Uuid), currentAssignmentForA )); @@ -417,7 +417,7 @@ public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTop members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -425,7 +425,7 @@ public void testReassignmentWhenOneMemberAddedAndPartitionsAddedTwoMembersTwoTop members.put(memberC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), Collections.emptyMap() )); @@ -480,7 +480,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic3Uuid), + mkSet(topic1Uuid, topic3Uuid), currentAssignmentForA )); @@ -490,7 +490,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic2Uuid), + Collections.singleton(topic2Uuid), currentAssignmentForB )); @@ -539,7 +539,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic1Uuid), + Collections.singleton(topic1Uuid), currentAssignmentForA )); @@ -550,7 +550,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java index 0599c6eaafa80..4fe748c3c03e0 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java @@ -22,7 +22,6 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -32,6 +31,7 @@ import java.util.Set; import java.util.TreeMap; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; @@ -72,7 +72,7 @@ public void testOneMemberNoTopicSubscription() { new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.emptyList(), + Collections.emptySet(), Collections.emptyMap() ) ); @@ -102,7 +102,7 @@ public void testOneMemberSubscribedToNonexistentTopic() { new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic2Uuid), + Collections.singleton(topic2Uuid), Collections.emptyMap() ) ); @@ -133,13 +133,13 @@ public void testFirstAssignmentTwoMembersTwoTopicsNoMemberRacks() { members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic3Uuid), + mkSet(topic1Uuid, topic3Uuid), Collections.emptyMap() )); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic3Uuid), + mkSet(topic1Uuid, topic3Uuid), Collections.emptyMap() )); @@ -176,19 +176,19 @@ public void testFirstAssignmentNumMembersGreaterThanTotalNumPartitions() { members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic3Uuid), + Collections.singleton(topic3Uuid), Collections.emptyMap() )); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic3Uuid), + Collections.singleton(topic3Uuid), Collections.emptyMap() )); members.put(memberC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic3Uuid), + Collections.singleton(topic3Uuid), Collections.emptyMap() )); @@ -226,14 +226,12 @@ public void testValidityAndBalanceForLargeSampleSet() { )); } - List subscribedTopics = new ArrayList<>(topicMetadata.keySet()); - Map members = new TreeMap<>(); for (int i = 1; i < 50; i++) { members.put("member" + i, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - subscribedTopics, + topicMetadata.keySet(), Collections.emptyMap() )); } @@ -273,7 +271,7 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment( members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForA )); @@ -286,7 +284,7 @@ public void testReassignmentForTwoMembersTwoTopicsGivenUnbalancedPrevAssignment( members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -337,7 +335,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() { members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForA )); @@ -350,7 +348,7 @@ public void testReassignmentWhenPartitionsAreAddedForTwoMembersTwoTopics() { members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -398,7 +396,7 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForA )); @@ -409,7 +407,7 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -417,7 +415,7 @@ public void testReassignmentWhenOneMemberAddedAfterInitialAssignmentWithTwoMembe members.put(memberC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), Collections.emptyMap() )); @@ -467,7 +465,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForA )); @@ -478,7 +476,7 @@ public void testReassignmentWhenOneMemberRemovedAfterInitialAssignmentWithThreeM members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -529,7 +527,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic2Uuid), + Collections.singleton(topic2Uuid), currentAssignmentForA )); @@ -540,7 +538,7 @@ public void testReassignmentWhenOneSubscriptionRemovedAfterInitialAssignmentWith members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic2Uuid), + Collections.singleton(topic2Uuid), currentAssignmentForB )); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java index bafef70ba4b88..b52681da48031 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.coordinator.group.consumer.TopicMetadata; import org.junit.jupiter.api.Test; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -30,6 +29,7 @@ import java.util.Set; import java.util.TreeMap; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.assignor.SubscriptionType.HETEROGENEOUS; @@ -69,7 +69,7 @@ public void testOneConsumerNoTopic() { new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.emptyList(), + Collections.emptySet(), Collections.emptyMap() ) ); @@ -99,7 +99,7 @@ public void testOneConsumerSubscribedToNonExistentTopic() { new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic2Uuid), + Collections.singleton(topic2Uuid), Collections.emptyMap() ) ); @@ -133,14 +133,14 @@ public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() { members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic3Uuid), + mkSet(topic1Uuid, topic3Uuid), Collections.emptyMap() )); members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic3Uuid), + mkSet(topic1Uuid, topic3Uuid), Collections.emptyMap() )); @@ -191,21 +191,21 @@ public void testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), Collections.emptyMap() )); members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic3Uuid), + Collections.singleton(topic3Uuid), Collections.emptyMap() )); members.put(consumerC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic2Uuid, topic3Uuid), + mkSet(topic2Uuid, topic3Uuid), Collections.emptyMap() )); @@ -254,21 +254,21 @@ public void testFirstAssignmentNumConsumersGreaterThanNumPartitions() { members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic3Uuid), + mkSet(topic1Uuid, topic3Uuid), Collections.emptyMap() )); members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic3Uuid), + mkSet(topic1Uuid, topic3Uuid), Collections.emptyMap() )); members.put(consumerC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic3Uuid), + mkSet(topic1Uuid, topic3Uuid), Collections.emptyMap() )); @@ -322,7 +322,7 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForA )); @@ -334,7 +334,7 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -342,7 +342,7 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA members.put(consumerC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), Collections.emptyMap() )); @@ -395,7 +395,7 @@ public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForA )); @@ -407,7 +407,7 @@ public void testReassignmentWhenOnePartitionAddedForTwoConsumersTwoTopics() { members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -457,7 +457,7 @@ public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoCon members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForA )); @@ -469,7 +469,7 @@ public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoCon members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -477,7 +477,7 @@ public void testReassignmentWhenOneConsumerAddedAfterInitialAssignmentWithTwoCon members.put(consumerC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), Collections.emptyMap() )); @@ -533,7 +533,7 @@ public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssig members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForA )); @@ -545,7 +545,7 @@ public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssig members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -553,7 +553,7 @@ public void testReassignmentWhenOneConsumerAddedAndOnePartitionAfterInitialAssig members.put(consumerC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic1Uuid), + Collections.singleton(topic1Uuid), Collections.emptyMap() )); @@ -608,7 +608,7 @@ public void testReassignmentWhenOneConsumerRemovedAfterInitialAssignmentWithTwoC members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid), + mkSet(topic1Uuid, topic2Uuid), currentAssignmentForB )); @@ -662,7 +662,7 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme members.put(consumerA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic1Uuid), + Collections.singleton(topic1Uuid), currentAssignmentForA )); @@ -673,7 +673,7 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme members.put(consumerB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Arrays.asList(topic1Uuid, topic2Uuid, topic3Uuid), + mkSet(topic1Uuid, topic2Uuid, topic3Uuid), currentAssignmentForB )); @@ -685,7 +685,7 @@ public void testReassignmentWhenMultipleSubscriptionsRemovedAfterInitialAssignme members.put(consumerC, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - Collections.singletonList(topic2Uuid), + Collections.singleton(topic2Uuid), currentAssignmentForC )); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java index 1c53445babe57..44ed930f3903e 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection; @@ -75,8 +76,7 @@ public void testNewMember() { assertEquals("rack-id", member.rackId()); assertEquals("client-id", member.clientId()); assertEquals("hostname", member.clientHost()); - // Names are sorted. - assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames()); + assertEquals(mkSet("bar", "foo"), member.subscribedTopicNames()); assertEquals("regex", member.subscribedTopicRegex()); assertEquals("range", member.serverAssignorName().get()); assertEquals(mkAssignment(mkTopicAssignment(topicId1, 1, 2, 3)), member.assignedPartitions()); @@ -175,7 +175,7 @@ public void testUpdateMember() { .maybeUpdateRackId(Optional.of("new-rack-id")) .maybeUpdateInstanceId(Optional.of("new-instance-id")) .maybeUpdateServerAssignorName(Optional.of("new-assignor")) - .maybeUpdateSubscribedTopicNames(Optional.of(Arrays.asList("zar"))) + .maybeUpdateSubscribedTopicNames(Optional.of(Collections.singletonList("zar"))) .maybeUpdateSubscribedTopicRegex(Optional.of("new-regex")) .maybeUpdateRebalanceTimeoutMs(OptionalInt.of(6000)) .build(); @@ -183,7 +183,7 @@ public void testUpdateMember() { assertEquals("new-instance-id", updatedMember.instanceId()); assertEquals("new-rack-id", updatedMember.rackId()); // Names are sorted. - assertEquals(Arrays.asList("zar"), updatedMember.subscribedTopicNames()); + assertEquals(mkSet("zar"), updatedMember.subscribedTopicNames()); assertEquals("new-regex", updatedMember.subscribedTopicRegex()); assertEquals("new-assignor", updatedMember.serverAssignorName().get()); } @@ -210,8 +210,7 @@ public void testUpdateWithConsumerGroupMemberMetadataValue() { assertEquals("rack-id", member.rackId()); assertEquals("client-id", member.clientId()); assertEquals("host-id", member.clientHost()); - // Names are sorted. - assertEquals(Arrays.asList("bar", "foo"), member.subscribedTopicNames()); + assertEquals(mkSet("bar", "foo"), member.subscribedTopicNames()); assertEquals("regex", member.subscribedTopicRegex()); assertEquals("range", member.serverAssignorName().get()); assertEquals( @@ -297,7 +296,7 @@ public void testAsConsumerGroupDescribeMember() { .setInstanceId(instanceId) .setRackId(rackId) .setClientHost(clientHost) - .setSubscribedTopicNames(subscribedTopicNames) + .setSubscribedTopicNames(new ArrayList<>(subscribedTopicNames)) .setSubscribedTopicRegex(subscribedTopicRegex) .setAssignment( new ConsumerGroupDescribeResponseData.Assignment() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java index aff6673193c6c..3a03a16228a2b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java @@ -17,23 +17,25 @@ package org.apache.kafka.coordinator.group.consumer; import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.MetadataImageBuilder; import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; import org.apache.kafka.coordinator.group.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.image.TopicsImage; import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.Assertions.assertUnorderedListEquals; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; @@ -61,6 +63,7 @@ public static class TargetAssignmentBuilderTestContext { private final Map targetAssignment = new HashMap<>(); private final Map memberAssignments = new HashMap<>(); private final Map staticMembers = new HashMap<>(); + private MetadataImageBuilder topicsImageBuilder = new MetadataImageBuilder(); public TargetAssignmentBuilderTestContext( String groupId, @@ -107,6 +110,8 @@ public Uuid addTopicMetadata( numPartitions, partitionRacks )); + topicsImageBuilder = topicsImageBuilder.addTopic(topicId, topicName, numPartitions); + return topicId; } @@ -156,6 +161,7 @@ public void prepareMemberAssignment( } public TargetAssignmentBuilder.TargetAssignmentResult build() { + TopicsImage topicsImage = topicsImageBuilder.build().topics(); // Prepare expected member specs. Map memberSpecs = new HashMap<>(); @@ -164,7 +170,7 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { memberSpecs.put(memberId, createAssignmentMemberSpec( member, targetAssignment.getOrDefault(memberId, Assignment.EMPTY), - subscriptionMetadata + topicsImage ) )); @@ -187,7 +193,7 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { memberSpecs.put(memberId, createAssignmentMemberSpec( updatedMemberOrNull, assignment, - subscriptionMetadata + topicsImage )); } }); @@ -215,7 +221,8 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { .withStaticMembers(staticMembers) .withSubscriptionMetadata(subscriptionMetadata) .withSubscriptionType(subscriptionType) - .withTargetAssignment(targetAssignment); + .withTargetAssignment(targetAssignment) + .withTopicsImage(topicsImage); // Add the updated members or delete the deleted members. updatedMembers.forEach((memberId, updatedMemberOrNull) -> { @@ -242,6 +249,11 @@ public TargetAssignmentBuilder.TargetAssignmentResult build() { public void testCreateAssignmentMemberSpec() { Uuid fooTopicId = Uuid.randomUuid(); Uuid barTopicId = Uuid.randomUuid(); + TopicsImage topicsImage = new MetadataImageBuilder() + .addTopic(fooTopicId, "foo", 5) + .addTopic(barTopicId, "bar", 5) + .build() + .topics(); ConsumerGroupMember member = new ConsumerGroupMember.Builder("member-id") .setSubscribedTopicNames(Arrays.asList("foo", "bar", "zar")) @@ -249,13 +261,6 @@ public void testCreateAssignmentMemberSpec() { .setInstanceId("instanceId") .build(); - Map subscriptionMetadata = new HashMap() { - { - put("foo", new TopicMetadata(fooTopicId, "foo", 5, Collections.emptyMap())); - put("bar", new TopicMetadata(barTopicId, "bar", 5, Collections.emptyMap())); - } - }; - Assignment assignment = new Assignment(mkAssignment( mkTopicAssignment(fooTopicId, 1, 2, 3), mkTopicAssignment(barTopicId, 1, 2, 3) @@ -264,13 +269,13 @@ public void testCreateAssignmentMemberSpec() { AssignmentMemberSpec assignmentMemberSpec = createAssignmentMemberSpec( member, assignment, - subscriptionMetadata + topicsImage ); assertEquals(new AssignmentMemberSpec( Optional.of("instanceId"), Optional.of("rackId"), - new HashSet<>(Arrays.asList(fooTopicId, barTopicId)), + new TopicIds(mkSet("bar", "foo", "zar"), topicsImage), assignment.partitions() ), assignmentMemberSpec); } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicIdsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicIdsTest.java new file mode 100644 index 0000000000000..c937b0551163d --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicIdsTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.MetadataImageBuilder; +import org.apache.kafka.image.TopicsImage; +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TopicIdsTest { + + @Test + public void testTopicNamesCannotBeNull() { + assertThrows(NullPointerException.class, () -> new TopicIds(null, TopicsImage.EMPTY)); + } + + @Test + public void testTopicsImageCannotBeNull() { + assertThrows(NullPointerException.class, () -> new TopicIds(Collections.emptySet(), null)); + } + + @Test + public void testSize() { + Set topicNames = mkSet("foo", "bar", "baz"); + Set topicIds = new TopicIds(topicNames, TopicsImage.EMPTY); + assertEquals(topicNames.size(), topicIds.size()); + } + + @Test + public void testIsEmpty() { + Set topicNames = Collections.emptySet(); + Set topicIds = new TopicIds(topicNames, TopicsImage.EMPTY); + assertEquals(topicNames.size(), topicIds.size()); + } + + @Test + public void testContains() { + Uuid fooUuid = Uuid.randomUuid(); + Uuid barUuid = Uuid.randomUuid(); + Uuid bazUuid = Uuid.randomUuid(); + Uuid quxUuid = Uuid.randomUuid(); + TopicsImage topicsImage = new MetadataImageBuilder() + .addTopic(fooUuid, "foo", 3) + .addTopic(barUuid, "bar", 3) + .addTopic(bazUuid, "qux", 3) + .build() + .topics(); + + Set topicIds = new TopicIds(mkSet("foo", "bar", "baz"), topicsImage); + + assertTrue(topicIds.contains(fooUuid)); + assertTrue(topicIds.contains(barUuid)); + assertFalse(topicIds.contains(bazUuid)); + assertFalse(topicIds.contains(quxUuid)); + } + + @Test + public void testContainsAll() { + Uuid fooUuid = Uuid.randomUuid(); + Uuid barUuid = Uuid.randomUuid(); + Uuid bazUuid = Uuid.randomUuid(); + Uuid quxUuid = Uuid.randomUuid(); + TopicsImage topicsImage = new MetadataImageBuilder() + .addTopic(fooUuid, "foo", 3) + .addTopic(barUuid, "bar", 3) + .addTopic(bazUuid, "baz", 3) + .addTopic(quxUuid, "qux", 3) + .build() + .topics(); + + Set topicIds = new TopicIds(mkSet("foo", "bar", "baz", "qux"), topicsImage); + + assertTrue(topicIds.contains(fooUuid)); + assertTrue(topicIds.contains(barUuid)); + assertTrue(topicIds.contains(bazUuid)); + assertTrue(topicIds.contains(quxUuid)); + assertTrue(topicIds.containsAll(mkSet(fooUuid, barUuid, bazUuid, quxUuid))); + } + + @Test + public void testContainsAllOneTopicConversionFails() { + // topic 'qux' only exists as topic name. + Uuid fooUuid = Uuid.randomUuid(); + Uuid barUuid = Uuid.randomUuid(); + Uuid bazUuid = Uuid.randomUuid(); + Uuid quxUuid = Uuid.randomUuid(); + TopicsImage topicsImage = new MetadataImageBuilder() + .addTopic(fooUuid, "foo", 3) + .addTopic(barUuid, "bar", 3) + .addTopic(bazUuid, "baz", 3) + .build() + .topics(); + + Set topicIds = new TopicIds(mkSet("foo", "bar", "baz", "qux"), topicsImage); + + assertTrue(topicIds.contains(fooUuid)); + assertTrue(topicIds.contains(barUuid)); + assertTrue(topicIds.contains(bazUuid)); + assertTrue(topicIds.containsAll(mkSet(fooUuid, barUuid, bazUuid))); + assertFalse(topicIds.containsAll(mkSet(fooUuid, barUuid, bazUuid, quxUuid))); + } + + @Test + public void testIterator() { + Uuid fooUuid = Uuid.randomUuid(); + Uuid barUuid = Uuid.randomUuid(); + Uuid bazUuid = Uuid.randomUuid(); + Uuid quxUuid = Uuid.randomUuid(); + TopicsImage topicsImage = new MetadataImageBuilder() + .addTopic(fooUuid, "foo", 3) + .addTopic(barUuid, "bar", 3) + .addTopic(bazUuid, "baz", 3) + .addTopic(quxUuid, "qux", 3) + .build() + .topics(); + + Set topicIds = new TopicIds(mkSet("foo", "bar", "baz", "qux"), topicsImage); + Set expectedIds = mkSet(fooUuid, barUuid, bazUuid, quxUuid); + Set actualIds = new HashSet<>(topicIds); + + assertEquals(expectedIds, actualIds); + } + + @Test + public void testIteratorOneTopicConversionFails() { + // topic 'qux' only exists as topic id. + // topic 'quux' only exists as topic name. + Uuid fooUuid = Uuid.randomUuid(); + Uuid barUuid = Uuid.randomUuid(); + Uuid bazUuid = Uuid.randomUuid(); + Uuid qux = Uuid.randomUuid(); + TopicsImage topicsImage = new MetadataImageBuilder() + .addTopic(fooUuid, "foo", 3) + .addTopic(barUuid, "bar", 3) + .addTopic(bazUuid, "baz", 3) + .addTopic(qux, "qux", 3) + .build() + .topics(); + + Set topicIds = new TopicIds(mkSet("foo", "bar", "baz", "quux"), topicsImage); + Set expectedIds = mkSet(fooUuid, barUuid, bazUuid); + Set actualIds = new HashSet<>(topicIds); + + assertEquals(expectedIds, actualIds); + } + + @Test + public void testEquals() { + Uuid topicId = Uuid.randomUuid(); + TopicIds topicIds1 = new TopicIds(Collections.singleton("topic"), + new MetadataImageBuilder() + .addTopic(topicId, "topicId", 3) + .build() + .topics() + ); + + TopicIds topicIds2 = new TopicIds(Collections.singleton("topic"), + new MetadataImageBuilder() + .addTopic(topicId, "topicId", 3) + .build() + .topics() + ); + + assertEquals(topicIds1, topicIds2); + } +} diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java index 09ae345dbd383..eb9c4ee6e2702 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java @@ -25,9 +25,14 @@ import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; import org.apache.kafka.coordinator.group.assignor.SubscriptionType; +import org.apache.kafka.coordinator.group.consumer.TopicIds; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.TopicsImage; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -44,7 +49,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -119,7 +123,9 @@ public enum AssignmentType { private SubscribedTopicDescriber subscribedTopicDescriber; - private final List allTopicIds = new ArrayList<>(); + private final List allTopicNames = new ArrayList<>(); + + private TopicsImage topicsImage = TopicsImage.EMPTY; @Setup(Level.Trial) public void setup() { @@ -136,6 +142,7 @@ public void setup() { } private Map createTopicMetadata() { + MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); Map topicMetadata = new HashMap<>(); int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount; @@ -146,15 +153,17 @@ private Map createTopicMetadata() { for (int i = 0; i < topicCount; i++) { Uuid topicUuid = Uuid.randomUuid(); String topicName = "topic" + i; - allTopicIds.add(topicUuid); + allTopicNames.add(topicName); topicMetadata.put(topicUuid, new TopicMetadata( topicUuid, topicName, partitionsPerTopicCount, partitionRacks )); + TargetAssignmentBuilderBenchmark.addTopic(delta, topicUuid, topicName, partitionsPerTopicCount); } + topicsImage = delta.apply(MetadataProvenance.EMPTY).topics(); return topicMetadata; } @@ -167,7 +176,7 @@ private void createAssignmentSpec() { if (subscriptionType == HOMOGENEOUS) { for (int i = 0; i < numberOfMembers; i++) { - addMemberSpec(members, i, new HashSet<>(allTopicIds)); + addMemberSpec(members, i, new TopicIds(new HashSet<>(allTopicNames), topicsImage)); } } else { // Adjust bucket count based on member count when member count < max bucket count. @@ -189,7 +198,7 @@ private void createAssignmentSpec() { int topicStartIndex = bucket * bucketSizeTopics; int topicEndIndex = Math.min((bucket + 1) * bucketSizeTopics, topicCount); - Set bucketTopics = new HashSet<>(allTopicIds.subList(topicStartIndex, topicEndIndex)); + TopicIds bucketTopics = new TopicIds(new HashSet<>(allTopicNames.subList(topicStartIndex, topicEndIndex)), topicsImage); // Assign topics to each member in the current bucket for (int i = memberStartIndex; i < memberEndIndex; i++) { @@ -248,11 +257,11 @@ private void simulateIncrementalRebalance() { )); }); - Collection subscribedTopicIdsForNewMember; + Set subscribedTopicIdsForNewMember; if (subscriptionType == HETEROGENEOUS) { subscribedTopicIdsForNewMember = updatedMembers.get("member" + (memberCount - 2)).subscribedTopicIds(); } else { - subscribedTopicIdsForNewMember = allTopicIds; + subscribedTopicIdsForNewMember = new TopicIds(new HashSet<>(allTopicNames), topicsImage); } Optional rackId = rackId(memberCount - 1); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java index 79a64cb667aab..3244f7ea82546 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java @@ -17,11 +17,14 @@ package org.apache.kafka.jmh.assignor; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.metadata.PartitionRecord; +import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.consumer.TopicIds; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; import org.apache.kafka.coordinator.group.consumer.Assignment; import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember; @@ -29,6 +32,10 @@ import org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder; import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.TopicsImage; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -44,8 +51,10 @@ import org.openjdk.jmh.annotations.Warmup; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -85,7 +94,7 @@ public class TargetAssignmentBuilderBenchmark { private final List allTopicNames = new ArrayList<>(); - private final List allTopicIds = new ArrayList<>(); + private TopicsImage topicsImage; @Setup(Level.Trial) public void setup() { @@ -106,6 +115,7 @@ public void setup() { .withSubscriptionMetadata(subscriptionMetadata) .withTargetAssignment(existingTargetAssignment) .withSubscriptionType(HOMOGENEOUS) + .withTopicsImage(topicsImage) .addOrUpdateMember(newMember.memberId(), newMember); } @@ -123,13 +133,13 @@ private Map generateMockMembers() { private Map generateMockSubscriptionMetadata() { Map subscriptionMetadata = new HashMap<>(); + MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); int partitionsPerTopicCount = (memberCount * partitionsToMemberRatio) / topicCount; for (int i = 0; i < topicCount; i++) { String topicName = "topic-" + i; Uuid topicId = Uuid.randomUuid(); allTopicNames.add(topicName); - allTopicIds.add(topicId); TopicMetadata metadata = new TopicMetadata( topicId, @@ -138,8 +148,10 @@ private Map generateMockSubscriptionMetadata() { Collections.emptyMap() ); subscriptionMetadata.put(topicName, metadata); + addTopic(delta, topicId, topicName, partitionsPerTopicCount); } + topicsImage = delta.apply(MetadataProvenance.EMPTY).topics(); return subscriptionMetadata; } @@ -182,13 +194,31 @@ private void createAssignmentSpec() { members.put(memberId, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), - allTopicIds, + new TopicIds(new HashSet<>(allTopicNames), topicsImage), Collections.emptyMap() )); } assignmentSpec = new AssignmentSpec(members, HOMOGENEOUS); } + public static void addTopic( + MetadataDelta delta, + Uuid topicId, + String topicName, + int numPartitions + ) { + // For testing purposes, the following criteria are used: + // - Number of replicas for each partition: 2 + // - Number of brokers available in the cluster: 4 + delta.replay(new TopicRecord().setTopicId(topicId).setName(topicName)); + for (int i = 0; i < numPartitions; i++) { + delta.replay(new PartitionRecord() + .setTopicId(topicId) + .setPartitionId(i) + .setReplicas(Arrays.asList(i % 4, (i + 1) % 4))); + } + } + @Benchmark @Threads(1) @OutputTimeUnit(TimeUnit.MILLISECONDS)