Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-16722: Introduce ConsumerGroupPartitionAssignor interface #15998

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.ConsumerGroupMigrationPolicy
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor
import org.apache.kafka.coordinator.transaction.{TransactionLogConfigs, TransactionStateManagerConfigs}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.QuorumConfig
Expand Down Expand Up @@ -953,7 +953,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
val consumerGroupMinHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)
val consumerGroupMaxHeartbeatIntervalMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)
val consumerGroupMaxSize = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SIZE_CONFIG)
val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[PartitionAssignor])
val consumerGroupAssignors = getConfiguredInstances(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, classOf[ConsumerGroupPartitionAssignor])
val consumerGroupMigrationPolicy = ConsumerGroupMigrationPolicy.parse(getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG))

/** ********* Offset management configuration ***********/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.assignor.UniformAssignor;

Expand Down Expand Up @@ -182,7 +182,7 @@ public class GroupCoordinatorConfig {
/**
* The consumer group assignors.
*/
public final List<PartitionAssignor> consumerGroupAssignors;
public final List<ConsumerGroupPartitionAssignor> consumerGroupAssignors;

/**
* The offsets topic segment bytes should be kept relatively small to facilitate faster
Expand Down Expand Up @@ -262,7 +262,7 @@ public GroupCoordinatorConfig(
int consumerGroupSessionTimeoutMs,
int consumerGroupHeartbeatIntervalMs,
int consumerGroupMaxSize,
List<PartitionAssignor> consumerGroupAssignors,
List<ConsumerGroupPartitionAssignor> consumerGroupAssignors,
int offsetsTopicSegmentBytes,
int offsetMetadataMaxSize,
int classicGroupMaxSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignorException;
import org.apache.kafka.coordinator.group.assignor.SubscriptionType;
import org.apache.kafka.coordinator.group.consumer.Assignment;
Expand Down Expand Up @@ -158,7 +158,7 @@ public static class Builder {
private SnapshotRegistry snapshotRegistry = null;
private Time time = null;
private CoordinatorTimer<Void, CoordinatorRecord> timer = null;
private List<PartitionAssignor> consumerGroupAssignors = null;
private List<ConsumerGroupPartitionAssignor> consumerGroupAssignors = null;
private int consumerGroupMaxSize = Integer.MAX_VALUE;
private int consumerGroupHeartbeatIntervalMs = 5000;
private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
Expand Down Expand Up @@ -192,7 +192,7 @@ Builder withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
return this;
}

Builder withConsumerGroupAssignors(List<PartitionAssignor> consumerGroupAssignors) {
Builder withConsumerGroupAssignors(List<ConsumerGroupPartitionAssignor> consumerGroupAssignors) {
this.consumerGroupAssignors = consumerGroupAssignors;
return this;
}
Expand Down Expand Up @@ -323,14 +323,14 @@ GroupMetadataManager build() {
private final GroupCoordinatorMetricsShard metrics;

/**
* The supported partition assignors keyed by their name.
* The supported consumer group partition assignors keyed by their name.
*/
private final Map<String, PartitionAssignor> assignors;
private final Map<String, ConsumerGroupPartitionAssignor> consumerGroupAssignors;

/**
* The default assignor used.
* The default consumer group assignor used.
*/
private final PartitionAssignor defaultAssignor;
private final ConsumerGroupPartitionAssignor defaultConsumerGroupAssignor;

/**
* The classic and consumer groups keyed by their name.
Expand Down Expand Up @@ -412,7 +412,7 @@ private GroupMetadataManager(
Time time,
CoordinatorTimer<Void, CoordinatorRecord> timer,
GroupCoordinatorMetricsShard metrics,
List<PartitionAssignor> assignors,
List<ConsumerGroupPartitionAssignor> consumerGroupAssignors,
MetadataImage metadataImage,
int consumerGroupMaxSize,
int consumerGroupSessionTimeoutMs,
Expand All @@ -432,8 +432,8 @@ private GroupMetadataManager(
this.timer = timer;
this.metrics = metrics;
this.metadataImage = metadataImage;
this.assignors = assignors.stream().collect(Collectors.toMap(PartitionAssignor::name, Function.identity()));
this.defaultAssignor = assignors.get(0);
this.consumerGroupAssignors = consumerGroupAssignors.stream().collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity()));
this.defaultConsumerGroupAssignor = consumerGroupAssignors.get(0);
this.groups = new TimelineHashMap<>(snapshotRegistry, 0);
this.groupsByTopics = new TimelineHashMap<>(snapshotRegistry, 0);
this.consumerGroupMaxSize = consumerGroupMaxSize;
Expand Down Expand Up @@ -539,7 +539,7 @@ public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescr
try {
describedGroups.add(consumerGroup(groupId, committedOffset).asDescribedGroup(
committedOffset,
defaultAssignor.name(),
defaultConsumerGroupAssignor.name(),
metadataImage.topics()
));
} catch (GroupIdNotFoundException exception) {
Expand Down Expand Up @@ -1041,9 +1041,9 @@ private void throwIfConsumerGroupHeartbeatRequestIsInvalid(
throw new InvalidRequestException("MemberEpoch is invalid.");
}

if (request.serverAssignor() != null && !assignors.containsKey(request.serverAssignor())) {
if (request.serverAssignor() != null && !consumerGroupAssignors.containsKey(request.serverAssignor())) {
throw new UnsupportedAssignorException("ServerAssignor " + request.serverAssignor()
+ " is not supported. Supported assignors: " + String.join(", ", assignors.keySet())
+ " is not supported. Supported assignors: " + String.join(", ", consumerGroupAssignors.keySet())
+ ".");
}
}
Expand Down Expand Up @@ -1892,10 +1892,10 @@ private Assignment updateTargetAssignment(
String preferredServerAssignor = group.computePreferredServerAssignor(
member,
updatedMember
).orElse(defaultAssignor.name());
).orElse(defaultConsumerGroupAssignor.name());
try {
TargetAssignmentBuilder assignmentResultBuilder =
new TargetAssignmentBuilder(group.groupId(), groupEpoch, assignors.get(preferredServerAssignor))
new TargetAssignmentBuilder(group.groupId(), groupEpoch, consumerGroupAssignors.get(preferredServerAssignor))
.withMembers(group.members())
.withStaticMembers(group.staticMembers())
.withSubscriptionMetadata(subscriptionMetadata)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.coordinator.group.assignor;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Server-side partition assignor for consumer groups used by the GroupCoordinator.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
*/
@InterfaceStability.Unstable
public interface ConsumerGroupPartitionAssignor extends PartitionAssignor {
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.kafka.common.annotation.InterfaceStability;

/**
* Server side partition assignor used by the GroupCoordinator.
* Server-side partition assignor used by the GroupCoordinator.
*
* The interface is kept in an internal module until KIP-848 is fully
* implemented and ready to be released.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
* movements during reassignment. (Sticky) </li>
* </ol>
*/
public class RangeAssignor implements PartitionAssignor {
public class RangeAssignor implements ConsumerGroupPartitionAssignor {
public static final String RANGE_ASSIGNOR_NAME = "range";

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
* @see OptimizedUniformAssignmentBuilder
* @see GeneralUniformAssignmentBuilder
*/
public class UniformAssignor implements PartitionAssignor {
public class UniformAssignor implements ConsumerGroupPartitionAssignor {
private static final Logger LOG = LoggerFactory.getLogger(UniformAssignor.class);
public static final String UNIFORM_ASSIGNOR_NAME = "uniform";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.coordinator.group;

import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.junit.jupiter.api.Test;

Expand All @@ -28,7 +28,7 @@
public class GroupCoordinatorConfigTest {
@Test
public void testConfigs() {
PartitionAssignor assignor = new RangeAssignor();
ConsumerGroupPartitionAssignor assignor = new RangeAssignor();
GroupCoordinatorConfig config = new GroupCoordinatorConfig(
10,
30,
Expand Down
Loading