Skip to content

Commit

Permalink
KAFKA-17896: Admin.describeClassicGroups (#17680)
Browse files Browse the repository at this point in the history
The implementation of Admin.describeClassicGroups from KIP-1043.

Reviewers: Manikumar Reddy <[email protected]>
  • Loading branch information
AndrewJSchofield authored Nov 6, 2024
1 parent c40cb07 commit 8cbd2ed
Show file tree
Hide file tree
Showing 12 changed files with 736 additions and 1 deletion.
22 changes: 22 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1844,6 +1844,28 @@ default ListShareGroupsResult listShareGroups() {
return listShareGroups(new ListShareGroupsOptions());
}

/**
* Describe some classic groups in the cluster.
*
* @param groupIds The IDs of the groups to describe.
* @param options The options to use when describing the groups.
* @return The DescribeClassicGroupsResult.
*/
DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds,
DescribeClassicGroupsOptions options);

/**
* Describe some classic groups in the cluster, with the default options.
* <p>
* This is a convenience method for {@link #describeClassicGroups(Collection, DescribeClassicGroupsOptions)}
* with default options. See the overload for more details.
*
* @param groupIds The IDs of the groups to describe.
* @return The DescribeClassicGroupsResult.
*/
default DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds) {
return describeClassicGroups(groupIds, new DescribeClassicGroupsOptions());
}

/**
* Add the provided application metric for subscription.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.ClassicGroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A detailed description of a single classic group in the cluster.
*/
public class ClassicGroupDescription {
private final String groupId;
private final String protocol;
private final String protocolData;
private final Collection<MemberDescription> members;
private final ClassicGroupState state;
private final Node coordinator;
private final Set<AclOperation> authorizedOperations;

public ClassicGroupDescription(String groupId,
String protocol,
String protocolData,
Collection<MemberDescription> members,
ClassicGroupState state,
Node coordinator) {
this(groupId, protocol, protocolData, members, state, coordinator, Set.of());
}

public ClassicGroupDescription(String groupId,
String protocol,
String protocolData,
Collection<MemberDescription> members,
ClassicGroupState state,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.protocol = protocol;
this.protocolData = protocolData == null ? "" : protocolData;
this.members = members == null ? List.of() : List.copyOf(members);
this.state = state;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ClassicGroupDescription that = (ClassicGroupDescription) o;
return Objects.equals(groupId, that.groupId) &&
Objects.equals(protocol, that.protocol) &&
Objects.equals(protocolData, that.protocolData) &&
Objects.equals(members, that.members) &&
state == that.state &&
Objects.equals(coordinator, that.coordinator) &&
Objects.equals(authorizedOperations, that.authorizedOperations);
}

@Override
public int hashCode() {
return Objects.hash(groupId, protocol, protocolData, members, state, coordinator, authorizedOperations);
}

/**
* The id of the classic group.
*/
public String groupId() {
return groupId;
}

/**
* The group protocol type.
*/
public String protocol() {
return protocol;
}

/**
* The group protocol data. The meaning depends on the group protocol type.
* For a classic consumer group, this is the partition assignor name.
* For a classic connect group, this indicates which Connect protocols are enabled.
*/
public String protocolData() {
return protocolData;
}

/**
* If the group is a simple consumer group or not.
*/
public boolean isSimpleConsumerGroup() {
return protocol.isEmpty();
}

/**
* A list of the members of the classic group.
*/
public Collection<MemberDescription> members() {
return members;
}

/**
* The classic group state, or UNKNOWN if the state is too new for us to parse.
*/
public ClassicGroupState state() {
return state;
}

/**
* The classic group coordinator, or null if the coordinator is not known.
*/
public Node coordinator() {
return coordinator;
}

/**
* authorizedOperations for this group, or null if that information is not known.
*/
public Set<AclOperation> authorizedOperations() {
return authorizedOperations;
}

@Override
public String toString() {
return "(groupId=" + groupId +
", protocol='" + protocol + '\'' +
", protocolData=" + protocolData +
", members=" + members.stream().map(MemberDescription::toString).collect(Collectors.joining(",")) +
", state=" + state +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations +
")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;

/**
* Options for {@link Admin#describeClassicGroups(Collection, DescribeClassicGroupsOptions)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeClassicGroupsOptions extends AbstractOptions<DescribeClassicGroupsOptions> {
private boolean includeAuthorizedOperations;

public DescribeClassicGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
this.includeAuthorizedOperations = includeAuthorizedOperations;
return this;
}

public boolean includeAuthorizedOperations() {
return includeAuthorizedOperations;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;


/**
* The result of the {@link Admin#describeClassicGroups(Collection, DescribeClassicGroupsOptions)}} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeClassicGroupsResult {

private final Map<String, KafkaFuture<ClassicGroupDescription>> futures;

public DescribeClassicGroupsResult(final Map<String, KafkaFuture<ClassicGroupDescription>> futures) {
this.futures = futures;
}

/**
* Return a map from group id to futures which yield group descriptions.
*/
public Map<String, KafkaFuture<ClassicGroupDescription>> describedGroups() {
return new HashMap<>(futures);
}

/**
* Return a future which yields all ClassicGroupDescription objects, if all the describes succeed.
*/
public KafkaFuture<Map<String, ClassicGroupDescription>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
nil -> {
Map<String, ClassicGroupDescription> descriptions = new HashMap<>(futures.size());
futures.forEach((key, future) -> {
try {
descriptions.put(key, future.get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, since the KafkaFuture#allOf already ensured
// that all of the futures completed successfully.
throw new RuntimeException(e);
}
});
return descriptions;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);
}

@Override
public DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions options) {
return delegate.describeClassicGroups(groupIds, options);
}

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeShareGroupsHandler;
Expand Down Expand Up @@ -3965,6 +3966,17 @@ void handleFailure(Throwable throwable) {
return new ListShareGroupsResult(all);
}

@Override
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds,
final DescribeClassicGroupsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, ClassicGroupDescription> future =
DescribeClassicGroupsHandler.newFuture(groupIds);
DescribeClassicGroupsHandler handler = new DescribeClassicGroupsHandler(options.includeAuthorizedOperations(), logContext);
invokeDriver(handler, future, options.timeoutMs);
return new DescribeClassicGroupsResult(future.all().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}

@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
Expand Down
Loading

0 comments on commit 8cbd2ed

Please sign in to comment.