Skip to content

Commit

Permalink
KAFKA-17648: AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizatio…
Browse files Browse the repository at this point in the history
…nException and GroupAuthorizationException

Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Oct 16, 2024
1 parent 3b619db commit 9ce814f
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -1277,10 +1279,12 @@ private void releaseAssignmentAndLeaveGroup(final Timer timer) {
UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(calculateDeadlineMs(timer));
applicationEventHandler.add(unsubscribeEvent);
try {
// If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
// because network thread keeps trying to send MetadataRequest in the background.
// Ignore it to avoid unsubscribe failed.
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException);
// If users subscribe to a topic with invalid name or without permission, they will get some exceptions.
// Because network thread sends MetadataRequest and FindCoordinatorRequest in the background,
// there will be some error events in the background queue.
// When running close, these exceptions should be ignored, or users can't close successfully.
processBackgroundEvents(unsubscribeEvent.future(), timer,
e -> e instanceof InvalidTopicException || e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException);
log.info("Completed releasing assignment and sending leave group to close consumer");
} catch (TimeoutException e) {
log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " +
Expand Down Expand Up @@ -1477,10 +1481,12 @@ public void unsubscribe() {
subscriptions.assignedPartitions());

try {
// If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
// because network thread keeps trying to send MetadataRequest in the background.
// Ignore it to avoid unsubscribe failed.
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException);
// If users subscribe to a topic with invalid name or without permission, they will get some exceptions.
// Because network thread sends MetadataRequest and FindCoordinatorRequest in the background,
// there will be some error events in the background queue.
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
processBackgroundEvents(unsubscribeEvent.future(), timer,
e -> e instanceof InvalidTopicException || e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException);
log.info("Unsubscribed all topics or patterns and assigned partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to complete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import java.util.concurrent.ExecutionException
import kafka.api.GroupAuthorizerIntegrationTest._
import kafka.security.authorizer.AclAuthorizer
import kafka.server.BaseRequestTest
import kafka.utils.TestUtils
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.acl.{AccessControlEntry, AclOperation, AclPermissionType}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.errors.TopicAuthorizationException
import org.apache.kafka.common.errors.{GroupAuthorizationException, TopicAuthorizationException}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal}
Expand All @@ -34,10 +34,12 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.params.provider.{Arguments, MethodSource, ValueSource}

import java.util.stream.Stream
import scala.jdk.CollectionConverters._

object GroupAuthorizerIntegrationTest {
Expand All @@ -57,6 +59,9 @@ object GroupAuthorizerIntegrationTest {
}
}
}

def getTestQuorumAndGroupProtocolParametersAll: Stream[Arguments] =
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()
}

class GroupAuthorizerIntegrationTest extends BaseRequestTest {
Expand Down Expand Up @@ -118,9 +123,8 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {

@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
def testUnauthorizedProduceAndConsume(quorum: String): Unit = {
def testUnauthorizedProduce(quorum: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition("topic", 0)

createTopic(topic, listenerName = interBrokerListenerName)

Expand All @@ -129,12 +133,56 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
() => producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()).getCause
assertTrue(produceException.isInstanceOf[TopicAuthorizationException])
assertEquals(Set(topic), produceException.asInstanceOf[TopicAuthorizationException].unauthorizedTopics.asScala)
}

val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG))
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testUnauthorizedConsumeUnsubscribe(quorum: String, groupProtocol: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)

createTopic(topic, listenerName = interBrokerListenerName)

val consumer = createConsumer()
consumer.assign(List(topicPartition).asJava)
val consumeException = assertThrows(classOf[TopicAuthorizationException],
() => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)

assertThrows(classOf[GroupAuthorizationException],
() => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))

// TODO: use background-event-queue-size metric to check there is background event
Thread.sleep(3000)

assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.unsubscribe()
})
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testUnauthorizedConsumeClose(quorum: String, groupProtocol: String): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)

createTopic(topic, listenerName = interBrokerListenerName)

val consumer = createConsumer()
consumer.assign(List(topicPartition).asJava)
val consumeException = assertThrows(classOf[TopicAuthorizationException],
() => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)

assertThrows(classOf[GroupAuthorizationException],
() => TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1))

// TODO: use background-event-queue-size metric to check there is background event
Thread.sleep(3000)

assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.close()
})
}

@ParameterizedTest
Expand Down

0 comments on commit 9ce814f

Please sign in to comment.