-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
State commit / cleanup #9
Conversation
public RecordQueue(TopicPartition partition, SourceNode source) { | ||
this.partition = partition; | ||
this.source = source; | ||
|
||
this.fifoQueue = new ArrayDeque<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you move these trivial variable initializations to constructor?
Is it the recommended Kafka coding style? Just wondering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really kafka conventions but just personal coding style.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an issue for moving tirivial initialization to constructor?
Left a minor comment. |
@ymatsuda
Task state commit can happen either when user called context.commit() during the processing of a record in any processor, or when the commit interval has reached. In the first case, the task will commit it state including local state store, consumed offset and produced record; in the latter case, the thread will just commit the states of all tasks it owns.
Did not have the threshold on total #. buffered records across tasks, since after chatting to @hachikuji I feel poll(0)'s overhead is very minimal, and asking users to specify the threshold could be hard since there is already a config for per-partition max buffered records.
Some minor fixes on metrics recording, state cleanup, etc.