Skip to content


KAFKA-18127 Validate SubscriptionPattern used on v0 HB (#17989)
Browse files Browse the repository at this point in the history
Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
lianetm authored Dec 4, 2024
1 parent 2ee7e4d commit f60382b
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,10 @@ public void assign(Collection<TopicPartition> partitions) {
* @throws org.apache.kafka.common.errors.InvalidTopicException if the current subscription contains any invalid
* topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the consumer attempts to fetch stable offsets
* when the broker doesn't support this feature
* when the broker doesn't support this feature. Also, if the consumer attempts to subscribe to a
* SubscriptionPattern via {@link #subscribe(SubscriptionPattern)} or
* {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)} and the broker doesn't
* support this feature.
* @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -94,6 +96,10 @@ public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse
private final HeartbeatMetricsManager metricsManager;

public static final String CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the new CONSUMER " +
"group protocol. Set group.protocol=classic on the consumer configs to revert to the CLASSIC protocol " +
"until the cluster is upgraded.";

final LogContext logContext,
final Time time,
Expand Down Expand Up @@ -313,12 +319,29 @@ private void onFailure(final Throwable exception, final long responseTimeMs) {
} else {
logger.error("{} failed due to fatal error: {}", heartbeatRequestName(), exception.getMessage());
if (isHBApiUnsupportedErrorMsg(exception)) {
// This is expected to be the case where building the request fails because the node does not support
// the API. Propagate custom message.
handleFatalFailure(new UnsupportedVersionException(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, exception));
} else {
// This is the case where building the request fails even though the node supports the API (ex.
// required version 1 not available when regex in use).
// Notify the group manager about the failure after all errors have been handled and propagated.
membershipManager().onHeartbeatFailure(exception instanceof RetriableException);

* @return True if the exception is the UnsupportedVersion generated on the client, before sending the request,
* when checking if the API is available on the broker.
private boolean isHBApiUnsupportedErrorMsg(Throwable exception) {
return exception instanceof UnsupportedVersionException &&
exception.getMessage().equals("The node does not support " + ApiKeys.CONSUMER_GROUP_HEARTBEAT);

private void onResponse(final R response, final long currentTimeMs) {
if (errorForResponse(response) == Errors.NONE) {
Expand Down Expand Up @@ -382,10 +405,11 @@ private void onErrorResponse(final R response, final long currentTimeMs) {

message = "The cluster doesn't yet support the new consumer group protocol." +
" Set group.protocol=classic to revert to the classic protocol until the cluster is upgraded.";
// Broker responded with HB not supported, meaning the new protocol is not enabled, so propagate
// custom message for it. Note that the case where the protocol is not supported at all should fail
// on the client side when building the request and checking supporting APIs (handled on onFailure).
logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.apache.kafka.common.requests;

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
Expand Down Expand Up @@ -59,6 +60,11 @@ public Builder(ConsumerGroupHeartbeatRequestData data, boolean enableUnstableLas

public ConsumerGroupHeartbeatRequest build(short version) {
if (version == 0 && data.subscribedTopicRegex() != null) {
throw new UnsupportedVersionException("The cluster does not support regular expressions resolution " +
"on ConsumerGroupHeartbeat API version " + version + ". It must be upgraded to use " +
"ConsumerGroupHeartbeat API version >= 1 to allow to subscribe to a SubscriptionPattern.");
return new ConsumerGroupHeartbeatRequest(data, version);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
Expand Down Expand Up @@ -57,7 +59,9 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
Expand All @@ -66,17 +70,26 @@
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -1878,6 +1891,66 @@ public void testSubscribeToRe2JPatternGeneratesEvent() {

// SubscriptionPattern is supported as of ConsumerGroupHeartbeatRequest v1. Clients using subscribe
// (SubscribePattern) against older broker versions should get UnsupportedVersionException on poll after subscribe
public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws InterruptedException {
final Properties props = requiredConsumerConfig();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

final ConsumerConfig config = new ConsumerConfig(props);

ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false,
mock(SubscriptionState.class), new LogContext(), new ClusterResourceListeners());
MockClient client = new MockClient(time, metadata);
MetadataResponse initialMetadata = RequestTestUtils.metadataUpdateWithIds(1, Map.of("topic1", 2),
Map.of("topic1", Uuid.randomUuid()));
// ConsumerGroupHeartbeat v0 does not support broker-side regex resolution
client.setNodeApiVersions(NodeApiVersions.create(, (short) 0, (short) 0));

// Mock response to find coordinator
Node node = metadata.fetch().nodes().get(0);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node), node);

// Mock HB response (needed so that the MockClient builds the request)
ConsumerGroupHeartbeatResponse result =
new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
Node coordinator = new Node(Integer.MAX_VALUE -,, node.port());
client.prepareResponseFrom(result, coordinator);

SubscriptionState subscriptionState = mock(SubscriptionState.class);

consumer = new AsyncKafkaConsumer<>(
new LogContext(),
new StringDeserializer(),
new StringDeserializer(),

SubscriptionPattern pattern = new SubscriptionPattern("t*");
TestUtils.waitForCondition(() -> {
try {
// The request is generated in the background thread so allow for that
// async operation to happen to detect the failure.
return false;
} catch (UnsupportedVersionException e) {
return true;
}, "Consumer did not throw the expected UnsupportedVersionException on poll");

private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,44 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole

public void testUnsupportedVersion() {
mockErrorResponse(Errors.UNSUPPORTED_VERSION, null);
ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();

// UnsupportedApiVersion in HB response without any custom message. It's considered as new protocol not supported.
String hbNotSupportedMsg = "The cluster does not support the new consumer group protocol. Set group" +
".protocol=classic on the consumer configs to revert to the classic protocol until the cluster is upgraded.";
assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error());
assertEquals(hbNotSupportedMsg, errorEvent.error().getMessage());

// UnsupportedApiVersion in HB response with custom message. Specific to required version not present, should
// keep the custom message.
String hbVersionNotSupportedMsg = "The cluster does not support resolution of SubscriptionPattern on version 0. " +
"It must be upgraded to version >= 1 to allow to subscribe to a SubscriptionPattern.";
mockErrorResponse(Errors.UNSUPPORTED_VERSION, hbVersionNotSupportedMsg);
errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
errorEvent = errorEventArgumentCaptor.getValue();
assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error());
assertEquals(hbVersionNotSupportedMsg, errorEvent.error().getMessage());

private void mockErrorResponse(Errors error, String exceptionCustomMsg) {
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
assertEquals(1, result.unsentRequests.size());

ClientResponse response = createHeartbeatResponse(
result.unsentRequests.get(0), error, exceptionCustomMsg);
ConsumerGroupHeartbeatResponse mockResponse = (ConsumerGroupHeartbeatResponse) response.responseBody();

private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) {
long currentTimeMs = time.milliseconds();
assertEquals(expectedTimeToNextHeartbeatMs, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
Expand Down Expand Up @@ -969,17 +1007,23 @@ private static Collection<Arguments> errorProvider() {
Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true));

private ClientResponse createHeartbeatResponse(NetworkClientDelegate.UnsentRequest request,
Errors error) {
return createHeartbeatResponse(request, error, "stubbed error message");

private ClientResponse createHeartbeatResponse(
final NetworkClientDelegate.UnsentRequest request,
final Errors error
final Errors error,
final String msg
) {
ConsumerGroupHeartbeatResponseData data = new ConsumerGroupHeartbeatResponseData()
if (error != Errors.NONE) {
data.setErrorMessage("stubbed error message");
ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(data);
return new ClientResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.TestUtils;
Expand Down Expand Up @@ -64,8 +65,7 @@ public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance clusterInst
return false;
} catch (UnsupportedVersionException e) {
return e.getMessage().contains("The cluster doesn't yet support the new consumer group protocol. " +
"Set group.protocol=classic to revert to the classic protocol until the cluster is upgraded.");
return e.getMessage().equals(AbstractHeartbeatRequestManager.CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG);
}, "Should get UnsupportedVersionException and how to revert to classic protocol");
Expand Down

0 comments on commit f60382b

Please sign in to comment.