Skip to content

Commit

Permalink
KAFKA-17648: AsyncKafkaConsumer#unsubscribe and #close swallow TopicA…
Browse files Browse the repository at this point in the history
…uthorizationException and GroupAuthorizationException (#17516)

Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>
  • Loading branch information
FrankYang0529 authored Nov 15, 2024
1 parent 84fe668 commit cc20e78
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -1571,10 +1573,12 @@ public void unsubscribe() {
subscriptions.assignedPartitions());

try {
// If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
// because network thread keeps trying to send MetadataRequest in the background.
// Ignore it to avoid unsubscribe failed.
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException);
// If users subscribe to a topic with invalid name or without permission, they will get some exceptions.
// Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background,
// there will be some error events in the background queue.
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
processBackgroundEvents(unsubscribeEvent.future(), timer,
e -> e instanceof InvalidTopicException || e instanceof TopicAuthorizationException || e instanceof GroupAuthorizationException);
log.info("Unsubscribed all topics or patterns and assigned partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to complete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -282,6 +283,23 @@ public void testCloseWithInvalidTopicException() {
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testUnsubscribeWithTopicAuthorizationException() {
consumer = newConsumer();
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
completeUnsubscribeApplicationEventSuccessfully();
assertDoesNotThrow(() -> consumer.unsubscribe());
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testCloseWithTopicAuthorizationException() {
consumer = newConsumer();
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
completeUnsubscribeApplicationEventSuccessfully();
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testCommitAsyncWithNullCallback() {
consumer = newConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Expand Down Expand Up @@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = {
val topic = "topic"

createTopic(topic, listenerName = interBrokerListenerName)

// allow topic read/write permission to poll/send record
addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
producer.close()

// allow group read permission to join group
val group = "group"
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createConsumer(configOverrides = props)
consumer.subscribe(List(topic).asJava)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)

removeAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.unsubscribe()
})
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = {
val topic = "topic"
createTopic(topic, listenerName = interBrokerListenerName)

// allow topic read/write permission to poll/send record
addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()

// allow group read permission to join group
val group = "group"
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createConsumer(configOverrides = props)
consumer.subscribe(List(topic).asJava)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)

removeAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.close()
})
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testAuthorizedProduceAndConsume(quorum: String, groupProtocol: String): Unit = {
Expand Down

0 comments on commit cc20e78

Please sign in to comment.