-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-16460: New consumer times out consuming records in multiple consumer_test.py system tests #17777
Conversation
e608c5b
to
c274232
Compare
c274232
to
6b0021c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch @FrankYang0529 ! This makes sense to me. It was actually the initial intention
https://github.com/lianetm/kafka/blob/e227a9cac887a201eed9105ed47337fb35acb0ae/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L872-L886
I guess we lost it with a refactoring and didn't have test coverage for that corner case.
...nts/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
Outdated
Show resolved
Hide resolved
...nts/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
Outdated
Show resolved
Hide resolved
// Mark partitions as pending revocation to stop fetching from the partitions (no new | ||
// fetches sent out, and no in-flight fetches responses processed). | ||
System.out.println("revokedPartitions = " + revokedPartitions); | ||
markPendingRevocationToPauseFetching(revokedPartitions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we still need the other call now that we have this at the beginning of the reconciliation before commits?
Line 1117 in cc20e78
markPendingRevocationToPauseFetching(revokedPartitions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to keep markPendingRevocationToPauseFetching
in AbstractMembershipMember#revokePartitions
, because ConsumerMembershipManager#invokeOnPartitionsRevokedOrLostToReleaseAssignment
also uses the function.
Although it's redundant for consumer heartbeat response path, it's still required for ConsumerMembershipManager#signalMemberLeavingGroup
path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense.
6b0021c
to
d08e4c8
Compare
// Mark partitions as pending revocation to stop fetching from the partitions (no new | ||
// fetches sent out, and no in-flight fetches responses processed). | ||
System.out.println("revokedPartitions = " + revokedPartitions); | ||
markPendingRevocationToPauseFetching(revokedPartitions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense.
@@ -1456,6 +1457,7 @@ public void testReconcilePartitionsRevokedWithSuccessfulAutoCommitNoCallbacks() | |||
|
|||
// Complete commit request | |||
commitResult.complete(null); | |||
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition("topic1", 0))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we extend this test to ensure that the markPending happens before the commit is triggered? (Maybe using InOrder like I'm doing here)
Lines 434 to 436 in 64d020a
InOrder inOrder = inOrder(backgroundEventHandler, membershipManager); | |
inOrder.verify(backgroundEventHandler).add(any(ErrorEvent.class)); | |
inOrder.verify(membershipManager).onHeartbeatFailure(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated it. Thanks for the suggestion.
…sumer_test.py system tests Signed-off-by: PoAn Yang <[email protected]>
d08e4c8
to
9517c2d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the good catch and improved test coverage @FrankYang0529! LGTM.
…sumer_test.py system tests (apache#17777) Reviewers: Lianet Magrans <[email protected]>
…sumer_test.py system tests (apache#17777) Reviewers: Lianet Magrans <[email protected]>
When a consumer receives group heartbeat with revoked partitions. If the consumer enable auto commit, it will send a OffsetCommitRequest before removing revoked partitions from subscription state. There are two potential race conditions in AsyncKafkaConsumer:
poll
function. However, the auto commit function doesn't work for these data, because the consumer mark revoked partitions withpendingRevocation=true
after we receives OffsetCommitResponse. At that time, there is no further OffsetCommitRequest will be sent by auto commit feature.To fix this, we have to mark
pendingRevocation=true
before the consumer sends last OffsetCommitRequest, so FetchCollector can't get revoked partition data from FetchBuffer and FetchRequestManager doesn't send new FetchReqeust with revoked partitions.I run
consumer_test.py.test_broker_failure
10 times without error.Committer Checklist (excluded from commit message)