Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17896: Admin.describeClassicGroups #17680

Merged
merged 5 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -1844,6 +1844,28 @@ default ListShareGroupsResult listShareGroups() {
return listShareGroups(new ListShareGroupsOptions());
}

/**
* Describe some classic groups in the cluster.
*
* @param groupIds The IDs of the groups to describe.
* @param options The options to use when describing the groups.
* @return The DescribeClassicGroupsResult.
*/
DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds,
DescribeClassicGroupsOptions options);

/**
* Describe some classic groups in the cluster, with the default options.
* <p>
* This is a convenience method for {@link #describeClassicGroups(Collection, DescribeClassicGroupsOptions)}
* with default options. See the overload for more details.
*
* @param groupIds The IDs of the groups to describe.
* @return The DescribeClassicGroupsResult.
*/
default DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds) {
return describeClassicGroups(groupIds, new DescribeClassicGroupsOptions());
}

/**
* Add the provided application metric for subscription.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.ClassicGroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;

import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
* A detailed description of a single classic group in the cluster.
*/
public class ClassicGroupDescription {
private final String groupId;
private final String protocol;
private final String protocolData;
private final Collection<MemberDescription> members;
private final ClassicGroupState state;
private final Node coordinator;
private final Set<AclOperation> authorizedOperations;

public ClassicGroupDescription(String groupId,
String protocol,
String protocolData,
Collection<MemberDescription> members,
ClassicGroupState state,
Node coordinator) {
this(groupId, protocol, protocolData, members, state, coordinator, Set.of());
}

public ClassicGroupDescription(String groupId,
String protocol,
String protocolData,
Collection<MemberDescription> members,
ClassicGroupState state,
Node coordinator,
Set<AclOperation> authorizedOperations) {
this.groupId = groupId == null ? "" : groupId;
this.protocol = protocol;
this.protocolData = protocolData == null ? "" : protocolData;
this.members = members == null ? List.of() : List.copyOf(members);
this.state = state;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
final ClassicGroupDescription that = (ClassicGroupDescription) o;
return Objects.equals(groupId, that.groupId) &&
Objects.equals(protocol, that.protocol) &&
Objects.equals(protocolData, that.protocolData) &&
Objects.equals(members, that.members) &&
state == that.state &&
Objects.equals(coordinator, that.coordinator) &&
Objects.equals(authorizedOperations, that.authorizedOperations);
}

@Override
public int hashCode() {
return Objects.hash(groupId, protocol, protocolData, members, state, coordinator, authorizedOperations);
}

/**
* The id of the classic group.
*/
public String groupId() {
return groupId;
}

/**
* The group protocol type.
*/
public String protocol() {
return protocol;
}

/**
* The group protocol data. The meaning depends on the group protocol type.
* For a classic consumer group, this is the partition assignor name.
* For a classic connect group, this indicates which Connect protocols are enabled.
*/
public String protocolData() {
return protocolData;
}

/**
* If the group is a simple consumer group or not.
*/
public boolean isSimpleConsumerGroup() {
return protocol.isEmpty();
}

/**
* A list of the members of the classic group.
*/
public Collection<MemberDescription> members() {
return members;
}

/**
* The classic group state, or UNKNOWN if the state is too new for us to parse.
*/
public ClassicGroupState state() {
return state;
}

/**
* The classic group coordinator, or null if the coordinator is not known.
*/
public Node coordinator() {
return coordinator;
}

/**
* authorizedOperations for this group, or null if that information is not known.
*/
public Set<AclOperation> authorizedOperations() {
return authorizedOperations;
}

@Override
public String toString() {
return "(groupId=" + groupId +
", protocol='" + protocol + '\'' +
", protocolData=" + protocolData +
", members=" + members.stream().map(MemberDescription::toString).collect(Collectors.joining(",")) +
", state=" + state +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations +
")";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;

/**
* Options for {@link Admin#describeClassicGroups(Collection, DescribeClassicGroupsOptions)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeClassicGroupsOptions extends AbstractOptions<DescribeClassicGroupsOptions> {
private boolean includeAuthorizedOperations;

public DescribeClassicGroupsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
this.includeAuthorizedOperations = includeAuthorizedOperations;
return this;
}

public boolean includeAuthorizedOperations() {
return includeAuthorizedOperations;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;


/**
* The result of the {@link Admin#describeClassicGroups(Collection, DescribeClassicGroupsOptions)}} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DescribeClassicGroupsResult {

private final Map<String, KafkaFuture<ClassicGroupDescription>> futures;

public DescribeClassicGroupsResult(final Map<String, KafkaFuture<ClassicGroupDescription>> futures) {
this.futures = futures;
}

/**
* Return a map from group id to futures which yield group descriptions.
*/
public Map<String, KafkaFuture<ClassicGroupDescription>> describedGroups() {
return new HashMap<>(futures);
}

/**
* Return a future which yields all ClassicGroupDescription objects, if all the describes succeed.
*/
public KafkaFuture<Map<String, ClassicGroupDescription>> all() {
return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply(
nil -> {
Map<String, ClassicGroupDescription> descriptions = new HashMap<>(futures.size());
futures.forEach((key, future) -> {
try {
descriptions.put(key, future.get());
} catch (InterruptedException | ExecutionException e) {
// This should be unreachable, since the KafkaFuture#allOf already ensured
// that all of the futures completed successfully.
throw new RuntimeException(e);
}
});
return descriptions;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,11 @@ public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);
}

@Override
public DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions options) {
return delegate.describeClassicGroups(groupIds, options);
}

@Override
public void registerMetricForSubscription(KafkaMetric metric) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
import org.apache.kafka.clients.admin.internals.DescribeShareGroupsHandler;
Expand Down Expand Up @@ -3952,6 +3953,17 @@ void handleFailure(Throwable throwable) {
return new ListShareGroupsResult(all);
}

@Override
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds,
final DescribeClassicGroupsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, ClassicGroupDescription> future =
DescribeClassicGroupsHandler.newFuture(groupIds);
DescribeClassicGroupsHandler handler = new DescribeClassicGroupsHandler(options.includeAuthorizedOperations(), logContext);
invokeDriver(handler, future, options.timeoutMs);
return new DescribeClassicGroupsResult(future.all().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}

@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
Expand Down
Loading