Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17648: AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizationException and GroupAuthorizationException #17516

Merged
merged 1 commit into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -1571,10 +1573,12 @@ public void unsubscribe() {
subscriptions.assignedPartitions());

try {
// If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
// because network thread keeps trying to send MetadataRequest in the background.
// Ignore it to avoid unsubscribe failed.
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException);
// If users subscribe to a topic with invalid name or without permission, they will get some exceptions.
// Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background,
// there will be some error events in the background queue.
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
processBackgroundEvents(unsubscribeEvent.future(), timer,
e -> e instanceof InvalidTopicException || e instanceof TopicAuthorizationException || e instanceof GroupAuthorizationException);
log.info("Unsubscribed all topics or patterns and assigned partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to complete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -282,6 +283,23 @@ public void testCloseWithInvalidTopicException() {
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testUnsubscribeWithTopicAuthorizationException() {
consumer = newConsumer();
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
completeUnsubscribeApplicationEventSuccessfully();
assertDoesNotThrow(() -> consumer.unsubscribe());
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testCloseWithTopicAuthorizationException() {
consumer = newConsumer();
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
completeUnsubscribeApplicationEventSuccessfully();
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testCommitAsyncWithNullCallback() {
consumer = newConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Expand Down Expand Up @@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest {
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeUnsubscribeWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = {
val topic = "topic"

createTopic(topic, listenerName = interBrokerListenerName)

// allow topic read/write permission to poll/send record
addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()
producer.close()

// allow group read permission to join group
val group = "group"
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createConsumer(configOverrides = props)
consumer.subscribe(List(topic).asJava)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)

removeAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.unsubscribe()
})
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = {
Copy link
Member

@lianetm lianetm Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is valuable, but if we agree on leaving this PR for the unsubscribe to leverage the close/callbacks changes in #16686 we should probably bring this test back in a separate PR, after that one goes in. What do you think?

Copy link
Member Author

@FrankYang0529 FrankYang0529 Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @lianetm, I would like to confirm again: do you mean that we disable close test cases currently, revert change in AsyncKafkaConsumer#releaseAssignmentAndLeaveGroup function, and then we will enable this test after #16686 is merged? Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @lianetm, thanks for your review. I add test cases for TopicAuthorizationException and disable test cases for close function. Could you help me review again when you have time? Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like #16686 is almost ready. I will remove @Disabled after it's merged.

val topic = "topic"
createTopic(topic, listenerName = interBrokerListenerName)

// allow topic read/write permission to poll/send record
addAndVerifyAcls(
Set(createAcl(AclOperation.WRITE, AclPermissionType.ALLOW), createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)
)
val producer = createProducer()
producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "message".getBytes)).get()

// allow group read permission to join group
val group = "group"
addAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

val props = new Properties()
props.put(ConsumerConfig.GROUP_ID_CONFIG, group)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
val consumer = createConsumer(configOverrides = props)
consumer.subscribe(List(topic).asJava)
TestUtils.pollUntilAtLeastNumRecords(consumer, numRecords = 1)

removeAndVerifyAcls(
Set(createAcl(AclOperation.READ, AclPermissionType.ALLOW)),
new ResourcePattern(ResourceType.GROUP, group, PatternType.LITERAL)
)

assertDoesNotThrow(new Executable {
override def execute(): Unit = consumer.close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar situation with close. I don't believe that the consumer swallows the exceptions, I believe they just don't come out here because there is no known coordinator (so no group request or client poll on close). If my understanding is right, we shouldn't swallow them either on the new consumer

})
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testAuthorizedProduceAndConsume(quorum: String, groupProtocol: String): Unit = {
Expand Down