rhythm: fair partition consumption #4655
Conversation
Hi, Javi. You have the docs box checked in the description, but I don't see any docs. Are you working on docs for this?
My bad, I clicked all of them.
func (b *BlockBuilder) running(ctx context.Context) error {
	// Initial delay
	waitTime := 0 * time.Second
	consumeCycleBackoffFactor := 0.8
This looks like it is used when there is more data. What is the rationale for waiting, instead of immediately consuming again?
The idea is to avoid excessive polling by adapting the wait dynamically. If we consume again immediately, we could increase Kafka load and CPU usage. We could replace it with a fixed wait, e.g. 1 second, giving Kafka more time to accumulate new records.
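A minimal sketch of what this adaptive wait could look like, assuming a hypothetical consumeCycle method and ConsumeCycleDuration config field (the PR's actual loop may differ): the wait shrinks by the backoff factor while records remain, and returns to the full interval once the builder is caught up.

```go
// Sketch only: consumeCycle and cfg.ConsumeCycleDuration are assumed names,
// not the PR's actual implementation.
for {
	select {
	case <-time.After(waitTime):
		more, err := b.consumeCycle(ctx) // consume every assigned partition once
		if err != nil {
			return err
		}
		if more {
			// Records still pending: wait only a fraction of the cycle
			// before consuming again, instead of polling immediately.
			waitTime = time.Duration(float64(b.cfg.ConsumeCycleDuration) * consumeCycleBackoffFactor)
		} else {
			// Caught up: give Kafka a full cycle to accumulate new records.
			waitTime = b.cfg.ConsumeCycleDuration
		}
	case <-ctx.Done():
		return nil
	}
}
```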
		return false, err
	}
	if moreRecords {
		more = true
I think this change loses the condition that waits for enough data to be present in a partition before consuming it again, i.e. moreRecords means lag >= consumeCycleDuration. Because the per-partition status is lost, on the next consumption loop a partition that returned moreRecords == false is still consumed, which would lead to smaller, inefficient flushes.
On the next consumption loop we will compute the partition lag again; if the partition has no records, it won't be consumed. I think it's a tradeoff: waiting less and flushing more seems to work better at reducing the lag.
One thing we can do is run a couple of consume passes for the partition with the most lag. That way we ensure that all the partitions are consumed on every cycle but still give preference to the laggiest one, like:
highLagPartition := sortedPartitions[0]
for i := 0; i < 2; i++ {
if !highLagPartition.hasRecords { // No records, we can skip the partition
    break
}
moreRecords, err := b.consumePartition(ctx, highLagPartition)
if err != nil {
return false, err
}
if moreRecords {
more = true
} else {
break
}
}
// Partitions with more lag are prioritized
for _, partition := range sortedPartitions[1:] {
    if !partition.hasRecords { // No records, skip the partition
        continue
    }
    moreRecords, err := b.consumePartition(ctx, partition)
    if err != nil {
        return false, err
    }
    if moreRecords {
        more = true
    }
}
I think it's simple and straightforward.
What this PR does:
The current loop has two drawbacks. The first is that it keeps consuming from a single partition while that partition has records, neglecting the others. This is not a problem right now, since every block-builder is in charge of a single partition. The second is in the main run loop: after consuming a complete cycle (which can take much longer than the configured cycle duration because of the first drawback), it waits another fixed interval, increasing the lag.
This PR tries to tackle this with a round-robin approach that prioritizes the partitions with more lag. For that, we fetch the last committed offset and the last offset of each partition, subtract them, and sort the partitions with higher lag first.
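As a rough illustration of that ordering (all names below are hypothetical, not the PR's exact code), the lag per partition can be computed from the two offsets and then sorted in descending order:

```go
package blockbuilder

import "sort"

// partitionLag pairs a partition with its current lag (illustrative types).
type partitionLag struct {
	partition int32
	lag       int64
}

// sortByLag computes lag = last partition offset - last committed offset for
// each assigned partition and orders the partitions by descending lag, so the
// laggiest partitions are consumed first in the round-robin cycle.
func sortByLag(assigned []int32, committed, lastOffsets map[int32]int64) []partitionLag {
	lags := make([]partitionLag, 0, len(assigned))
	for _, p := range assigned {
		lags = append(lags, partitionLag{partition: p, lag: lastOffsets[p] - committed[p]})
	}
	sort.Slice(lags, func(i, j int) bool { return lags[i].lag > lags[j].lag })
	return lags
}
```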
Another way to do this sorting is by the last record timestamp, but I found it harder to implement since we would need to persist that state.
With this approach, every partition is consumed just once per cycle. Every time a cycle ends with records still left, the waiting time is reduced by the backoff factor (0.8 by default; this can be made configurable).
The number of FetchOffsetsForTopics requests has also been reduced to only one request per cycle, instead of one request per partition per cycle.
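Assuming FetchOffsetsForTopics here refers to franz-go's kadm admin client, fetching the committed offsets for the whole topic once per cycle could look like the sketch below; the helper name and wiring are illustrative, not the PR's code.

```go
package blockbuilder

import (
	"context"

	"github.com/twmb/franz-go/pkg/kadm"
)

// fetchCommittedOffsets issues a single FetchOffsetsForTopics request for the
// whole topic and returns the last committed offset per assigned partition,
// so the consume cycle no longer needs one request per partition.
func fetchCommittedOffsets(ctx context.Context, adm *kadm.Client, group, topic string, partitions []int32) (map[int32]int64, error) {
	commits, err := adm.FetchOffsetsForTopics(ctx, group, topic)
	if err != nil {
		return nil, err
	}
	committed := make(map[int32]int64, len(partitions))
	for _, p := range partitions {
		if resp, ok := commits.Lookup(topic, p); ok && resp.Err == nil {
			committed[p] = resp.At // offset of the last commit for this partition
		}
	}
	return committed, nil
}
```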
Checklist
CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]