Parallel Consumer keeps committing the old offset after OFFSET_OUT_OF_RANGE in auto.offset.reset = latest #409
Yup, you got it in one! Nice find! The WorkManager needs to be informed by the consumer of the issue, and update the state accordingly. Want to take a stab at a PR? Is your workaround for this restarting PC?
PC will keep trying to do the commit every commit frequency - i.e. it will retry. I'm not aware of any state the underlying consumer could get into which would prevent a retry from happening. However, as you've identified, if the partition's state has changed on the broker (through retention or compaction), WorkManager must be made aware.
Hi @astubbs, thanks for looking into this and confirming it's a verified bug on "offset out of range" scenarios. As for PR, I don't feel like I'm ready for it at this point. Will definitely try later if I can.
No, restarting PC didn't help at all. I had to manually reset the offset on the topic to clear the CG lag. The reason I asked that specific question is that, although I could be wrong, I suspect the CG lag might have started before the "offset out of range" issue came in. Could you help confirm whether my understanding below is correct?
Also, please help me understand how PC would behave in the hypothetical scenario below. Please assume the records all belong to the same key and we are using KEY ordering.
Where,
Do you think this is a valid scenario? If yes, how will PC recover from such a situation without causing CG lag?
Can I expect that the fix for this verified bug will also take care of such a consumer problem? Please share if you think there are any other possible edge-case scenarios where the highest committable offset somehow gets stuck or remains the same.
Hi @astubbs, I'd appreciate it if you could share an ETA for when the fix for this bug could be released, and if you could answer some of my questions in the previous comment. This will really help with my investigation of future CG lag issues. Thanks!
Hi again! Just finished prepping for Confluent's Current conference next week; then there are these issues in the merge queue for this week:
Then I'll look at patching this - I consider it the next highest priority for PC.
Can you help me understand, in your example - is record 86321 effectively disappearing underneath PC, so that it's unable to successfully complete processing of it?
Track expected offsets returned from broker, and truncate where needed.
Hi @leonoah86, take a look at the direction I'm heading in: I thought this would only need to be done on startup, but it probably needs to be checked continuously. Take a look and let me know if you think that will cover your situations.
Yeah that's right - it can continue, because it's just processing what the broker sends it, there's no problem there.
Yes, it skips what's decoded from the metadata as completed previously. On startup, it loads the metadata - the metadata isn't kept in sync with what's on the partition by the broker.
The offset reset is done locally by the client, not on the broker. The broker just responds with data starting from the first available offset >= the committed offset.
yeah...
PC will just continue retrying 4 until it either succeeds, retries expire, or the user function skips it. See the offset storage FAQ section of the readme.
Yes, by default every second. It's configurable.
Only records with different keys.
yes
As far as I can tell, the bug report is unrelated to the scenario you describe. Please clarify if I got that wrong.
Hi @astubbs, thanks for your response and looking into this.
No - it was available when it was being processed, and later it was deleted by the retention policy. To simulate this locally, I used a topic with the smallest possible retention time and size (see the sketch after this comment). I started a slow consumer and intermittently tried to interrupt the consume thread, then kept pumping messages faster than the consumer could process. After a while, I started another consumer to trigger a rebalance. Then this "offset out of range" error started showing up. I'm still trying to figure out what caused the CG lag before we eventually ran into the "offset out of range" scenario. I suspect it's the scenario where the consumer thread somehow got interrupted and caused the highest committable offset to get stuck. We had a similar issue in 2 other environments. I have seen evidence in Splunk where processing of some records in the partition in question got interrupted (i.e. didn't print "PROCESSING_COMPLETED") and they were not retried.
Please note that my consumer implementation skips the record on failure, so retry may not be applicable for that scenario. Can you confirm whether PC guarantees that in scenarios where the consumer thread got interrupted, such records will remain in the queue and will be retried later?
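For anyone trying to reproduce this, here is a minimal sketch of creating such an aggressively-expiring topic with Kafka's AdminClient; the broker address, topic name, and exact retention values are illustrative assumptions, not details from this thread:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class CreateAggressiveRetentionTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            // Topic whose data is deleted very aggressively, so a slow consumer's committed
            // offset can fall below the log start offset (=> OFFSET_OUT_OF_RANGE on fetch).
            NewTopic topic = new NewTopic("pc-retention-test", 5, (short) 1)
                    .configs(Map.of(
                            TopicConfig.RETENTION_MS_CONFIG, "10000",      // keep data ~10s
                            TopicConfig.RETENTION_BYTES_CONFIG, "1048576", // or at most ~1 MiB per partition
                            TopicConfig.SEGMENT_MS_CONFIG, "5000"));       // roll segments quickly so retention can apply
            admin.createTopics(Set.of(topic)).all().get();
        }
    }
}
```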
Yes - unless there's explicit success, things will always get retried. If you have the time, writing an integration test with the scenario you describe would be awesome. But I think I've figured it out. Will try to finish the draft on the flight to Austin today for Current :)
Regarding retries - try with the version released yesterday, 0.5.2.3 - there were some fixes there related to retrying.
Thanks @astubbs. I will try to replicate the scenario I described using the latest version and update. Regarding the fixes for truncating state on an offset reset: with the normal Kafka consumer, we sometimes manually reset the CG's offset to an earlier offset to force a replay and reprocess old messages (sketched below with the admin API).
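For reference, a minimal sketch of that manual replay using Kafka's admin API (the consumer group must be stopped while its offsets are altered); the broker address and the target offset 80000 are illustrative assumptions:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class RewindConsumerGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            // Rewind partition 4 of my-topic for the group from the logs to an earlier
            // offset, forcing old messages to be replayed when the group restarts.
            admin.alterConsumerGroupOffsets(
                    "my-topic-consumer",
                    Map.of(new TopicPartition("my-topic", 4), new OffsetAndMetadata(80000L)))
                .all().get();
        }
    }
}
```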
Yes exactly. It's not currently supported, but on the flight I figured it out. There are three things that need to be added, each of which is straightforward, but we need to be careful as it's the critical path. Reset scenarios (a rough conceptual sketch follows):
Newest - detected via bootstrap offset higher than expected.
Earliest - detected via bootstrap offset lower than expected. Action: remove all offset state and start from nothing.
Compacted / retention: this work needs to be added anyway before finishing:
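To make the idea concrete, here is a purely conceptual sketch of the detection logic described above - hypothetical names, not PC's actual classes or the code in the linked branch:

```java
import java.util.NavigableSet;

class OffsetStateTruncationSketch {
    /**
     * Called when a partition is (re)bootstrapped and the first polled offset is known.
     * Compares it with the offset we expected to resume from (the committed offset) and
     * discards tracked per-record state that can no longer apply.
     */
    static void maybeTruncate(long polledBootstrapOffset,
                              long expectedNextOffset,
                              NavigableSet<Long> trackedIncompleteOffsets) {
        if (polledBootstrapOffset > expectedNextOffset) {
            // "Newest" reset, or data removed by retention/compaction: offsets below the
            // polled position can never be redelivered, so drop their tracked state.
            trackedIncompleteOffsets.headSet(polledBootstrapOffset).clear();
        } else if (polledBootstrapOffset < expectedNextOffset) {
            // "Earliest" reset (e.g. a manual rewind): the saved state no longer matches
            // what will be replayed - remove all offset state and start from nothing.
            trackedIncompleteOffsets.clear();
        }
        // Equal: normal resume, nothing to do.
    }
}
```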
Hi @astubbs, I was able to replicate the CG lag caused by the consumer thread getting stuck or interrupted, using PC version 0.5.2.0. Retry was not triggered for the offset in question. It started printing a warning message after 10s, saying 1 record in the queue had been waiting longer than 10s for the topic in question, etc. However, using the latest PC version, 0.5.2.3, I can no longer replicate the same scenario, because you recently modified the UserFunctions.java file to catch Throwable. Now it printed an error stack trace on the console, and retry was triggered after a while. It just means that I can no longer simulate the scenario the way I did; however, if the consumer thread ever gets stuck or interrupted, it may still cause CG lag - do you agree?
CG lag will go away when a rebalance happens and the partition gets reassigned. Otherwise, it will keep growing. If it continues running for a few days, then we will run into the "offset out of range" error scenario. Do you think that if processing of a message is taking longer than a certain threshold, PC should do something about it?
Generally, in your user function, you can access the retry count for a record and choose to skip it if you don't want to retry it anymore (see the sketch below). Is that what you're asking? Otherwise - I've just pushed a draft of the solution for detecting the resets - lower and higher offset commits, as well as missing offsets other than the committed one. Take a look and see what you think?
I haven't tested it yet, but if you want to try it, you can check out that branch and give it a go.
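A hedged sketch of the skip-by-retry-count approach mentioned above. It assumes a ParallelStreamProcessor is already built and subscribed, a process(...) method holding the real business logic, an SLF4J logger named log, and that the per-record context exposes a failed-attempts counter (called getNumberOfFailedAttempts() here - check the exact accessor in your PC version):

```java
// Illustrative threshold, not a PC default.
final int maxAttempts = 5;

processor.poll(context -> {
    var record = context.getSingleRecord(); // per-record context for this work unit
    if (record.getNumberOfFailedAttempts() >= maxAttempts) {
        log.warn("Skipping offset {} after {} failed attempts", record.offset(), maxAttempts);
        return; // returning normally marks the record complete, so PC stops retrying it
    }
    process(record.getConsumerRecord()); // throwing here marks the record failed; PC will retry it
});
```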
Not really - in the scenario where the thread invoking the user function gets stuck or interrupted, I don't think there's anything we can do from the user function; I think PC could do something about it. Right now, it prints a warning message if messages remain in the queue for more than 10s. I'm wondering whether PC should invoke the user function with a timeout and, if it's taking longer than a certain threshold, either retry or drop (skip) the message from the queue, so that the lag doesn't build up when this happens - though not everyone may want to retry or skip, so maybe configurable behaviour? What do you think? With the normal Kafka client, we have max.poll.interval.ms, which is 5 minutes by default (see the config sketch after this comment). If processing of the messages in a batch takes longer than 5 minutes, a rebalance occurs, and unfinished offsets are reprocessed on the next poll. This is how it can recover from such a scenario. So I thought it would be nice if PC could do something similar with a timeout. Meanwhile, let me check out your branch and give it a go.
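For reference, a minimal sketch of the plain-consumer safety valve being referred to here; the broker address and loop body are illustrative assumptions:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PlainConsumerTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-topic-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // If poll() isn't called again within this interval (default 5 minutes), the group
        // coordinator evicts the member and rebalances, so uncommitted offsets get reprocessed.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            while (true) {
                consumer.poll(Duration.ofSeconds(1))
                        .forEach(record -> { /* processing must finish before the next poll deadline */ });
            }
        }
    }
}
```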
FYI, that code as pushed definitely doesn't work - sorry if you spent some time on it. I have a tested branch now that's looking good. Just finishing some refactoring. Will push it up soon.
OK, I got what you were saying the other week. That would be a completely separate issue / feature. It should also only /ever/ happen if your user function continues to fail processing, in which case the user should choose to skip it or not - not PC. Anything else is a bug in PC, which should be fixed (i.e. this). I guess you could argue the feature could be a "fail safe" for bugs in PC. But you could also do the same by monitoring consumer lag like any consumer application, and if the lag gets too big, try killing the app. If you want to pursue this feature, please open a new issue and we can discuss it there - I'd like to keep this one focused. I've fixed the issue, and am pushing up the clean branch today.
…ting (#425)
- Truncate the offset state when bootstrap polled offset higher or lower than committed
- Prune missing records from the tracked incomplete offset state, when they're missing from polled batches
Also introduces a new way to instantiate test brokers - so we can make temporary ones for compaction settings.
Hi @leonoah86, this is merged now. Can you please test the snapshot version?
Hi @astubbs, thanks for your responses and working on the fix.
Yes, I will open a new issue.
Currently, I'm a bit busy with BAU work. Will test it in a few days and update.
Hi @astubbs, sorry for the delay. I ran into some issues building the snapshot version behind a proxy using the latest master branch, and downloading the latest snapshot into the company workspace where I have my test setup and everything. Could you release the new version if you are good with the changes? Even if I do test, it will just be a random test specific to my use case. I can always file a new bug against the release version, or update here, if it's not working as expected.
Please ignore - I just saw you already released the new version 4 days ago. Will try that out. Thanks!
Hi @astubbs,
Thanks for this great library. Recently, I have been consistently seeing consumer lag on some partitions of the topic whenever the offset becomes out of range for a partition. This happens even with the latest PC version, 0.5.2.2. I'm using KEY ordering (setup sketched below).
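For context, this is roughly how the consumer is wired up - a minimal sketch of a KEY-ordered Parallel Consumer. The broker address, group id, and concurrency are illustrative, and the poll callback shape follows the 0.5.x API:

```java
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;

public class KeyOrderedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-topic-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // as in the issue title
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);   // PC manages commits itself
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ProcessingOrder.KEY)      // records with the same key are processed in order
                .consumer(new KafkaConsumer<>(props))
                .maxConcurrency(16)                 // illustrative
                .build();

        ParallelStreamProcessor<String, String> processor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(List.of("my-topic"));
        processor.poll(context -> {
            // business logic goes here; throwing makes PC retry the record
        });
    }
}
```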
This might be related to #352. Sample logs below.
12-09-2022 11:48:42.904 TraceId/SpanId: / [pc-broker-poll] INFO org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange - [Consumer clientId=my-topic-consumer-client, groupId=my-topic-consumer] Fetch position FetchPosition{offset=86321, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:6668 (id: 1 rack: null)], epoch=596}} is out of range for partition my-topic-4, resetting offset
12-09-2022 11:48:43.148 TraceId/SpanId: / [pc-broker-poll] INFO org.apache.kafka.clients.consumer.internals.SubscriptionState.maybeSeekUnvalidated - [Consumer clientId=my-topic-consumer-client, groupId=my-topic-consumer] Resetting offset for partition my-topic-4 to position FetchPosition{offset=88617, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:6668 (id: 1 rack: null)], epoch=596}}.
...
A new message at offset 88618 gets consumed by the consumer:
12-09-2022 11:49:32.550 TraceId/SpanId: c65c59c9b8c75d88/b26a1c4eea95894d [pc-pool-3-thread-1] INFO my.test.kafka.consumer.MyConsumer.consume - event=PROCESSING_COMPLETED, Event details: [id=cfd1d7e0-324d-11ed-8e3e-99871a3f782d, name=Event started, partition=4, offset=88618]
12-09-2022 11:49:32.550 TraceId/SpanId: a49e9995c6147711/d466506b2f29b224 [pc-pool-3-thread-1] INFO my.test.kafka.consumer.MyConsumer.consume - event=PROCESSING_COMPLETED, Event details: [id=cbabd4e0-324d-11ed-8e3e-99871a3f782d, name=Event completed, partition=4, offset=88617]
12-09-2022 11:49:32.781 TraceId/SpanId: c2a3cfd96ea76036/37104cfb0345e5b1 [pc-pool-3-thread-3] INFO my.test.kafka.consumer.MyConsumer.consume - event=PROCESSING_COMPLETED, Event details: [id=cff676e0-324d-11ed-8e3e-99871a3f782d, name=Other event created, partition=4, offset=88619]
...
But the old offset 86321 was being repeatedly committed. Hence, the consumer lag happens and keeps growing.
12-09-2022 11:49:35.552 TraceId/SpanId: / [pc-broker-poll] DEBUG io.confluent.parallelconsumer.internal.AbstractOffsetCommitter.retrieveOffsetsAndCommit - Commit starting - find completed work to commit offsets
12-09-2022 11:49:35.552 TraceId/SpanId: / [pc-broker-poll] DEBUG io.confluent.parallelconsumer.internal.AbstractOffsetCommitter.retrieveOffsetsAndCommit - Will commit offsets for 5 partition(s): {my-topic-4=OffsetAndMetadata{offset=86321, leaderEpoch=null, metadata='bgAKAAMACgACAAsBDgACB8Y='}, my-topic-3=OffsetAndMetadata{offset=89148, leaderEpoch=null, metadata='bAAJfgEA'}, my-topic-2=OffsetAndMetadata{offset=88725, leaderEpoch=null, metadata='bAATPIAGAA=='}, my-topic-1=OffsetAndMetadata{offset=107886, leaderEpoch=null, metadata='bAAGIAA='}, my-topic-0=OffsetAndMetadata{offset=89200, leaderEpoch=null, metadata='bgALAAE='}}
12-09-2022 11:49:35.552 TraceId/SpanId: / [pc-broker-poll] DEBUG io.confluent.parallelconsumer.internal.AbstractOffsetCommitter.retrieveOffsetsAndCommit - Begin commit
...