From 8cbd2edfe782997380683cbfa7451a4f2de893f0 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 6 Nov 2024 11:44:10 +0000 Subject: [PATCH] KAFKA-17896: Admin.describeClassicGroups (#17680) The implementation of Admin.describeClassicGroups from KIP-1043. Reviewers: Manikumar Reddy --- .../org/apache/kafka/clients/admin/Admin.java | 22 ++ .../admin/ClassicGroupDescription.java | 155 ++++++++++++++ .../admin/DescribeClassicGroupsOptions.java | 41 ++++ .../admin/DescribeClassicGroupsResult.java | 69 +++++++ .../kafka/clients/admin/ForwardingAdmin.java | 5 + .../kafka/clients/admin/KafkaAdminClient.java | 12 ++ .../DescribeClassicGroupsHandler.java | 189 ++++++++++++++++++ .../internals/DescribeShareGroupsHandler.java | 2 +- .../kafka/common/ClassicGroupState.java | 58 ++++++ .../clients/admin/KafkaAdminClientTest.java | 172 ++++++++++++++++ .../kafka/clients/admin/MockAdminClient.java | 5 + ...TestingMetricsInterceptingAdminClient.java | 7 + 12 files changed, 736 insertions(+), 1 deletion(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/ClassicGroupDescription.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/DescribeClassicGroupsOptions.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/DescribeClassicGroupsResult.java create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java create mode 100644 clients/src/main/java/org/apache/kafka/common/ClassicGroupState.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index d6695566bc2fa..dd3b92cf44a21 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -1844,6 +1844,28 @@ default ListShareGroupsResult listShareGroups() { return listShareGroups(new ListShareGroupsOptions()); } + /** + * Describe some classic groups in the cluster. + * + * @param groupIds The IDs of the groups to describe. + * @param options The options to use when describing the groups. + * @return The DescribeClassicGroupsResult. + */ + DescribeClassicGroupsResult describeClassicGroups(Collection groupIds, + DescribeClassicGroupsOptions options); + + /** + * Describe some classic groups in the cluster, with the default options. + *

+ * This is a convenience method for {@link #describeClassicGroups(Collection, DescribeClassicGroupsOptions)} + * with default options. See the overload for more details. + * + * @param groupIds The IDs of the groups to describe. + * @return The DescribeClassicGroupsResult. + */ + default DescribeClassicGroupsResult describeClassicGroups(Collection groupIds) { + return describeClassicGroups(groupIds, new DescribeClassicGroupsOptions()); + } /** * Add the provided application metric for subscription. diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ClassicGroupDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/ClassicGroupDescription.java new file mode 100644 index 0000000000000..065c12214552f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ClassicGroupDescription.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.ClassicGroupState; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.acl.AclOperation; + +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * A detailed description of a single classic group in the cluster. + */ +public class ClassicGroupDescription { + private final String groupId; + private final String protocol; + private final String protocolData; + private final Collection members; + private final ClassicGroupState state; + private final Node coordinator; + private final Set authorizedOperations; + + public ClassicGroupDescription(String groupId, + String protocol, + String protocolData, + Collection members, + ClassicGroupState state, + Node coordinator) { + this(groupId, protocol, protocolData, members, state, coordinator, Set.of()); + } + + public ClassicGroupDescription(String groupId, + String protocol, + String protocolData, + Collection members, + ClassicGroupState state, + Node coordinator, + Set authorizedOperations) { + this.groupId = groupId == null ? "" : groupId; + this.protocol = protocol; + this.protocolData = protocolData == null ? "" : protocolData; + this.members = members == null ? List.of() : List.copyOf(members); + this.state = state; + this.coordinator = coordinator; + this.authorizedOperations = authorizedOperations; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final ClassicGroupDescription that = (ClassicGroupDescription) o; + return Objects.equals(groupId, that.groupId) && + Objects.equals(protocol, that.protocol) && + Objects.equals(protocolData, that.protocolData) && + Objects.equals(members, that.members) && + state == that.state && + Objects.equals(coordinator, that.coordinator) && + Objects.equals(authorizedOperations, that.authorizedOperations); + } + + @Override + public int hashCode() { + return Objects.hash(groupId, protocol, protocolData, members, state, coordinator, authorizedOperations); + } + + /** + * The id of the classic group. + */ + public String groupId() { + return groupId; + } + + /** + * The group protocol type. + */ + public String protocol() { + return protocol; + } + + /** + * The group protocol data. The meaning depends on the group protocol type. + * For a classic consumer group, this is the partition assignor name. + * For a classic connect group, this indicates which Connect protocols are enabled. + */ + public String protocolData() { + return protocolData; + } + + /** + * If the group is a simple consumer group or not. + */ + public boolean isSimpleConsumerGroup() { + return protocol.isEmpty(); + } + + /** + * A list of the members of the classic group. + */ + public Collection members() { + return members; + } + + /** + * The classic group state, or UNKNOWN if the state is too new for us to parse. + */ + public ClassicGroupState state() { + return state; + } + + /** + * The classic group coordinator, or null if the coordinator is not known. + */ + public Node coordinator() { + return coordinator; + } + + /** + * authorizedOperations for this group, or null if that information is not known. + */ + public Set authorizedOperations() { + return authorizedOperations; + } + + @Override + public String toString() { + return "(groupId=" + groupId + + ", protocol='" + protocol + '\'' + + ", protocolData=" + protocolData + + ", members=" + members.stream().map(MemberDescription::toString).collect(Collectors.joining(",")) + + ", state=" + state + + ", coordinator=" + coordinator + + ", authorizedOperations=" + authorizedOperations + + ")"; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClassicGroupsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClassicGroupsOptions.java new file mode 100644 index 0000000000000..ce2149d103f4f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClassicGroupsOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; + +/** + * Options for {@link Admin#describeClassicGroups(Collection, DescribeClassicGroupsOptions)}. + *

+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeClassicGroupsOptions extends AbstractOptions { + private boolean includeAuthorizedOperations; + + public DescribeClassicGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) { + this.includeAuthorizedOperations = includeAuthorizedOperations; + return this; + } + + public boolean includeAuthorizedOperations() { + return includeAuthorizedOperations; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClassicGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClassicGroupsResult.java new file mode 100644 index 0000000000000..8ee38e565469c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClassicGroupsResult.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + + +/** + * The result of the {@link Admin#describeClassicGroups(Collection, DescribeClassicGroupsOptions)}} call. + *

+ * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class DescribeClassicGroupsResult { + + private final Map> futures; + + public DescribeClassicGroupsResult(final Map> futures) { + this.futures = futures; + } + + /** + * Return a map from group id to futures which yield group descriptions. + */ + public Map> describedGroups() { + return new HashMap<>(futures); + } + + /** + * Return a future which yields all ClassicGroupDescription objects, if all the describes succeed. + */ + public KafkaFuture> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( + nil -> { + Map descriptions = new HashMap<>(futures.size()); + futures.forEach((key, future) -> { + try { + descriptions.put(key, future.get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, since the KafkaFuture#allOf already ensured + // that all of the futures completed successfully. + throw new RuntimeException(e); + } + }); + return descriptions; + }); + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java index 87e350c5e7ac5..285acaf695e30 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java @@ -314,6 +314,11 @@ public ListGroupsResult listGroups(ListGroupsOptions options) { return delegate.listGroups(options); } + @Override + public DescribeClassicGroupsResult describeClassicGroups(Collection groupIds, DescribeClassicGroupsOptions options) { + return delegate.describeClassicGroups(groupIds, options); + } + @Override public void registerMetricForSubscription(KafkaMetric metric) { throw new UnsupportedOperationException(); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 7c7cbb99baaa3..46da3a2cec76c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -49,6 +49,7 @@ import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler; import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler; import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler; +import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler; import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler; import org.apache.kafka.clients.admin.internals.DescribeProducersHandler; import org.apache.kafka.clients.admin.internals.DescribeShareGroupsHandler; @@ -3965,6 +3966,17 @@ void handleFailure(Throwable throwable) { return new ListShareGroupsResult(all); } + @Override + public DescribeClassicGroupsResult describeClassicGroups(final Collection groupIds, + final DescribeClassicGroupsOptions options) { + SimpleAdminApiFuture future = + DescribeClassicGroupsHandler.newFuture(groupIds); + DescribeClassicGroupsHandler handler = new DescribeClassicGroupsHandler(options.includeAuthorizedOperations(), logContext); + invokeDriver(handler, future, options.timeoutMs); + return new DescribeClassicGroupsResult(future.all().entrySet().stream() + .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); + } + @Override public Map metrics() { return Collections.unmodifiableMap(this.metrics.metrics()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java new file mode 100644 index 0000000000000..77c04c5d5f02e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.clients.admin.ClassicGroupDescription; +import org.apache.kafka.clients.admin.MemberAssignment; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment; +import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.ClassicGroupState; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.message.DescribeGroupsRequestData; +import org.apache.kafka.common.message.DescribeGroupsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.DescribeGroupsRequest; +import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType; +import org.apache.kafka.common.utils.LogContext; + +import org.slf4j.Logger; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.kafka.clients.admin.internals.AdminUtils.validAclOperations; + +public class DescribeClassicGroupsHandler extends AdminApiHandler.Batched { + + private final boolean includeAuthorizedOperations; + private final Logger log; + private final AdminApiLookupStrategy lookupStrategy; + + public DescribeClassicGroupsHandler( + boolean includeAuthorizedOperations, + LogContext logContext + ) { + this.includeAuthorizedOperations = includeAuthorizedOperations; + this.log = logContext.logger(DescribeConsumerGroupsHandler.class); + this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); + } + + private static Set buildKeySet(Collection groupIds) { + return groupIds.stream() + .map(CoordinatorKey::byGroupId) + .collect(Collectors.toSet()); + } + + public static AdminApiFuture.SimpleAdminApiFuture newFuture(Collection groupIds) { + return AdminApiFuture.forKeys(buildKeySet(groupIds)); + } + + @Override + public String apiName() { + return "describeClassicGroups"; + } + + @Override + public AdminApiLookupStrategy lookupStrategy() { + return lookupStrategy; + } + + @Override + public DescribeGroupsRequest.Builder buildBatchedRequest(int coordinatorId, Set keys) { + List groupIds = keys.stream().map(key -> { + if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) { + throw new IllegalArgumentException("Invalid group coordinator key " + key + + " when building `DescribeGroups` request"); + } + return key.idValue; + }).collect(Collectors.toList()); + DescribeGroupsRequestData data = new DescribeGroupsRequestData() + .setGroups(groupIds) + .setIncludeAuthorizedOperations(includeAuthorizedOperations); + return new DescribeGroupsRequest.Builder(data); + } + + @Override + public ApiResult handleResponse( + Node coordinator, + Set groupIds, + AbstractResponse abstractResponse) { + final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; + final Map completed = new HashMap<>(); + final Map failed = new HashMap<>(); + final Set groupsToUnmap = new HashSet<>(); + + for (DescribeGroupsResponseData.DescribedGroup describedGroup : response.data().groups()) { + CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId()); + Errors error = Errors.forCode(describedGroup.errorCode()); + if (error != Errors.NONE) { + handleError(groupIdKey, error, error.message(), failed, groupsToUnmap); + continue; + } + + final List memberDescriptions = new ArrayList<>(describedGroup.members().size()); + final Set authorizedOperations = validAclOperations(describedGroup.authorizedOperations()); + + final String protocolType = describedGroup.protocolType(); + final boolean isConsumerGroup = protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty(); + describedGroup.members().forEach(groupMember -> { + Set partitions = Collections.emptySet(); + if (isConsumerGroup && groupMember.memberAssignment().length > 0) { + // We can only deserialize the assignment for a classic consumer group + final Assignment assignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(groupMember.memberAssignment())); + partitions = new HashSet<>(assignment.partitions()); + } + memberDescriptions.add(new MemberDescription( + groupMember.memberId(), + Optional.ofNullable(groupMember.groupInstanceId()), + groupMember.clientId(), + groupMember.clientHost(), + new MemberAssignment(partitions))); + }); + + final ClassicGroupDescription classicGroupDescription = + new ClassicGroupDescription( + groupIdKey.idValue, + protocolType, + describedGroup.protocolData(), + memberDescriptions, + ClassicGroupState.parse(describedGroup.groupState()), + coordinator, + authorizedOperations); + completed.put(groupIdKey, classicGroupDescription); + } + + return new ApiResult<>(completed, failed, List.copyOf(groupsToUnmap)); + } + + private void handleError( + CoordinatorKey groupId, + Errors error, + String errorMsg, + Map failed, + Set groupsToUnmap) { + switch (error) { + case GROUP_AUTHORIZATION_FAILED: + log.debug("`DescribeGroups` request for group id {} failed due to error {}.", groupId.idValue, error); + failed.put(groupId, error.exception(errorMsg)); + break; + + case COORDINATOR_LOAD_IN_PROGRESS: + // If the coordinator is in the middle of loading, then we just need to retry + log.debug("`DescribeGroups` request for group id {} failed because the coordinator " + + "is still in the process of loading state. Will retry.", groupId.idValue); + break; + + case COORDINATOR_NOT_AVAILABLE: + case NOT_COORDINATOR: + // If the coordinator is unavailable or there was a coordinator change, then we unmap + // the key so that we retry the `FindCoordinator` request + log.debug("`DescribeGroups` request for group id {} returned error {}. " + + "Will attempt to find the coordinator again and retry.", groupId.idValue, error); + groupsToUnmap.add(groupId); + break; + + default: + log.error("`DescribeGroups` request for group id {} failed due to unexpected error {}.", groupId.idValue, error); + failed.put(groupId, error.exception(errorMsg)); + } + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java index 20ce9e003ed0d..80a112a5ed553 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java @@ -85,7 +85,7 @@ public AdminApiLookupStrategy lookupStrategy() { public ShareGroupDescribeRequest.Builder buildBatchedRequest(int coordinatorId, Set keys) { List groupIds = keys.stream().map(key -> { if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) { - throw new IllegalArgumentException("Invalid transaction coordinator key " + key + + throw new IllegalArgumentException("Invalid group coordinator key " + key + " when building `DescribeShareGroups` request"); } return key.idValue; diff --git a/clients/src/main/java/org/apache/kafka/common/ClassicGroupState.java b/clients/src/main/java/org/apache/kafka/common/ClassicGroupState.java new file mode 100644 index 0000000000000..65aaa1a6d3bed --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/ClassicGroupState.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * The classic group state. + */ +public enum ClassicGroupState { + UNKNOWN("Unknown"), + PREPARING_REBALANCE("PreparingRebalance"), + COMPLETING_REBALANCE("CompletingRebalance"), + STABLE("Stable"), + DEAD("Dead"), + EMPTY("Empty"); + + private static final Map NAME_TO_ENUM = Arrays.stream(values()) + .collect(Collectors.toMap(state -> state.name.toUpperCase(Locale.ROOT), Function.identity())); + + private final String name; + + ClassicGroupState(String name) { + this.name = name; + } + + /** + * Case-insensitive classic group state lookup by string name. + */ + public static ClassicGroupState parse(String name) { + ClassicGroupState state = NAME_TO_ENUM.get(name.toUpperCase(Locale.ROOT)); + return state == null ? UNKNOWN : state; + } + + @Override + public String toString() { + return name; + } +} diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index bf7a624cc4ea8..1b44b93c70406 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.internals.ConsumerProtocol; +import org.apache.kafka.common.ClassicGroupState; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.ElectionType; @@ -5320,6 +5321,177 @@ public void testListShareGroupsWithStatesOlderBrokerVersion() throws Exception { } } + @Test + public void testDescribeClassicGroups() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Retriable FindCoordinatorResponse errors should be retried + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, Node.noNode())); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + DescribeGroupsResponseData data = new DescribeGroupsResponseData(); + + // Retriable errors should be retried + data.groups().add(new DescribeGroupsResponseData.DescribedGroup() + .setGroupId(GROUP_ID) + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())); + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + + /* + * We need to return two responses here, one with NOT_COORDINATOR error when calling describe classic group + * api using coordinator that has moved. This will retry whole operation. So we need to again respond with a + * FindCoordinatorResponse. + * + * And the same reason for COORDINATOR_NOT_AVAILABLE error response + */ + data = new DescribeGroupsResponseData(); + data.groups().add(new DescribeGroupsResponseData.DescribedGroup() + .setGroupId(GROUP_ID) + .setErrorCode(Errors.NOT_COORDINATOR.code())); + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + data = new DescribeGroupsResponseData(); + data.groups().add(new DescribeGroupsResponseData.DescribedGroup() + .setGroupId(GROUP_ID) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())); + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + final List topicPartitions = List.of( + new TopicPartition("my_topic", 0), + new TopicPartition("my_topic", 1), + new TopicPartition("my_topic", 2)); + final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions)); + final byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()]; + memberAssignment.get(memberAssignmentBytes); + + data = new DescribeGroupsResponseData(); + DescribeGroupsResponseData.DescribedGroupMember memberOne = new DescribeGroupsResponseData.DescribedGroupMember() + .setMemberId("0") + .setClientId("clientId0") + .setClientHost("clientHost") + .setMemberAssignment(memberAssignmentBytes); + DescribeGroupsResponseData.DescribedGroupMember memberTwo = new DescribeGroupsResponseData.DescribedGroupMember() + .setMemberId("1") + .setClientId("clientId1") + .setClientHost("clientHost") + .setGroupInstanceId("static") + .setMemberAssignment(memberAssignmentBytes); + + final List expectedTopicPartitions = new ArrayList<>(); + expectedTopicPartitions.add(0, new TopicPartition("my_topic", 0)); + expectedTopicPartitions.add(1, new TopicPartition("my_topic", 1)); + expectedTopicPartitions.add(2, new TopicPartition("my_topic", 2)); + + List expectedMemberDescriptions = new ArrayList<>(); + expectedMemberDescriptions.add(convertToMemberDescriptions(memberOne, + new MemberAssignment(new HashSet<>(expectedTopicPartitions)))); + expectedMemberDescriptions.add(convertToMemberDescriptions(memberTwo, + new MemberAssignment(new HashSet<>(expectedTopicPartitions)))); + data.groups().add(new DescribeGroupsResponseData.DescribedGroup() + .setGroupId(GROUP_ID) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(ClassicGroupState.STABLE.toString()) + .setMembers(List.of(memberOne, memberTwo))); + + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + + final DescribeClassicGroupsResult result = env.adminClient().describeClassicGroups(List.of(GROUP_ID)); + final ClassicGroupDescription groupDescription = result.describedGroups().get(GROUP_ID).get(); + + assertEquals(1, result.describedGroups().size()); + assertEquals(GROUP_ID, groupDescription.groupId()); + assertEquals(2, groupDescription.members().size()); + assertEquals(expectedMemberDescriptions, groupDescription.members()); + } + } + + @Test + public void testDescribeClassicGroupsWithAuthorizedOperationsOmitted() throws Exception { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse( + prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + DescribeGroupsResponseData data = new DescribeGroupsResponseData(); + + data.groups().add(new DescribeGroupsResponseData.DescribedGroup() + .setGroupId(GROUP_ID) + .setProtocolType("") + .setAuthorizedOperations(MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED)); + + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(data)); + + final DescribeClassicGroupsResult result = env.adminClient().describeClassicGroups(List.of(GROUP_ID)); + final ClassicGroupDescription groupDescription = result.describedGroups().get(GROUP_ID).get(); + + assertNull(groupDescription.authorizedOperations()); + } + } + + @Test + public void testDescribeMultipleClassicGroups() { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + + final List topicPartitions = List.of( + new TopicPartition("my_topic", 0), + new TopicPartition("my_topic", 1), + new TopicPartition("my_topic", 2)); + final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions)); + final byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()]; + memberAssignment.get(memberAssignmentBytes); + + DescribeGroupsResponseData group0Data = new DescribeGroupsResponseData(); + group0Data.groups().add(new DescribeGroupsResponseData.DescribedGroup() + .setGroupId(GROUP_ID) + .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE) + .setGroupState(ClassicGroupState.STABLE.toString()) + .setMembers(List.of( + new DescribeGroupsResponseData.DescribedGroupMember() + .setMemberId("0") + .setClientId("clientId0") + .setClientHost("clientHost") + .setMemberAssignment(memberAssignmentBytes), + new DescribeGroupsResponseData.DescribedGroupMember() + .setMemberId("1") + .setClientId("clientId1") + .setClientHost("clientHost") + .setMemberAssignment(memberAssignmentBytes)))); + + DescribeGroupsResponseData group1Data = new DescribeGroupsResponseData(); + group1Data.groups().add(new DescribeGroupsResponseData.DescribedGroup() + .setGroupId("group-1") + .setProtocolType("other") + .setGroupState(ClassicGroupState.STABLE.toString()) + .setMembers(List.of( + new DescribeGroupsResponseData.DescribedGroupMember() + .setMemberId("0") + .setClientId("clientId0") + .setClientHost("clientHost"), + new DescribeGroupsResponseData.DescribedGroupMember() + .setMemberId("1") + .setClientId("clientId1") + .setClientHost("clientHost")))); + + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(group0Data)); + env.kafkaClient().prepareResponse(new DescribeGroupsResponse(group1Data)); + + Collection groups = new HashSet<>(); + groups.add(GROUP_ID); + groups.add("group-1"); + final DescribeClassicGroupsResult result = env.adminClient().describeClassicGroups(groups); + assertEquals(2, result.describedGroups().size()); + assertEquals(groups, result.describedGroups().keySet()); + } + } + @Test public void testIncrementalAlterConfigs() throws Exception { try (AdminClientUnitTestEnv env = mockClientEnv()) { diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index 2eef603c4a028..482240e57779d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -1399,6 +1399,11 @@ public synchronized ListShareGroupsResult listShareGroups(ListShareGroupsOptions throw new UnsupportedOperationException("Not implemented yet"); } + @Override + public synchronized DescribeClassicGroupsResult describeClassicGroups(Collection groupIds, DescribeClassicGroupsOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public synchronized void close(Duration timeout) {} diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java index a01324ef7fd85..0afdd1fc85cca 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TestingMetricsInterceptingAdminClient.java @@ -58,6 +58,8 @@ import org.apache.kafka.clients.admin.DeleteTopicsResult; import org.apache.kafka.clients.admin.DescribeAclsOptions; import org.apache.kafka.clients.admin.DescribeAclsResult; +import org.apache.kafka.clients.admin.DescribeClassicGroupsOptions; +import org.apache.kafka.clients.admin.DescribeClassicGroupsResult; import org.apache.kafka.clients.admin.DescribeClientQuotasOptions; import org.apache.kafka.clients.admin.DescribeClientQuotasResult; import org.apache.kafka.clients.admin.DescribeClusterOptions; @@ -425,6 +427,11 @@ public ListShareGroupsResult listShareGroups(final ListShareGroupsOptions option return adminDelegate.listShareGroups(options); } + @Override + public DescribeClassicGroupsResult describeClassicGroups(final Collection groupIds, final DescribeClassicGroupsOptions options) { + return adminDelegate.describeClassicGroups(groupIds, options); + } + @Override public void registerMetricForSubscription(final KafkaMetric metric) { passedMetrics.add(metric);