-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17648: AsyncKafkaConsumer#unsubscribe swallow TopicAuthorizationException and GroupAuthorizationException #17516
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this PR achieving the same effect as PR #17440?
Yes, it's same. I will close this one. |
Hi @kirktrue, after discussing with @m1a2st, this PR will focus metadata error on Could you help me review #17199 first? This PR will rely metric to know there is background event, so we can check |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello @FrankYang0529, Thanks for this PR, left a small suggestion, PTAL
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
Outdated
Show resolved
Hide resolved
9ce814f
to
17cb5ec
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @FrankYang0529, interesting area, but some high level concerns with the intention here, let me know what you think. Thanks!
assertDoesNotThrow(new Executable { | ||
override def execute(): Unit = consumer.unsubscribe() | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uhm I think we may be not getting the classic consumer behaviour right here. I would expect this assertion passes for the classic consumer here only because of how the test is written (never finds a coordinator), but that doesn't mean that the classic consumer does not throw topic or group auth exceptions on unsubscribe (which is what the changes in this PR are trying to achieve for the new consumer)
- I believe classic does not throw GroupAuthException here only because it never finds a coordinator, so the unsubscribes perform no action on the group (but if it had had a known coordinator, the unsubscribe would send a leave group that I expect would fail with GroupAuthException)
- I believe classic does not throw TopicAuthException here only because unsubscribe does not poll the network client if it doesn't have a coordinator to send the leave group to.
I could definitely be missing something, but we could validate my expectations with an integration test here:
- consumer subscribes to a group successfully (has acls to READ + GROUP + "group-name-from-config")
- looses the acls (using
removeAndVerifyAcls
) - consumer.unsubscribe -> I would expect that this should indeed throw a group authorization exception received in the response to the leave group, or the topic auth exception propagated from metadata (honestly not sure about the precedence, but both should be there)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @lianetm, thanks for the review and suggestion. I try to separate it with 4 test cases:
- testConsumeUnsubscribeWithoutTopicPermission
- testConsumeCloseWithoutTopicPermission
- testConsumeUnsubscribeWithoutGroupPermission
- testConsumeCloseWithoutGroupPermission
Although these 4 cases can pass, I need some time to check whether it can really test classic consumer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sharing more details about my expectations of what the classic consumer does (so the contract we should keep):
- do not throw topic or group auth exceptions on unsubscribe simply because it does not wait for the leave group response after sending the request on
maybeLeaveGroup
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
Line 567 in 8cbd2ed
this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics"); - could throw topic/group auth on close because it does wait for responses (inflight requests) after the
maybeLeaveGroup
kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
Line 1140 in 8cbd2ed
if (coordinator != null && !client.awaitPendingRequests(coordinator, timer))
So we need to throw those on close if they exist, but not on unsubscribe (let's align the expectations on the test with this understanding I would say, and we'll validate it). Let me know what you think/find.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @lianetm, thanks for sharing the details in ClassicKafkaConsumer
. For ClassicKafkaConsumer#unsubscribe
, it doesn't check future object from maybeLeaveGroup
, so unsubscribe
doesn't throw exceptions. I remain unsubscribe
function test for AsyncKafkaConsumer
only, and keep close
function test for both consumers.
I have another question here. IIUC, we don't need to swallow TopicAuthorizationException
, because we don't check topic permission when doing consumer group heartbeat request. So I think we only need to swallow GroupAuthorizationException
here. WDYT? Thanks.
kafka/core/src/main/scala/kafka/server/KafkaApis.scala
Lines 3812 to 3815 in 9db5ed0
} else if (!authHelper.authorize(request.context, READ, GROUP, consumerGroupHeartbeatRequest.data.groupId)) { | |
requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) | |
CompletableFuture.completedFuture[Unit](()) | |
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For ClassicKafkaConsumer#unsubscribe, it doesn't check future object from maybeLeaveGroup, so unsubscribe doesn't throw exceptions
Agreed, the unused returned value is the reason why the error doesn't bubble up. Still I think we should keep the test for both consumers (it does pass for the classic I expect, this is just the reason why it passes)
I have another question here. IIUC, we don't need to swallow TopicAuthorizationException, because we don't check topic permission when doing consumer group heartbeat request. So I think we only need to swallow GroupAuthorizationException here. WDYT
The trick is that the client internally sends metadata request on poll, and is on those that we could get the topic auth error (that would bubble up in the consumer on processBackgroundEvents). Because of that I think we need to consider that we could receive both (group auth received in a HB response, topic auth received in a metadata response)
Thread.sleep(3000) | ||
|
||
assertDoesNotThrow(new Executable { | ||
override def execute(): Unit = consumer.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
similar situation with close. I don't believe that the consumer swallows the exceptions, I believe they just don't come out here because there is no known coordinator (so no group request or client poll on close). If my understanding is right, we shouldn't swallow them either on the new consumer
17cb5ec
to
bbdf6f0
Compare
f7519d2
to
7a3670f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates and responses @FrankYang0529! Some comments mainly to limit the scope to unsubscribe
given that there are important changes happening in parallel on consumer.close
that would simplify this issue. Let me know what you think
// If users subscribe to a topic with invalid name or without permission, they will get some exceptions. | ||
// Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background, | ||
// there will be some error events in the background queue. | ||
// When running close, these exceptions should be ignored, or users can't close successfully. | ||
processBackgroundEvents(unsubscribeEvent.future(), timer, | ||
e -> e instanceof InvalidTopicException || e instanceof GroupAuthorizationException); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about reverting these changes to keep this PR addressing the unsubscribe only? (aligned with the PR name). I think it will be convenient because there is another ongoing PR #16686 changing the close, and this "releaseAssignmentAndLeaveGroup" does not processBackgroundEvents anymore, so the issue you're trying to solve here disappears (we might remain with the other issue of events unaware of metadata errors, but that is already addressed in another PR #17440 too so let's address those concerns there). Makes sense?
assertDoesNotThrow(new Executable { | ||
override def execute(): Unit = consumer.unsubscribe() | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For ClassicKafkaConsumer#unsubscribe, it doesn't check future object from maybeLeaveGroup, so unsubscribe doesn't throw exceptions
Agreed, the unused returned value is the reason why the error doesn't bubble up. Still I think we should keep the test for both consumers (it does pass for the classic I expect, this is just the reason why it passes)
I have another question here. IIUC, we don't need to swallow TopicAuthorizationException, because we don't check topic permission when doing consumer group heartbeat request. So I think we only need to swallow GroupAuthorizationException here. WDYT
The trick is that the client internally sends metadata request on poll, and is on those that we could get the topic auth error (that would bubble up in the consumer on processBackgroundEvents). Because of that I think we need to consider that we could receive both (group auth received in a HB response, topic auth received in a metadata response)
@@ -132,6 +133,84 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { | |||
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) | |||
} | |||
|
|||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | |||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this test passes for both consumers I expect right? We should enabled it for both
|
||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | ||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | ||
def testConsumeCloseWithoutGroupPermission(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test is valuable, but if we agree on leaving this PR for the unsubscribe to leverage the close/callbacks changes in #16686 we should probably bring this test back in a separate PR, after that one goes in. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @lianetm, thanks for your review. I add test cases for TopicAuthorizationException
and disable test cases for close
function. Could you help me review again when you have time? Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like #16686 is almost ready. I will remove @Disabled
after it's merged.
cb6bbe5
to
4b76ef6
Compare
@@ -132,6 +133,136 @@ class GroupAuthorizerIntegrationTest extends BaseRequestTest { | |||
assertEquals(Set(topic), consumeException.unauthorizedTopics.asScala) | |||
} | |||
|
|||
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) | |||
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) | |||
def testConsumeUnsubscribeWithoutTopicPermission(quorum: String, groupProtocol: String): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if this is better covered at the unit test level only. Here I don't see how we can trust the test is actually testing the unsubscribe changes. The trick is that the topic error comes in a metadata response, but the unsubscribe completes successfully as soon as it gets a response to the HB, so it will always complete ok unless we get a metadata response before the HB response right? It's a really small window btw, because we only processingBackgroundEvents (discover errors) from the moment we send the Unsubscribe (leave HB), to the moment we get a response. Then we stop processing background events, so nothing will be thrown even if it arrives in a response.
The other testConsumeUnsubscribeWithoutGroupPermission
makes sense to me, because the group error comes in a HB response, as well as the unsusbcribe response, so we can trust that if the unsubscribe does not throw is because we're indeed swallowing the exception.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @lianetm, thanks for review and suggestion. I move TopicAuthorizationException
related test cases to AsyncKafkaConsumerTest
. Also, I update the title with close
function, so we can add close
related test cases in this PR. WDYT? If you prefer to use another PR to include close
test cases, I can change it. Thanks.
720ec4c
to
f8880cd
Compare
21bf2b5
to
ffb000d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @FrankYang0529 ! LGTM.
I would only suggest we update the PR title, to remove the reference to close, because this PR does not change anything related to the close logic (just adds more tests for it), makes sense? Also, in the PR description where it refers to "users can't close ..." I guess it should say unsubscribe, which is what we're fixing in this PR. Thanks!
Hi @lianetm, thanks for review and suggestion. Updated both title and PR description. Thanks. |
…uthorizationException and GroupAuthorizationException Signed-off-by: PoAn Yang <[email protected]>
ffb000d
to
978d6a4
Compare
…uthorizationException and GroupAuthorizationException (apache#17516) Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>
…uthorizationException and GroupAuthorizationException (apache#17516) Reviewers: Lianet Magrans <[email protected]>, Kirk True <[email protected]>
If users subscribe to a topic with invalid name or without permission, they will get some exceptions. Network thread sends MetadataRequest and ConsumerGroupHeartbeatRequest in the background, so there will be some error events in the background queue. When running
AsyncKafkaConsumer#unsubscribe
, these exceptions should be ignored, or users can't unsubscribe the consumer successfully.Committer Checklist (excluded from commit message)