From cc20e7847450d7a3bd5af85f821697e17761cc60 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Fri, 15 Nov 2024 22:15:26 +0800 Subject: [PATCH] KAFKA-17648: AsyncKafkaConsumer#unsubscribe and #close swallow TopicAuthorizationException and GroupAuthorizationException (#17516) Reviewers: Lianet Magrans , Kirk True --- .../internals/AsyncKafkaConsumer.java | 12 ++- .../internals/AsyncKafkaConsumerTest.java | 18 +++++ .../api/GroupAuthorizerIntegrationTest.scala | 79 +++++++++++++++++++ 3 files changed, 105 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index fcf688b7f8e9b..da3f3f2f25b45 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -76,10 +76,12 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; @@ -1571,10 +1573,12 @@ public void unsubscribe() { subscriptions.assignedPartitions()); try { - // If users subscribe to an invalid topic name, they will get InvalidTopicException in error events, - // because network thread keeps trying to send MetadataRequest in the background. - // Ignore it to avoid unsubscribe failed. - processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException); + // If users subscribe to a topic with invalid name or without permission, they will get some exceptions. + // Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background, + // there will be some error events in the background queue. + // When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully. + processBackgroundEvents(unsubscribeEvent.future(), timer, + e -> e instanceof InvalidTopicException || e instanceof TopicAuthorizationException || e instanceof GroupAuthorizationException); log.info("Unsubscribed all topics or patterns and assigned partitions"); } catch (TimeoutException e) { log.error("Failed while waiting for the unsubscribe event to complete"); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index eaf974b91c43a..8eb8ec4c85bd5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -63,6 +63,7 @@ import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; @@ -282,6 +283,23 @@ public void testCloseWithInvalidTopicException() { assertDoesNotThrow(() -> consumer.close()); } + @Test + public void testUnsubscribeWithTopicAuthorizationException() { + consumer = newConsumer(); + backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic")))); + completeUnsubscribeApplicationEventSuccessfully(); + assertDoesNotThrow(() -> consumer.unsubscribe()); + assertDoesNotThrow(() -> consumer.close()); + } + + @Test + public void testCloseWithTopicAuthorizationException() { + consumer = newConsumer(); + backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic")))); + completeUnsubscribeApplicationEventSuccessfully(); + assertDoesNotThrow(() -> consumer.close()); + } + @Test public void testCommitAsyncWithNullCallback() { consumer = newConsumer(); diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala index 46b34f4efdaff..e9a0644a26c63 100644 --- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala @@ -33,6 +33,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST import org.apache.kafka.server.config.ServerConfigs import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { + val topic = "topic" + + createTopic(topic, listenerName = interBrokerListenerName) + + // allow topic read/write permission to poll/send record + addAndVerifyAcls( + Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + ) + val producer = createProducer() + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get() + producer.close() + + // allow group read permission to join group + val group = "group" + addAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) + ) + + val props = new Properties() + props.put(ConsumerConfig.GROUP_ID_CONFIG, group) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + val consumer = createConsumer(configOverrides = props) + consumer.subscribe(List(topic).asJava) + TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1) + + removeAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) + ) + + assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.unsubscribe() + }) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { + val topic = "topic" + createTopic(topic, listenerName = interBrokerListenerName) + + // allow topic read/write permission to poll/send record + addAndVerifyAcls( + Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) + ) + val producer = createProducer() + producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get() + + // allow group read permission to join group + val group = "group" + addAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) + ) + + val props = new Properties() + props.put(ConsumerConfig.GROUP_ID_CONFIG, group) + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + val consumer = createConsumer(configOverrides = props) + consumer.subscribe(List(topic).asJava) + TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1) + + removeAndVerifyAcls( + Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)), + new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL) + ) + + assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.close() + }) + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) def testAuthorizedProduceAndConsume(quorum: String, groupProtocol: String): Unit = {