From e4771400ba66750c5bf0a326f74c3a5eb4530882 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Mon, 20 May 2024 11:53:33 +0100 Subject: [PATCH 1/3] KAFKA-16722: Introduce ConsumerGroupPartitionAssignor interface --- .../main/scala/kafka/server/KafkaConfig.scala | 4 +- .../group/GroupCoordinatorConfig.java | 6 +- .../group/GroupMetadataManager.java | 30 ++-- .../ConsumerGroupPartitionAssignor.java | 29 ++++ .../group/assignor/PartitionAssignor.java | 2 +- .../group/assignor/RangeAssignor.java | 2 +- .../group/assignor/UniformAssignor.java | 2 +- .../group/GroupCoordinatorConfigTest.java | 4 +- .../group/GroupMetadataManagerTest.java | 132 +++++++++--------- .../GroupMetadataManagerTestContext.java | 6 +- .../group/MockPartitionAssignor.java | 6 +- .../group/NoOpPartitionAssignor.java | 6 +- 12 files changed, 132 insertions(+), 97 deletions(-) create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupPartitionAssignor.java diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index af5d0ae9a26c0..6dda655bccdff 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy import org.apache.kafka.coordinator.group.Group.GroupType import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor +import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.QuorumConfig @@ -953,7 +953,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami val consumerGroupMinHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG) val consumerGroupMaxHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG) val consumerGroupMaxSize = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG) - val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[PartitionAssignor]) + val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[ConsumerGroupPartitionAssignor]) val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG)) /** ********* Offset management configuration ***********/ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index be4e9c0bb3a37..7939dfa630e0b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -17,7 +17,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.apache.kafka.coordinator.group.assignor.UniformAssignor; @@ -182,7 +182,7 @@ public class GroupCoordinatorConfig { /** * The consumer group assignors. */ - public final List consumerGroupAssignors; + public final List consumerGroupAssignors; /** * The offsets topic segment bytes should be kept relatively small to facilitate faster @@ -262,7 +262,7 @@ public GroupCoordinatorConfig( int consumerGroupSessionTimeoutMs, int consumerGroupHeartbeatIntervalMs, int consumerGroupMaxSize, - List consumerGroupAssignors, + List consumerGroupAssignors, int offsetsTopicSegmentBytes, int offsetMetadataMaxSize, int classicGroupMaxSize, diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index d127812556928..b912cb6ac36d0 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -58,7 +58,7 @@ import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.assignor.SubscriptionType; import org.apache.kafka.coordinator.group.consumer.Assignment; @@ -158,7 +158,7 @@ public static class Builder { private SnapshotRegistry snapshotRegistry = null; private Time time = null; private CoordinatorTimer timer = null; - private List consumerGroupAssignors = null; + private List consumerGroupAssignors = null; private int consumerGroupMaxSize = Integer.MAX_VALUE; private int consumerGroupHeartbeatIntervalMs = 5000; private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE; @@ -192,7 +192,7 @@ Builder withTimer(CoordinatorTimer timer) { return this; } - Builder withConsumerGroupAssignors(List consumerGroupAssignors) { + Builder withConsumerGroupAssignors(List consumerGroupAssignors) { this.consumerGroupAssignors = consumerGroupAssignors; return this; } @@ -323,14 +323,14 @@ GroupMetadataManager build() { private final GroupCoordinatorMetricsShard metrics; /** - * The supported partition assignors keyed by their name. + * The supported consumer group partition assignors keyed by their name. */ - private final Map assignors; + private final Map consumerGroupAssignors; /** - * The default assignor used. + * The default consumer group assignor used. */ - private final PartitionAssignor defaultAssignor; + private final ConsumerGroupPartitionAssignor defaultConsumerGroupAssignor; /** * The classic and consumer groups keyed by their name. @@ -412,7 +412,7 @@ private GroupMetadataManager( Time time, CoordinatorTimer timer, GroupCoordinatorMetricsShard metrics, - List assignors, + List consumerGroupAssignors, MetadataImage metadataImage, int consumerGroupMaxSize, int consumerGroupSessionTimeoutMs, @@ -432,8 +432,8 @@ private GroupMetadataManager( this.timer = timer; this.metrics = metrics; this.metadataImage = metadataImage; - this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity())); - this.defaultAssignor = assignors.get(0); + this.consumerGroupAssignors = consumerGroupAssignors.stream().collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity())); + this.defaultConsumerGroupAssignor = consumerGroupAssignors.get(0); this.groups = new TimelineHashMap<>(snapshotRegistry, 0); this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0); this.consumerGroupMaxSize = consumerGroupMaxSize; @@ -539,7 +539,7 @@ public List consumerGroupDescr try { describedGroups.add(consumerGroup(groupId, committedOffset).asDescribedGroup( committedOffset, - defaultAssignor.name(), + defaultConsumerGroupAssignor.name(), metadataImage.topics() )); } catch (GroupIdNotFoundException exception) { @@ -1041,9 +1041,9 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid( throw new InvalidRequestException("MemberEpoch is invalid."); } - if (request.serverAssignor() != null && !assignors.containsKey(request.serverAssignor())) { + if (request.serverAssignor() != null && !consumerGroupAssignors.containsKey(request.serverAssignor())) { throw new UnsupportedAssignorException("ServerAssignor " + request.serverAssignor() - + " is not supported. Supported assignors: " + String.join(", ", assignors.keySet()) + + " is not supported. Supported assignors: " + String.join(", ", consumerGroupAssignors.keySet()) + "."); } } @@ -1892,10 +1892,10 @@ private Assignment updateTargetAssignment( String preferredServerAssignor = group.computePreferredServerAssignor( member, updatedMember - ).orElse(defaultAssignor.name()); + ).orElse(defaultConsumerGroupAssignor.name()); try { TargetAssignmentBuilder assignmentResultBuilder = - new TargetAssignmentBuilder(group.groupId(), groupEpoch, assignors.get(preferredServerAssignor)) + new TargetAssignmentBuilder(group.groupId(), groupEpoch, consumerGroupAssignors.get(preferredServerAssignor)) .withMembers(group.members()) .withStaticMembers(group.staticMembers()) .withSubscriptionMetadata(subscriptionMetadata) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupPartitionAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupPartitionAssignor.java new file mode 100644 index 0000000000000..da315b38d6c39 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ConsumerGroupPartitionAssignor.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * Server-side partition assignor for consumer groups used by the GroupCoordinator. + * + * The interface is kept in an internal module until KIP-848 is fully + * implemented and ready to be released. + */ +@InterfaceStability.Unstable +public interface ConsumerGroupPartitionAssignor extends PartitionAssignor { +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java index 13b0ee30773be..f8b74bc218c17 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; /** - * Server side partition assignor used by the GroupCoordinator. + * Server-side partition assignor used by the GroupCoordinator. * * The interface is kept in an internal module until KIP-848 is fully * implemented and ready to be released. diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java index 8393353a9ec8b..fe067901cbd67 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java @@ -49,7 +49,7 @@ * movements during reassignment. (Sticky) * */ -public class RangeAssignor implements PartitionAssignor { +public class RangeAssignor implements ConsumerGroupPartitionAssignor { public static final String RANGE_ASSIGNOR_NAME = "range"; @Override diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java index caa0de48c2be2..7da7c2d8c8a67 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java @@ -44,7 +44,7 @@ * @see OptimizedUniformAssignmentBuilder * @see GeneralUniformAssignmentBuilder */ -public class UniformAssignor implements PartitionAssignor { +public class UniformAssignor implements ConsumerGroupPartitionAssignor { private static final Logger LOG = LoggerFactory.getLogger(UniformAssignor.class); public static final String UNIFORM_ASSIGNOR_NAME = "uniform"; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index da950c4d42618..e8906c4418a94 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -17,7 +17,7 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.assignor.RangeAssignor; import org.junit.jupiter.api.Test; @@ -28,7 +28,7 @@ public class GroupCoordinatorConfigTest { @Test public void testConfigs() { - PartitionAssignor assignor = new RangeAssignor(); + ConsumerGroupPartitionAssignor assignor = new RangeAssignor(); GroupCoordinatorConfig config = new GroupCoordinatorConfig( 10, 30, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index aa90e07c5dcf2..df5394383f195 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -61,9 +61,9 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.group.MockCoordinatorTimer.ExpiredTimeout; import org.apache.kafka.coordinator.group.MockCoordinatorTimer.ScheduledTimeout; +import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.classic.ClassicGroupState; import org.apache.kafka.coordinator.group.consumer.Assignment; @@ -150,7 +150,7 @@ public class GroupMetadataManagerTest { public void testConsumerHeartbeatRequestValidation() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .build(); Exception ex; @@ -240,7 +240,7 @@ public void testConsumerHeartbeatRequestValidation() { public void testMemberIdGeneration() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(MetadataImage.EMPTY) .build(); @@ -283,7 +283,7 @@ public void testUnknownGroupId() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .build(); assertThrows(GroupIdNotFoundException.class, () -> @@ -304,7 +304,7 @@ public void testUnknownMemberIdJoinsConsumerGroup() { String memberId = Uuid.randomUuid().toString(); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new NoOpPartitionAssignor())) + .withConsumerGroupAssignors(Collections.singletonList(new NoOpPartitionAssignor())) .build(); // A first member joins to create the group. @@ -340,7 +340,7 @@ public void testConsumerGroupMemberEpochValidation() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .build(); ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) @@ -426,7 +426,7 @@ public void testMemberJoinsEmptyConsumerGroup() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -516,7 +516,7 @@ public void testUpdatingSubscriptionTriggersNewTargetAssignment() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -618,7 +618,7 @@ public void testNewJoiningMemberTriggersNewTargetAssignment() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -758,7 +758,7 @@ public void testLeavingMemberBumpsGroupEpoch() { // Consumer group with two members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -851,7 +851,7 @@ public void testGroupEpochBumpWhenNewStaticMemberJoins() { // Consumer group with two static members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -1020,7 +1020,7 @@ public void testStaticMemberGetsBackAssignmentUponRejoin() { // Consumer group with two static members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -1191,7 +1191,7 @@ public void testNoGroupEpochBumpWhenStaticMemberTemporarilyLeaves() { // Consumer group with two static members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -1255,7 +1255,7 @@ public void testLeavingStaticMemberBumpsGroupEpoch() { // Consumer group with two static members. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -1348,7 +1348,7 @@ public void testShouldThrownUnreleasedInstanceIdExceptionWhenNewMemberJoinsWithI // Consumer group with one static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addRacks() @@ -1398,7 +1398,7 @@ public void testShouldThrownUnknownMemberIdExceptionWhenUnknownStaticMemberJoins // Consumer group with one static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .build()) @@ -1446,7 +1446,7 @@ public void testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferen // Consumer group with one static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .build()) @@ -1488,7 +1488,7 @@ public void testConsumerGroupMemberEpochValidationForStaticMember() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .build(); ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) @@ -1579,7 +1579,7 @@ public void testShouldThrowUnknownMemberIdExceptionWhenUnknownStaticMemberLeaves // Consumer group with one static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .build()) @@ -1625,7 +1625,7 @@ public void testShouldThrowFencedInstanceIdExceptionWhenStaticMemberWithDifferen // Consumer group with one static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .build()) @@ -1669,7 +1669,7 @@ public void testConsumerGroupHeartbeatFullResponse() { // Create a context with an empty consumer group. MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .addRacks() @@ -1770,7 +1770,7 @@ public void testReconciliationProcess() { // Create a context with one consumer group containing two members. MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -2205,7 +2205,7 @@ public void testNewMemberIsRejectedWithMaximumMembersIsReached() { // Create a context with one consumer group containing two members. MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -2268,7 +2268,7 @@ public void testConsumerGroupStates() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)) .build(); @@ -2318,12 +2318,12 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() { Uuid barTopicId = Uuid.randomUuid(); String barTopicName = "bar"; - PartitionAssignor assignor = mock(PartitionAssignor.class); + ConsumerGroupPartitionAssignor assignor = mock(ConsumerGroupPartitionAssignor.class); when(assignor.name()).thenReturn("range"); when(assignor.assign(any(), any())).thenThrow(new PartitionAssignorException("Assignment failed.")); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -2357,7 +2357,7 @@ public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() { // Create a context with one consumer group containing one member. MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) @@ -2468,7 +2468,7 @@ public void testSubscriptionMetadataRefreshedAgainAfterWriteFailure() { // Create a context with one consumer group containing one member. MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) @@ -2592,7 +2592,7 @@ public void testGroupIdsByTopics() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .build(); assertEquals(Collections.emptySet(), context.groupMetadataManager.groupsSubscribedToTopic("foo")); @@ -2691,7 +2691,7 @@ public void testGroupIdsByTopics() { @Test public void testOnNewMetadataImageWithEmptyDelta() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .build(); MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY); @@ -2704,7 +2704,7 @@ public void testOnNewMetadataImageWithEmptyDelta() { @Test public void testOnNewMetadataImage() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .build(); // M1 in group 1 subscribes to a and b. @@ -2800,7 +2800,7 @@ public void testSessionTimeoutLifecycle() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addRacks() @@ -2875,7 +2875,7 @@ public void testSessionTimeoutExpiration() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addRacks() @@ -2939,7 +2939,7 @@ public void testSessionTimeoutExpirationStaticMember() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addRacks() @@ -3021,7 +3021,7 @@ public void testRebalanceTimeoutLifecycle() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 3) .addRacks() @@ -3176,7 +3176,7 @@ public void testRebalanceTimeoutExpiration() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 3) .addRacks() @@ -3313,7 +3313,7 @@ public void testOnLoaded() { String barTopicName = "bar"; GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -8217,7 +8217,7 @@ public void testListGroups() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 10)) .build(); @@ -8366,7 +8366,7 @@ public void testConsumerGroupDescribeNoErrors() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupIds.get(0), epoch)) .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupIds.get(1), epoch) .withMember(memberBuilder.build())) @@ -8401,7 +8401,7 @@ public void testConsumerGroupDescribeWithErrors() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .build(); List actual = context.sendConsumerGroupDescribe(Collections.singletonList(groupId)); @@ -8429,7 +8429,7 @@ public void testConsumerGroupDescribeBeforeAndAfterCommittingOffset() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(metadataImage) .build(); @@ -9224,7 +9224,7 @@ public void testConsumerGroupRebalanceSensor() { MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -9354,7 +9354,7 @@ public void testConsumerGroupHeartbeatWithNonEmptyClassicGroup() { String classicGroupId = "classic-group-id"; String memberId = Uuid.randomUuid().toString(); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new NoOpPartitionAssignor())) + .withConsumerGroupAssignors(Collections.singletonList(new NoOpPartitionAssignor())) .build(); ClassicGroup classicGroup = new ClassicGroup( new LogContext(), @@ -9383,7 +9383,7 @@ public void testConsumerGroupHeartbeatWithEmptyClassicGroup() { String classicGroupId = "classic-group-id"; String memberId = Uuid.randomUuid().toString(); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new NoOpPartitionAssignor())) + .withConsumerGroupAssignors(Collections.singletonList(new NoOpPartitionAssignor())) .build(); ClassicGroup classicGroup = new ClassicGroup( new LogContext(), @@ -9494,7 +9494,7 @@ public void testConsumerGroupHeartbeatWithStableClassicGroup() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.UPGRADE) - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(metadataImage) .build(); @@ -9673,7 +9673,7 @@ public void testConsumerGroupHeartbeatWithPreparingRebalanceClassicGroup() throw GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.UPGRADE) - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(metadataImage) .build(); @@ -9920,7 +9920,7 @@ public void testConsumerGroupHeartbeatWithCompletingRebalanceClassicGroup() thro GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.UPGRADE) - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(metadataImage) .build(); @@ -10315,7 +10315,7 @@ public void testLastConsumerProtocolMemberLeavingConsumerGroup() { // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -10507,7 +10507,7 @@ public void testLastConsumerProtocolMemberSessionTimeoutInConsumerGroup() { // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -10697,7 +10697,7 @@ public void testLastConsumerProtocolMemberRebalanceTimeoutInConsumerGroup() { // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -10920,7 +10920,7 @@ public void testJoiningConsumerGroupWithNewDynamicMember() throws Exception { String memberId = Uuid.randomUuid().toString(); MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .addTopic(barTopicId, barTopicName, 1) @@ -11070,7 +11070,7 @@ public void testJoiningConsumerGroupFailingToPersistRecords() throws Exception { )); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .addRacks() @@ -11125,7 +11125,7 @@ public void testJoiningConsumerGroupWithNewStaticMember() throws Exception { String instanceId = "instance-id"; GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new NoOpPartitionAssignor())) + .withConsumerGroupAssignors(Collections.singletonList(new NoOpPartitionAssignor())) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .addTopic(barTopicId, barTopicName, 1) @@ -11221,7 +11221,7 @@ public void testJoiningConsumerGroupReplacingExistingStaticMember() throws Excep String memberId = Uuid.randomUuid().toString(); String instanceId = "instance-id"; GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new NoOpPartitionAssignor())) + .withConsumerGroupAssignors(Collections.singletonList(new NoOpPartitionAssignor())) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .addRacks() @@ -11327,7 +11327,7 @@ public void testJoiningConsumerGroupWithExistingStaticMemberAndNewSubscription() MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .addTopic(barTopicId, barTopicName, 1) @@ -11557,7 +11557,7 @@ public void testReconciliationInJoiningConsumerGroupWithEagerProtocol() throws E MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .addTopic(barTopicId, barTopicName, 1) @@ -11799,7 +11799,7 @@ public void testReconciliationInJoiningConsumerGroupWithCooperativeProtocol() th MockPartitionAssignor assignor = new MockPartitionAssignor("range"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupAssignors(Collections.singletonList(assignor)) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .addTopic(barTopicId, barTopicName, 1) @@ -12183,7 +12183,8 @@ public void testClassicGroupSyncToConsumerGroupWithAllConsumerProtocolVersions() // Consumer group with two members. // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) @@ -12220,7 +12221,8 @@ public void testClassicGroupSyncToConsumerGroupWithUnknownMemberId() throws Exce // Consumer group with a member that doesn't use the classic protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .build())) @@ -12269,7 +12271,8 @@ public void testClassicGroupSyncToConsumerGroupWithFencedInstanceId() throws Exc // Consumer group with a static member. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .setInstanceId(instanceId) @@ -12307,7 +12310,8 @@ public void testClassicGroupSyncToConsumerGroupWithInconsistentGroupProtocol() t // Consumer group with a member using the classic protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .setClassicMemberMetadata( @@ -12371,7 +12375,8 @@ public void testClassicGroupSyncToConsumerGroupWithIllegalGeneration() throws Ex // Consumer group with a member using the classic protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .setClassicMemberMetadata( @@ -12414,7 +12419,8 @@ public void testClassicGroupSyncToConsumerGroupRebalanceInProgress() throws Exce // Consumer group with a member using the classic protocol. // The group epoch is greater than the member epoch. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupMigrationPolicy(ConsumerGroupMigrationPolicy.DOWNGRADE) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 11) .withMember(new ConsumerGroupMember.Builder(memberId) .setRebalanceTimeoutMs(10000) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java index bd7a30c541777..e6268edc34248 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java @@ -46,7 +46,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.classic.ClassicGroup; import org.apache.kafka.coordinator.group.consumer.ConsumerGroup; import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder; @@ -377,7 +377,7 @@ public static class Builder { final private LogContext logContext = new LogContext(); final private SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext); private MetadataImage metadataImage; - private List consumerGroupAssignors = Collections.singletonList(new MockPartitionAssignor("range")); + private List consumerGroupAssignors = Collections.singletonList(new MockPartitionAssignor("range")); final private List consumerGroupBuilders = new ArrayList<>(); private int consumerGroupMaxSize = Integer.MAX_VALUE; private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE; @@ -394,7 +394,7 @@ public Builder withMetadataImage(MetadataImage metadataImage) { return this; } - public Builder withAssignors(List assignors) { + public Builder withConsumerGroupAssignors(List assignors) { this.consumerGroupAssignors = assignors; return this; } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java index 1fdbb31125c17..f084bb86e1ce5 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/MockPartitionAssignor.java @@ -17,9 +17,9 @@ package org.apache.kafka.coordinator.group; import org.apache.kafka.common.Uuid; -import org.apache.kafka.coordinator.group.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.GroupSpec; import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException; import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; @@ -27,7 +27,7 @@ import java.util.Objects; import java.util.Set; -public class MockPartitionAssignor implements PartitionAssignor { +public class MockPartitionAssignor implements ConsumerGroupPartitionAssignor { private final String name; private GroupAssignment prepareGroupAssignment = null; diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java index cf15ee58654dd..2cf8309a72a40 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/NoOpPartitionAssignor.java @@ -16,16 +16,16 @@ */ package org.apache.kafka.coordinator.group; -import org.apache.kafka.coordinator.group.assignor.GroupSpec; +import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.GroupSpec; import org.apache.kafka.coordinator.group.assignor.MemberAssignment; -import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; import java.util.Map; import java.util.stream.Collectors; -public class NoOpPartitionAssignor implements PartitionAssignor { +public class NoOpPartitionAssignor implements ConsumerGroupPartitionAssignor { static final String NAME = "no-op"; @Override From a70ba104eed534a2eaeaa9c89904f23bcada6aa3 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Tue, 28 May 2024 18:08:32 +0100 Subject: [PATCH 2/3] Fix merge problems --- .../kafka/coordinator/group/GroupMetadataManagerTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index df5394383f195..cc86670f76a5f 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -12465,7 +12465,7 @@ public void testClassicGroupHeartbeatToConsumerGroupMaintainsSession() throws Ex // Consumer group with a member using the classic protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .setClassicMemberMetadata( @@ -12561,7 +12561,7 @@ public void testClassicGroupHeartbeatToConsumerGroupRebalanceInProgress() throws .build(); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(member1) .withMember(member2) @@ -12700,7 +12700,7 @@ public void testConsumerGroupMemberUsingClassicProtocolFencedWhenSessionTimeout( // Consumer group with a member using the classic protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .setClassicMemberMetadata( @@ -12761,7 +12761,7 @@ public void testConsumerGroupMemberUsingClassicProtocolFencedWhenJoinTimeout() { // Consumer group with a member using the classic protocol whose member epoch is smaller than the group epoch. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .setRebalanceTimeoutMs(rebalanceTimeout) From eddb1d1663eff0e7b30c45f8f8a7ca7932f1514d Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 29 May 2024 10:36:24 +0100 Subject: [PATCH 3/3] Fix more merge problems --- .../kafka/coordinator/group/GroupMetadataManagerTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index cc86670f76a5f..3664a7a61d295 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -12891,7 +12891,7 @@ public void testConsumerGroupMemberUsingClassicProtocolBatchLeaveGroup() { // Static member 2 uses the classic protocol. // Static member 3 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .addTopic(barTopicId, barTopicName, 1) @@ -13065,7 +13065,7 @@ public void testConsumerGroupMemberUsingClassicProtocolBatchLeaveGroupUpdatingSu // Consumer group with two members. // Member 1 uses the classic protocol and member 2 uses the consumer protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .addTopic(barTopicId, barTopicName, 1) @@ -13130,7 +13130,7 @@ public void testClassicGroupLeaveToConsumerGroupWithoutValidLeaveGroupMember() { // Consumer group without member using the classic protocol. GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) + .withConsumerGroupAssignors(Collections.singletonList(new MockPartitionAssignor("range"))) .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) .withMember(new ConsumerGroupMember.Builder(memberId) .build()))