Skip to content

Commit

Permalink
KAFKA-17648: AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizatio…
Browse files Browse the repository at this point in the history
…nException and GroupAuthorizationException

Signed-off-by: PoAn Yang <[email protected]>
  • Loading branch information
FrankYang0529 committed Nov 13, 2024
1 parent 6bc7be7 commit 4b76ef6
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -1460,10 +1462,12 @@ public void unsubscribe() {
subscriptions.assignedPartitions());

try {
// If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
// because network thread keeps trying to send MetadataRequest in the background.
// Ignore it to avoid unsubscribe failed.
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException);
// If users subscribe to a topic with invalid name or without permission, they will get some exceptions.
// Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background,
// there will be some error events in the background queue.
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
processBackgroundEvents(unsubscribeEvent.future(), timer,
e -> e instanceof InvalidTopicException || e instanceof TopicAuthorizationException || e instanceof GroupAuthorizationException);
log.info("Unsubscribed all topics or patterns and assigned partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to complete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

Expand Down Expand Up @@ -132,6 +133,136 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeUnsubscribeWithoutTopicPermission(quorum: String, groupProtocol: String): Unit = {
val topic = "topic"

createTopic(topic, listenerName = interBrokerListenerName)

// allow group read permission to join group
val group = "group"
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createConsumer(configOverrides = props)
consumer.subscribe(List(topic).asJava)

assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.unsubscribe()
})
}

@Disabled("Enable after KAFKA-16985")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeCloseWithoutTopicPermission(quorum: String, groupProtocol: String): Unit = {
val topic = "topic"

createTopic(topic, listenerName = interBrokerListenerName)

// allow group read permission to join group
val group = "group"
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createConsumer(configOverrides = props)
consumer.subscribe(List(topic).asJava)

assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.close()
})
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = {
val topic = "topic"

createTopic(topic, listenerName = interBrokerListenerName)

// allow topic read/write permission to poll/send record
addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
producer.close()

// allow group read permission to join group
val group = "group"
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createConsumer(configOverrides = props)
consumer.subscribe(List(topic).asJava)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)

removeAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.unsubscribe()
})
}

@Disabled("Enable after KAFKA-16985")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = {
val topic = "topic"
createTopic(topic, listenerName = interBrokerListenerName)

// allow topic read/write permission to poll/send record
addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()

// allow group read permission to join group
val group = "group"
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createConsumer(configOverrides = props)
consumer.subscribe(List(topic).asJava)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)

removeAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.close()
})
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testAuthorizedProduceAndConsume(quorum: String, groupProtocol: String): Unit = {
Expand Down

0 comments on commit 4b76ef6

Please sign in to comment.