KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively #15844

Closed

Conversation

@kirktrue (Collaborator) commented May 2, 2024

This issue is related to an optimization for offset fetch logic.

When a user calls Consumer.poll(), among other things, the consumer performs a network request to fetch any previously-committed offsets so it can determine where to start fetching new records. When the user passes in a timeout of zero, the offset fetch network request will almost never complete within 0 milliseconds. However, the consumer still sends the request and handles the response when it arrives, usually a few milliseconds later. On this first attempt, the lookup fails and poll() loops back around. Because a zero timeout is the common case, the consumer caches the offset fetch response/result from the first attempt (even though it timed out), knowing that the next call to poll() will attempt the exact same operation. When the operation is attempted a second time, the response from the first attempt is already there, so the consumer doesn't need to perform another network request.
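
For context, this is a minimal sketch of the zero-timeout calling pattern described above, using the public KafkaConsumer API; the broker address, group id, and topic name are placeholders rather than anything from this PR:

  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;

  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  public class ZeroTimeoutPollExample {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
          props.put("group.id", "zero-timeout-example");       // placeholder group id
          props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
              consumer.subscribe(Collections.singletonList("example-topic"));

              while (true) {
                  // The first zero-timeout calls typically return nothing: the offset fetch request
                  // has been sent, but its response rarely arrives within 0 ms. The cached result
                  // from that first attempt is what lets a later call make progress.
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
                  records.forEach(r -> System.out.println(r.key() + " => " + r.value()));
              }
          }
      }
  }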

The existing consumer implements this caching in PendingCommittedOffsetRequest; the new consumer implements it in CommitRequestManager. The core issue is that the new consumer's implementation clears out the first attempt's cached result too aggressively. The effect is that the second (and subsequent) attempts fail to find the previous attempt's cached result, so each submits a new network request, and each of those fails in turn. Thus the consumer never makes any headway.
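
As a rough illustration of the caching pattern being described (hypothetical names throughout, not the actual PendingCommittedOffsetRequest or CommitRequestManager code), the intent is that a second attempt covering the same partitions reuses the first attempt's in-flight request rather than sending a new one:

  import java.util.Objects;
  import java.util.Set;
  import java.util.concurrent.CompletableFuture;

  class OffsetFetchReuseSketch<R> {

      // Remembers the first attempt's request and the partitions it covers.
      private Set<String> inflightPartitions;
      private CompletableFuture<R> inflightResult;

      // Called on each attempt: reuse the in-flight request when it covers the same partitions,
      // even if the caller's previous timeout already elapsed, so its eventual response is not lost.
      CompletableFuture<R> fetchOffsets(Set<String> partitions) {
          if (inflightResult != null && Objects.equals(inflightPartitions, partitions)) {
              return inflightResult;  // second and later attempts: no new network request
          }
          inflightPartitions = partitions;
          inflightResult = sendOffsetFetchRequest(partitions);
          // The bug described above: clearing this cached entry as soon as the caller's timeout
          // expires means the next attempt never finds it and sends yet another request.
          return inflightResult;
      }

      private CompletableFuture<R> sendOffsetFetchRequest(Set<String> partitions) {
          return new CompletableFuture<>();  // stand-in for the real network call
      }
  }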

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@kirktrue changed the title from "KAFKA-16637: KIP-848 does not work well" to "KAFKA-16637: AsyncKafkaConsumer removes offset fetch responses from cache too aggressively" on May 2, 2024
@kirktrue (Collaborator, Author) commented May 2, 2024

> @kirktrue Should we rename the title of the PR/Jira now that we understand the root cause?

Done 👍

@kirktrue marked this pull request as draft May 2, 2024 19:27
@AndrewJSchofield (Member) left a comment


Please can we have a test which validates this new behaviour? It's quite subtle and I worry that it might be inadvertently broken by an adjacent code change in the future.

     * almost immediately.
     */
    private void maybeRemoveInflightOffsetFetch(OffsetFetchRequestState fetchRequest, Throwable error) {
        if (error == null && !fetchRequest.isExpired) {
A reviewer (Member) commented on this hunk:

This line implies a big change in the current logic, one that I wonder whether we're taking too far. I agree with not removing the expired requests (that's the root cause of the problem we have), but why put all errors (not only timeouts) in the same bucket? With this new check, how are we ensuring that fetch requests that fail fatally are removed from the in-flight queue?
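
One possible reading of this suggestion, sketched with purely illustrative types and names rather than the code under review: keep an in-flight fetch only when it merely timed out without receiving a response, and remove it on any error so fatally-failed requests don't linger.

  class InflightRemovalSketch {

      // Illustrative stand-in for the request state; not the real OffsetFetchRequestState.
      static class FetchRequestState {
          boolean expired;      // the caller's deadline passed before a response arrived
          boolean hasResponse;  // a response has been received and processed
          Throwable error;      // non-null if the request failed
      }

      // Keep the in-flight entry only when it merely timed out while still awaiting a response;
      // completed requests and failed ones (including fatal errors) are removed.
      static boolean shouldKeepInflight(FetchRequestState request) {
          return request.error == null && !request.hasResponse && request.expired;
      }
  }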

@lianetm (Member) commented May 6, 2024

One concern is in the comment above, about how we identify this situation (in-flight fetch requests that we shouldn't delete too soon).

Another is about where to consider the situation. In-flight requests are removed in two places: the direct call to fetch (handled in this PR), but also from the commit manager's poll. The commit manager (like the other managers) has logic for removing all expired requests in its poll loop, when it calls failAndRemoveExpiredFetchRequests. Shouldn't we consider that too?

@lianetm (Member) commented May 6, 2024

Did we consider the approach of simply decoupling the request timeout from the application event timeout? We could issue the fetch request without a time boundary (max value probably), and get the application event result with the time boundary (here).

Expressing the intention when creating the request and the event seems clearer and gives us what we want: fetch requests would remain in the background thread until they get a response or time out, so they could be reused by a following fetch application event (for the same partitions). Then we could keep the manager logic simple and consistent around how in-flight requests are maintained (removed when they get a response or expire, as it is now). I may be missing something; thoughts?
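
A rough sketch of that decoupling, with hypothetical names and no claim to match the actual request manager APIs: the network-level fetch carries an effectively unbounded deadline, while the application event returned to the caller is bounded by the caller's own timeout.

  import java.time.Duration;
  import java.util.Set;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;

  class DecoupledTimeoutsSketch<R> {

      // Background request: lives until it gets a response, independent of the caller's timeout.
      CompletableFuture<R> submitOffsetFetch(Set<String> partitions) {
          long requestDeadlineMs = Long.MAX_VALUE;  // effectively "no caller-imposed expiry"
          return sendWithDeadline(partitions, requestDeadlineMs);
      }

      // Foreground application event: the caller waits only as long as its own timeout allows;
      // the underlying request stays in the background and can satisfy the next attempt.
      R awaitResult(CompletableFuture<R> pending, Duration callerTimeout)
              throws InterruptedException, ExecutionException {
          try {
              return pending.get(callerTimeout.toMillis(), TimeUnit.MILLISECONDS);
          } catch (TimeoutException e) {
              return null;  // the caller ran out of time; the request itself is still in flight
          }
      }

      private CompletableFuture<R> sendWithDeadline(Set<String> partitions, long deadlineMs) {
          return new CompletableFuture<>();  // stand-in for the real network call
      }
  }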

@lianetm (Member) commented May 16, 2024

FYI, I found another issue that looks to me like it's related to this same situation: https://issues.apache.org/jira/browse/KAFKA-16777

@kirktrue (Collaborator, Author):

Thanks for the review, @lianetm. Agreed on all your points. This PR is a draft because it's a PoC and more thought and tests are needed.

@kirktrue closed this May 31, 2024
@kirktrue deleted the KAFKA-16637-keep-cached-offset-fetch-result branch May 31, 2024 13:32
@lianetm (Member) commented Jun 3, 2024

Hey @kirktrue, this is the simple integration test I was suggesting on the other timeout PR. It should be an easy way to ensure that we're able to fetch offsets and make progress with continuous poll(ZERO), which is what I expect this PR should unblock (just as guidance; it needs tweaks to ensure TestUtils polls with 0). Leaving it here in case it helps validate the changes:

  // Ensure TestUtils polls with ZERO. This fails for the new consumer only.
  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
  def testPollEventuallyReturnsRecordsWithZeroTimeout(quorum: String, groupProtocol: String): Unit = {
    val numMessages = 100
    val producer = createProducer()
    sendRecords(producer, numMessages, tp)

    val consumer = createConsumer()
    consumer.subscribe(Set(topic).asJava)
    val records = awaitNonEmptyRecords(consumer, tp)
    assertEquals(numMessages, records.count())
  }
