diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 92e4cf2a55078..7038c7b29c63f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -878,7 +878,10 @@ public void assign(Collection<TopicPartition> partitions) {
      * @throws org.apache.kafka.common.errors.InvalidTopicException if the current subscription contains any invalid
      * topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the consumer attempts to fetch stable offsets
-     * when the broker doesn't support this feature
+     * when the broker doesn't support this feature. Also, if the consumer attempts to subscribe to a
+     * SubscriptionPattern via {@link #subscribe(SubscriptionPattern)} or
+     * {@link #subscribe(SubscriptionPattern, ConsumerRebalanceListener)} and the broker doesn't
+     * support this feature.
      * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
      */
     @Override
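To make the documented behavior concrete, here is a minimal usage sketch that is not part of the patch: the bootstrap address, group id, and topic pattern are invented, and only the subscribe(SubscriptionPattern) overloads and the UnsupportedVersionException surfaced from a later poll() come from the javadoc above.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PatternSubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // invented address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pattern-group");             // invented group id
        props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");            // new group protocol
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Broker-side (RE2/J) regex subscription, resolved by the group coordinator.
            consumer.subscribe(new SubscriptionPattern("orders-.*"));
            try {
                consumer.poll(Duration.ofMillis(500));
            } catch (UnsupportedVersionException e) {
                // Per the javadoc change above, this surfaces from poll() (not subscribe())
                // when the broker does not support SubscriptionPattern subscriptions.
                System.err.println("Broker too old for SubscriptionPattern: " + e.getMessage());
            }
        }
    }
}
```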
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 9c1a63cd79a37..b260e5e5fbf6b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -25,6 +25,8 @@
 import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.LogContext;
@@ -94,6 +96,10 @@ public abstract class AbstractHeartbeatRequestManager
+                "It must be upgraded to version >= 1 to allow to subscribe to a SubscriptionPattern.");
+        }
         return new ConsumerGroupHeartbeatRequest(data, version);
     }
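The request-builder hunk above is only partially visible here, so the sketch below merely illustrates the version-gating pattern it applies; the class and method names are invented, and only the exception type and message text are taken from the patch and its tests. The design intent is to fail fast on the client with an actionable message rather than send a regex subscription the broker cannot resolve.

```java
import org.apache.kafka.common.errors.UnsupportedVersionException;

// Illustrative only: not the real ConsumerGroupHeartbeat request builder.
final class RegexVersionGate {
    private RegexVersionGate() { }

    // Throws if the negotiated ConsumerGroupHeartbeat version predates broker-side regex support (v1).
    static void check(short version, String subscribedTopicRegex) {
        if (subscribedTopicRegex != null && version < 1) {
            throw new UnsupportedVersionException(
                "The cluster does not support resolution of SubscriptionPattern on version " + version +
                ". It must be upgraded to version >= 1 to allow to subscribe to a SubscriptionPattern.");
        }
    }
}
```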
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index a3c59956c5f43..ccb6a50892d85 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.Metadata.LeaderAndEpoch;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
@@ -57,7 +59,9 @@
 import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.InterruptException;
@@ -66,17 +70,26 @@
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
 import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -1878,6 +1891,66 @@ public void testSubscribeToRe2JPatternGeneratesEvent() {
         verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class));
     }
 
+    // SubscriptionPattern is supported as of ConsumerGroupHeartbeatRequest v1. Clients using
+    // subscribe(SubscriptionPattern) against older broker versions should get UnsupportedVersionException on poll after subscribe.
+    @Test
+    public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws InterruptedException {
+        final Properties props = requiredConsumerConfig();
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        final ConsumerConfig config = new ConsumerConfig(props);
+
+        ConsumerMetadata metadata = new ConsumerMetadata(0, 0, Long.MAX_VALUE, false, false,
+            mock(SubscriptionState.class), new LogContext(), new ClusterResourceListeners());
+        MockClient client = new MockClient(time, metadata);
+        MetadataResponse initialMetadata = RequestTestUtils.metadataUpdateWithIds(1, Map.of("topic1", 2),
+            Map.of("topic1", Uuid.randomUuid()));
+        client.updateMetadata(initialMetadata);
+        // ConsumerGroupHeartbeat v0 does not support broker-side regex resolution
+        client.setNodeApiVersions(NodeApiVersions.create(ApiKeys.CONSUMER_GROUP_HEARTBEAT.id, (short) 0, (short) 0));
+
+        // Mock response to find coordinator
+        Node node = metadata.fetch().nodes().get(0);
+        client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node), node);
+
+        // Mock HB response (needed so that the MockClient builds the request)
+        ConsumerGroupHeartbeatResponse result =
+            new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
+                .setMemberId("")
+                .setMemberEpoch(0));
+        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
+        client.prepareResponseFrom(result, coordinator);
+
+        SubscriptionState subscriptionState = mock(SubscriptionState.class);
+
+        consumer = new AsyncKafkaConsumer<>(
+            new LogContext(),
+            time,
+            config,
+            new StringDeserializer(),
+            new StringDeserializer(),
+            client,
+            subscriptionState,
+            metadata
+        );
+        completeTopicRe2JPatternSubscriptionChangeEventSuccessfully();
+
+        SubscriptionPattern pattern = new SubscriptionPattern("t*");
+        consumer.subscribe(pattern);
+        when(subscriptionState.subscriptionPattern()).thenReturn(pattern);
+        TestUtils.waitForCondition(() -> {
+            try {
+                // The request is generated in the background thread so allow for that
+                // async operation to happen to detect the failure.
+                consumer.poll(Duration.ZERO);
+                return false;
+            } catch (UnsupportedVersionException e) {
+                return true;
+            }
+        }, "Consumer did not throw the expected UnsupportedVersionException on poll");
+    }
+
     private Map mockTopicPartitionOffset() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 47630c4e9d859..415a84ebbb906 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -602,6 +602,44 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole
         }
     }
 
+    @Test
+    public void testUnsupportedVersion() {
+        mockErrorResponse(Errors.UNSUPPORTED_VERSION, null);
+        ArgumentCaptor<ErrorEvent> errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
+        verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
+        ErrorEvent errorEvent = errorEventArgumentCaptor.getValue();
+
+        // UnsupportedApiVersion in HB response without any custom message. It's treated as the new protocol not being supported.
+        String hbNotSupportedMsg = "The cluster does not support the new consumer group protocol. Set group" +
+            ".protocol=classic on the consumer configs to revert to the classic protocol until the cluster is upgraded.";
+        assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error());
+        assertEquals(hbNotSupportedMsg, errorEvent.error().getMessage());
+        clearInvocations(backgroundEventHandler);
+
+        // UnsupportedApiVersion in HB response with a custom message. When it's specific to a required version not
+        // being present, the custom message should be kept.
+        String hbVersionNotSupportedMsg = "The cluster does not support resolution of SubscriptionPattern on version 0. " +
+            "It must be upgraded to version >= 1 to allow to subscribe to a SubscriptionPattern.";
+        mockErrorResponse(Errors.UNSUPPORTED_VERSION, hbVersionNotSupportedMsg);
+        errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
+        verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture());
+        errorEvent = errorEventArgumentCaptor.getValue();
+        assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error());
+        assertEquals(hbVersionNotSupportedMsg, errorEvent.error().getMessage());
+    }
+
+    private void mockErrorResponse(Errors error, String exceptionCustomMsg) {
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+
+        when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
+        ClientResponse response = createHeartbeatResponse(
+            result.unsentRequests.get(0), error, exceptionCustomMsg);
+        result.unsentRequests.get(0).handler().onComplete(response);
+        ConsumerGroupHeartbeatResponse mockResponse = (ConsumerGroupHeartbeatResponse) response.responseBody();
+    }
+
     private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) {
         long currentTimeMs = time.milliseconds();
         assertEquals(expectedTimeToNextHeartbeatMs, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
@@ -969,9 +1007,15 @@ private static Collection<Arguments> errorProvider() {
             Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true));
     }
 
+    private ClientResponse createHeartbeatResponse(NetworkClientDelegate.UnsentRequest request,
+                                                   Errors error) {
+        return createHeartbeatResponse(request, error, "stubbed error message");
+    }
+
     private ClientResponse createHeartbeatResponse(
         final NetworkClientDelegate.UnsentRequest request,
-        final Errors error
+        final Errors error,
+        final String msg
     ) {
         ConsumerGroupHeartbeatResponseData data = new ConsumerGroupHeartbeatResponseData()
             .setErrorCode(error.code())
@@ -979,7 +1023,7 @@ private ClientResponse createHeartbeatResponse(
             .setMemberId(DEFAULT_MEMBER_ID)
             .setMemberEpoch(DEFAULT_MEMBER_EPOCH);
         if (error != Errors.NONE) {
-            data.setErrorMessage("stubbed error message");
+            data.setErrorMessage(msg);
         }
         ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(data);
         return new ClientResponse(
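As a rough illustration of what these assertions pin down, and not the actual AbstractHeartbeatRequestManager logic: an UNSUPPORTED_VERSION heartbeat response without a custom message is reported with the generic consumer-protocol hint, while a broker-supplied message (such as the SubscriptionPattern one) is kept verbatim. Names below are invented; only the default message text comes from the test.

```java
import org.apache.kafka.common.errors.UnsupportedVersionException;

// Sketch of the message selection exercised by testUnsupportedVersion().
final class UnsupportedVersionMessageSelection {
    // Default text asserted by the test when the broker returns no custom error message.
    static final String CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG =
        "The cluster does not support the new consumer group protocol. Set group.protocol=classic " +
        "on the consumer configs to revert to the classic protocol until the cluster is upgraded.";

    private UnsupportedVersionMessageSelection() { }

    static UnsupportedVersionException toClientError(String brokerErrorMessage) {
        // Keep a broker-supplied message so the user sees the concrete cause (e.g. the required
        // ConsumerGroupHeartbeat version); otherwise fall back to the generic protocol hint.
        boolean hasCustomMessage = brokerErrorMessage != null && !brokerErrorMessage.isEmpty();
        return new UnsupportedVersionException(hasCustomMessage ? brokerErrorMessage : CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG);
    }
}
```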
" + + "It must be upgraded to version >= 1 to allow to subscribe to a SubscriptionPattern."; + mockErrorResponse(Errors.UNSUPPORTED_VERSION, hbVersionNotSupportedMsg); + errorEventArgumentCaptor = ArgumentCaptor.forClass(ErrorEvent.class); + verify(backgroundEventHandler).add(errorEventArgumentCaptor.capture()); + errorEvent = errorEventArgumentCaptor.getValue(); + assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), errorEvent.error()); + assertEquals(hbVersionNotSupportedMsg, errorEvent.error().getMessage()); + } + + private void mockErrorResponse(Errors error, String exceptionCustomMsg) { + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(1, result.unsentRequests.size()); + + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + ClientResponse response = createHeartbeatResponse( + result.unsentRequests.get(0), error, exceptionCustomMsg); + result.unsentRequests.get(0).handler().onComplete(response); + ConsumerGroupHeartbeatResponse mockResponse = (ConsumerGroupHeartbeatResponse) response.responseBody(); + } + private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { long currentTimeMs = time.milliseconds(); assertEquals(expectedTimeToNextHeartbeatMs, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs)); @@ -969,9 +1007,15 @@ private static Collection errorProvider() { Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true)); } + private ClientResponse createHeartbeatResponse(NetworkClientDelegate.UnsentRequest request, + Errors error) { + return createHeartbeatResponse(request, error, "stubbed error message"); + } + private ClientResponse createHeartbeatResponse( final NetworkClientDelegate.UnsentRequest request, - final Errors error + final Errors error, + final String msg ) { ConsumerGroupHeartbeatResponseData data = new ConsumerGroupHeartbeatResponseData() .setErrorCode(error.code()) @@ -979,7 +1023,7 @@ private ClientResponse createHeartbeatResponse( .setMemberId(DEFAULT_MEMBER_ID) .setMemberEpoch(DEFAULT_MEMBER_EPOCH); if (error != Errors.NONE) { - data.setErrorMessage("stubbed error message"); + data.setErrorMessage(msg); } ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(data); return new ClientResponse( diff --git a/core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java b/core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java index ff851145a21ef..7e1c062c0fcf7 100644 --- a/core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java +++ b/core/src/test/java/kafka/clients/consumer/AsyncKafkaConsumerIntegrationTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.test.TestUtils; @@ -64,8 +65,7 @@ public void testAsyncConsumerWithOldGroupCoordinator(ClusterInstance clusterInst consumer.poll(Duration.ofMillis(1000)); return false; } catch (UnsupportedVersionException e) { - return e.getMessage().contains("The cluster doesn't yet support the new consumer group protocol. 
" + - "Set group.protocol=classic to revert to the classic protocol until the cluster is upgraded."); + return e.getMessage().equals(AbstractHeartbeatRequestManager.CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG); } }, "Should get UnsupportedVersionException and how to revert to classic protocol"); }