Skip to content

Commit

Permalink
KAFKA-16722: Introduce ConsumerGroupPartitionAssignor interface (apac…
Browse files Browse the repository at this point in the history
…he#15998)

KIP-932 introduces share groups to go alongside consumer groups. Both kinds of group use server-side assignors but it is unlikely that a single assignor class would be suitable for both. As a result, the KIP introduces specific interfaces for consumer group and share group partition assignors.

This PR introduces only the consumer group interface, `o.a.k.coordinator.group.assignor.ConsumerGroupPartitionAssignor`. The share group interface will come in a later release. The existing implementations of the general `PartitionAssignor` interface have been changed to implement `ConsumerGroupPartitionAssignor` instead and all other code changes are just propagating the change throughout the codebase.

Note that the code in the group coordinator that actually calculates assignments uses the general `PartitionAssignor` interface so that it can be used with both kinds of group, even though the assignors themselves are specific.

Reviewers: Apoorv Mittal <[email protected]>, David Jacot <[email protected]>
  • Loading branch information
AndrewJSchofield authored May 29, 2024
1 parent 0b75cf7 commit 2d9994e
Show file tree
Hide file tree
Showing 12 changed files with 139 additions and 104 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
Expand Down Expand Up @@ -953,7 +953,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val consumerGroupMinHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)
val consumerGroupMaxHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)
val consumerGroupMaxSize = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG)
val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[PartitionAssignor])
val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[ConsumerGroupPartitionAssignor])
val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG))

/** ********* Offset management configuration ***********/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;

