From 9ce814f78dca682a65ce2bc44751e3d074e9acce Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Wed, 16 Oct 2024 21:54:21 +0800 Subject: [PATCH] KAFKA-17648: AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizationException and GroupAuthorizationException Signed-off-by: PoAn Yang --- .../internals/AsyncKafkaConsumer.java | 22 ++++--- .../api/GroupAuthorizerIntegrationTest.scala | 60 +++++++++++++++++-- 2 files changed, 68 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 1e4d0cf195b63..06ceedf457413 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -73,10 +73,12 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.InvalidGroupIdException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; @@ -1277,10 +1279,12 @@ private void releaseAssignmentAndLeaveGroup(final Timer timer) { UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer)); applicationEventHandler.add(unsubscribeEvent); try { - // If users subscribe to an invalid topic name, they will get InvalidTopicException in error events, - // because network thread keeps trying to send MetadataRequest in the background. - // Ignore it to avoid unsubscribe failed. - processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException); + // If users subscribe to a topic with invalid name or without permission, they will get some exceptions. + // Because network thread sends MetadataRequest and FindCoordinatorRequest in the background, + // there will be some error events in the background queue. + // When running close, these exceptions should be ignored, or users can't close successfully. + processBackgroundEvents(unsubscribeEvent.future(), timer, + e -> e instanceof InvalidTopicException || e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException); log.info("Completed releasing assignment and sending leave group to close consumer"); } catch (TimeoutException e) { log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " + @@ -1477,10 +1481,12 @@ public void unsubscribe() { subscriptions.assignedPartitions()); try { - // If users subscribe to an invalid topic name, they will get InvalidTopicException in error events, - // because network thread keeps trying to send MetadataRequest in the background. - // Ignore it to avoid unsubscribe failed. - processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException); + // If users subscribe to a topic with invalid name or without permission, they will get some exceptions. + // Because network thread sends MetadataRequest and FindCoordinatorRequest in the background, + // there will be some error events in the background queue. + // When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully. + processBackgroundEvents(unsubscribeEvent.future(), timer, + e -> e instanceof InvalidTopicException || e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException); log.info("Unsubscribed all topics or patterns and assigned partitions"); } catch (TimeoutException e) { log.error("Failed while waiting for the unsubscribe event to complete"); diff --git a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala index 7723c38ac5eb0..e2ca05e92f1e2 100644 --- a/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupAuthorizerIntegrationTest.scala @@ -17,13 +17,13 @@ import java.util.concurrent.ExecutionException import kafka.api.GroupAuthorizerIntegrationTest._ import kafka.security.authorizer.AclAuthorizer import kafka.server.BaseRequestTest -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType} import org.apache.kafka.common.config.internals.BrokerSecurityConfigs -import org.apache.kafka.common.errors.TopicAuthorizationException +import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthorizationException} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType} import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal} @@ -34,10 +34,12 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST import org.apache.kafka.server.config.ServerConfigs import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.function.Executable import org.junit.jupiter.api.{BeforeEach, TestInfo} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource} +import java.util.stream.Stream import scala.jdk.CollectionConverters._ object GroupAuthorizerIntegrationTest { @@ -57,6 +59,9 @@ object GroupAuthorizerIntegrationTest { } } } + + def getTestQuorumAndGroupProtocolParametersAll: Stream[Arguments] = + BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll() } class GroupAuthorizerIntegrationTest extends BaseRequestTest { @@ -118,9 +123,8 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { @ParameterizedTest @ValueSource(strings = Array("zk", "kraft")) - def testUnauthorizedProduceAndConsume(quorum: String): Unit = { + def testUnauthorizedProduce(quorum: String): Unit = { val topic = "topic" - val topicPartition = new TopicPartition("topic", 0) createTopic(topic, listenerName = interBrokerListenerName) @@ -129,12 +133,56 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { () => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause assertTrue(produceException.isInstanceOf[TopicAuthorizationException]) assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala) + } - val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol: String): Unit = { + val topic = "topic" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, listenerName = interBrokerListenerName) + + val consumer = createConsumer() + consumer.assign(List(topicPartition).asJava) + val consumeException = assertThrows(classOf[TopicAuthorizationException], + () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) + assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) + + assertThrows(classOf[GroupAuthorizationException], + () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) + + // TODO: use background-event-queue-size metric to check there is background event + Thread.sleep(3000) + + assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.unsubscribe() + }) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testUnauthorizedConsumeClose(quorum: String, groupProtocol: String): Unit = { + val topic = "topic" + val topicPartition = new TopicPartition(topic, 0) + + createTopic(topic, listenerName = interBrokerListenerName) + + val consumer = createConsumer() consumer.assign(List(topicPartition).asJava) val consumeException = assertThrows(classOf[TopicAuthorizationException], () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) + + assertThrows(classOf[GroupAuthorizationException], + () => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)) + + // TODO: use background-event-queue-size metric to check there is background event + Thread.sleep(3000) + + assertDoesNotThrow(new Executable { + override def execute(): Unit = consumer.close() + }) } @ParameterizedTest