Expand Down Expand Up @@ -182,7 +182,7 @@ public class GroupCoordinatorConfig {
/**
* The consumer group assignors.
*/
public final List<PartitionAssignor> consumerGroupAssignors;
public final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors;

/**
* The offsets topic segment bytes should be kept relatively small to facilitate faster
Expand Down Expand Up @@ -262,7 +262,7 @@ public GroupCoordinatorConfig(
int consumerGroupSessionTimeoutMs,
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMaxSize,
List<PartitionAssignor> consumerGroupAssignors,
List<ConsumerGroupPartitionAssignor> consumerGroupAssignors,
int offsetsTopicSegmentBytes,
int offsetMetadataMaxSize,
int classicGroupMaxSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.Assignment;
Expand Down Expand Up @@ -158,7 +158,7 @@ public static class Builder {
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private CoordinatorTimer<Void, CoordinatorRecord> timer = null;
private List<PartitionAssignor> consumerGroupAssignors = null;
private List<ConsumerGroupPartitionAssignor> consumerGroupAssignors = null;
private int consumerGroupMaxSize = Integer.MAX_VALUE;
private int consumerGroupHeartbeatIntervalMs = 5000;
private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
Expand Down Expand Up @@ -192,7 +192,7 @@ Builder withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
return this;
}

Builder withConsumerGroupAssignors(List<PartitionAssignor> consumerGroupAssignors) {
Builder withConsumerGroupAssignors(List<ConsumerGroupPartitionAssignor> consumerGroupAssignors) {
this.consumerGroupAssignors = consumerGroupAssignors;
return this;
}
Expand Down Expand Up @@ -323,14 +323,14 @@ GroupMetadataManager build() {
private final GroupCoordinatorMetricsShard metrics;

/**
* The supported partition assignors keyed by their name.
* The supported consumer group partition assignors keyed by their name.
*/
private final Map<String, PartitionAssignor> assignors;
private final Map<String, ConsumerGroupPartitionAssignor> consumerGroupAssignors;

/**
* The default assignor used.
* The default consumer group assignor used.
*/
private final PartitionAssignor defaultAssignor;
private final ConsumerGroupPartitionAssignor defaultConsumerGroupAssignor;

/**
* The classic and consumer groups keyed by their name.
Expand Down Expand Up @@ -412,7 +412,7 @@ private GroupMetadataManager(
Time time,
CoordinatorTimer<Void, CoordinatorRecord> timer,
GroupCoordinatorMetricsShard metrics,
List<PartitionAssignor> assignors,
List<ConsumerGroupPartitionAssignor> consumerGroupAssignors,
MetadataImage metadataImage,
int consumerGroupMaxSize,
int consumerGroupSessionTimeoutMs,
Expand All @@ -432,8 +432,8 @@ private GroupMetadataManager(
this.timer = timer;
this.metrics = metrics;
this.metadataImage = metadataImage;
this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
this.defaultAssignor = assignors.get(0);
this.consumerGroupAssignors = consumerGroupAssignors.stream().collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity()));
this.defaultConsumerGroupAssignor = consumerGroupAssignors.get(0);
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
this.consumerGroupMaxSize = consumerGroupMaxSize;
Expand Down Expand Up @@ -539,7 +539,7 @@ public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescr
try {
describedGroups.add(consumerGroup(groupId, committedOffset).asDescribedGroup(
committedOffset,
defaultAssignor.name(),
defaultConsumerGroupAssignor.name(),
metadataImage.topics()
));
} catch (GroupIdNotFoundException exception) {
Expand Down Expand Up @@ -1041,9 +1041,9 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
throw new InvalidRequestException("MemberEpoch is invalid.");
}

if (request.serverAssignor() != null && !assignors.containsKey(request.serverAssignor())) {
if (request.serverAssignor() != null && !consumerGroupAssignors.containsKey(request.serverAssignor())) {
throw new UnsupportedAssignorException("ServerAssignor " + request.serverAssignor()
+ " is not supported. Supported assignors: " + String.join(", ", assignors.keySet())
+ " is not supported. Supported assignors: " + String.join(", ", consumerGroupAssignors.keySet())
+ ".");
}
}
Expand Down Expand Up @@ -1892,10 +1892,10 @@ private Assignment updateTargetAssignment(
String preferredServerAssignor = group.computePreferredServerAssignor(
member,
updatedMember
).orElse(defaultAssignor.name());
).orElse(defaultConsumerGroupAssignor.name());
try {
TargetAssignmentBuilder assignmentResultBuilder =
new TargetAssignmentBuilder(group.groupId(), groupEpoch, assignors.get(preferredServerAssignor))
new TargetAssignmentBuilder(group.groupId(), groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
.withMembers(group.members())
.withStaticMembers(group.staticMembers())
.withSubscriptionMetadata(subscriptionMetadata)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Server-side partition assignor for consumer groups used by the GroupCoordinator.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
*/
@InterfaceStability.Unstable
public interface ConsumerGroupPartitionAssignor extends PartitionAssignor {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Server side partition assignor used by the GroupCoordinator.
* Server-side partition assignor used by the GroupCoordinator.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* movements during reassignment. (Sticky) </li>
* </ol>
*/
public class RangeAssignor implements PartitionAssignor {
public class RangeAssignor implements ConsumerGroupPartitionAssignor {
public static final String RANGE_ASSIGNOR_NAME = "range";

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* @see OptimizedUniformAssignmentBuilder
* @see GeneralUniformAssignmentBuilder
*/
public class UniformAssignor implements PartitionAssignor {
public class UniformAssignor implements ConsumerGroupPartitionAssignor {
private static final Logger LOG = LoggerFactory.getLogger(UniformAssignor.class);
public static final String UNIFORM_ASSIGNOR_NAME = "uniform";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.junit.jupiter.api.Test;

Expand All @@ -28,7 +28,7 @@
public class GroupCoordinatorConfigTest {
@Test
public void testConfigs() {
PartitionAssignor assignor = new RangeAssignor();
ConsumerGroupPartitionAssignor assignor = new RangeAssignor();
GroupCoordinatorConfig config = new GroupCoordinatorConfig(
10,
30,
Expand Down
Loading

0 comments on commit 2d9994e

Please sign in to comment.