Kafka ingestor fix #233
Conversation
@@ -180,7 +184,14 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []interface{}) {
		case <-flushRecords.C:
			// Process batch of records (if not empty)
			if len(records) > 0 {
				log.Debugf("ingestCollector sending %d entries", len(records))
				if len(ingestC.in) > 0 {
The fix is here: if the out channel is full, we only loop once or twice in the ingest case before looping again in this case.
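For context, a minimal hypothetical sketch of the accumulate-and-flush loop this hunk is part of (names and the exact flush condition are illustrative, not the actual FLP code); the fix adds a check on the input channel's backlog at the point marked below:

```go
package main

import (
	"fmt"
	"time"
)

// processLoop is a simplified, hypothetical version of the pattern in
// processLogLines: entries are accumulated from `in` and flushed to `out`
// either when the ticker fires or when `in` is closed.
func processLoop(in <-chan interface{}, out chan<- []interface{}, flushEvery time.Duration) {
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()
	var records []interface{}
	for {
		select {
		case rec, ok := <-in:
			if !ok {
				if len(records) > 0 {
					out <- records
				}
				return
			}
			records = append(records, rec)
		case <-ticker.C:
			if len(records) > 0 {
				// The fix discussed above hooks in here: checking len(in)
				// lets the loop react to a backlog on the input channel
				// instead of blindly pushing a batch into a full `out`.
				out <- records
				records = nil
			}
		}
	}
}

func main() {
	in := make(chan interface{}, 100)
	out := make(chan []interface{}, 10)
	go processLoop(in, out, 200*time.Millisecond)
	in <- "flow record"
	close(in)
	fmt.Printf("flushed batch of %d entries\n", len(<-out))
}
```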
pkg/pipeline/encode/encode_kafka.go
@@ -106,6 +106,9 @@ func NewEncodeKafka(params config.StageParam) (Encoder, error) {
		WriteTimeout: time.Duration(writeTimeoutSecs) * time.Second,
		BatchSize:    jsonEncodeKafka.BatchSize,
		BatchBytes:   jsonEncodeKafka.BatchBytes,
		// Temporary fix; maybe we should implement a batching system
		// https://github.com/segmentio/kafka-go/issues/326#issuecomment-519375403
		BatchTimeout: 10 * time.Millisecond,
This is the way Kafka works: messages are accumulated and flushed once one of the following conditions is met:
- The batch size is reached
- The batch bytes limit is reached
- The batch timeout is reached
For high-load scenarios, this small timeout could be counter-productive. I'd increase it to at least 200 or 500 ms and allow it to be configured by the customer.
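For illustration, a hedged sketch of building the writer with a configurable batch timeout instead of the hard-coded 10 ms; the writer fields mirror the diff above, but the parameter names and the 200 ms fallback are assumptions, not the actual FLP config schema:

```go
package main

import (
	"time"

	kafkago "github.com/segmentio/kafka-go"
)

// newKafkaWriter takes the batch timeout from configuration rather than
// hard-coding it. A zero/negative value falls back to a suggested default.
func newKafkaWriter(address, topic string, batchSize int, batchBytes int64, batchTimeoutMs int) *kafkago.Writer {
	if batchTimeoutMs <= 0 {
		batchTimeoutMs = 200 // suggested default in the 200-500 ms range
	}
	return &kafkago.Writer{
		Addr:         kafkago.TCP(address),
		Topic:        topic,
		BatchSize:    batchSize,
		BatchBytes:   batchBytes,
		BatchTimeout: time.Duration(batchTimeoutMs) * time.Millisecond,
		WriteTimeout: 10 * time.Second,
	}
}

func main() {
	w := newKafkaWriter("localhost:9092", "network-flows", 500, 1048576, 300) // illustrative values
	defer w.Close()
}
```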
Yes, I agree that this is the normal way to use Kafka.
However, here this will not work since we handle flows sequentially. This means that if we call WriteMessages but the flush conditions are not fulfilled, we will wait for the timeout for no reason, since no one else is going to push messages.
Under these conditions, configuring the batch timeout without any refactoring will only result in a performance decrease. As an example, the default timeout value is 1 second. With such a value, performance was capped at 500 flows per second, 500 being the internal batch size used by FLP.
I can create a task to refactor the Kafka encoder, but I was also thinking this could be a good improvement to gopipe: providing the possibility to run multiple goroutines for a single stage.
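To make the cap concrete, a back-of-the-envelope model (my own arithmetic, not code from this PR), assuming each WriteMessages call for a partial batch blocks for the full batch timeout when flows are handled sequentially:

```go
package main

import (
	"fmt"
	"time"
)

// maxFlowsPerSecond models the cap: if every WriteMessages call for a
// partial batch blocks for the full batch timeout, a sequential caller can
// push at most one batch per timeout period.
func maxFlowsPerSecond(batchSize int, batchTimeout time.Duration) float64 {
	return float64(batchSize) / batchTimeout.Seconds()
}

func main() {
	fmt.Println(maxFlowsPerSecond(500, time.Second))         // 500 flows/s, the cap observed above
	fmt.Println(maxFlowsPerSecond(500, 10*time.Millisecond)) // 50000 flows/s with the 10 ms workaround
}
```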
Making WriteMessages async was also a possibility, but we would have lost any error handling.
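For reference, a minimal sketch of what that async variant could look like with segmentio/kafka-go's struct-based Writer (the address, topic, and logging are illustrative; the Completion hook is assumed to be available in recent kafka-go versions). In async mode the WriteMessages return value no longer carries the write errors, which is the error-handling loss mentioned above; a completion callback is the only place left to observe them:

```go
package main

import (
	"context"
	"log"

	kafkago "github.com/segmentio/kafka-go"
)

func main() {
	writer := &kafkago.Writer{
		Addr:  kafkago.TCP("localhost:9092"), // illustrative broker address
		Topic: "network-flows",               // illustrative topic
		Async: true,                          // WriteMessages returns without waiting for the flush
		// With Async, errors are not returned by WriteMessages anymore;
		// they can only be observed (and here, just logged) in this callback.
		Completion: func(messages []kafkago.Message, err error) {
			if err != nil {
				log.Printf("async kafka write failed for %d messages: %v", len(messages), err)
			}
		},
	}
	defer writer.Close()

	// In async mode this call enqueues the message and returns immediately.
	_ = writer.WriteMessages(context.Background(), kafkago.Message{Value: []byte("flow")})
}
```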
We should do an experimental test to see how this library is implemented, but the way I interpret it should work (at least, the way other libraries do) is, e.g. with a batch size of 500 and a timeout of 1 second:
- If you receive 250 messages during a second, they will be accumulated for 1 second and then flushed.
- If you receive 2000 messages during a second, they will be submitted in 4 batches during this second, e.g. it will send a bunch of 500 messages at 250 ms, another bunch at 500 ms, another at 750 ms... (assuming uniform distribution). It doesn't mean that the rate is limited to 500 messages per second, because one flush condition (the batch size) is satisfied before the other (the timeout).
Maybe we could agree that, for systems with low traffic, adding a 1-second delay in an intermediate stage is not acceptable, so we could decrease the timeout to 200-500 ms.
The problem with decreasing the timeout to 10 ms is that the system could struggle when the traffic is high.
I've been digging more into the linked issue. If it turns out that the Kafka library doesn't batch internally (it's hard to believe it is designed like that), I'd use another library, because even with low timeouts you will always find a high-enough-load scenario that gets performance-limited.
My understanding is that the library does batch internally, but the WriteMessages function blocks until all messages have been flushed.
So if we try to send an incomplete batch, WriteMessages will block waiting for the timeout, since we handle writes sequentially.
I'm doing a test to verify how it actually behaves. My expectation is that WriteMessages should block only until the server acknowledges.
Sometimes even that is too much, and that's why many high-performance environments use async communication (you lose the error messages, but anyway we are just logging and ignoring them). I prepared the eBPF agent so the customer can configure it to write asynchronously. Maybe we can do the same here so the user can choose.
More results on this WriteMessages issue soon.
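For reference, a rough sketch of the kind of timing check described here (not the code from the experiment repository linked below): it measures how long a single WriteMessages call blocks, to distinguish "returns on broker ack" from "waits for the full BatchTimeout". The broker address and topic are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"

	kafkago "github.com/segmentio/kafka-go"
)

func main() {
	writer := &kafkago.Writer{
		Addr:         kafkago.TCP("localhost:9092"), // assumes a local broker
		Topic:        "timing-test",
		BatchSize:    500,
		BatchTimeout: time.Second,
	}
	defer writer.Close()

	start := time.Now()
	err := writer.WriteMessages(context.Background(),
		kafkago.Message{Value: []byte("probe")})
	fmt.Printf("WriteMessages returned after %v (err=%v)\n", time.Since(start), err)
	// If this prints roughly 1s, the call blocks for the whole batch timeout
	// even though only a single message was written.
}
```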
I've been experimenting with the 3 main Kafka libraries here: https://github.com/mariomac/kafka-go-experiment
The results are that both Segmentio's kafka-go and Sarama behave the same: they don't accumulate batches internally, so each write invocation is treated as a single batch that waits for the timeout before being flushed.
On the other hand, Confluent's Kafka library behaves as expected according to its documentation. This is also the way the Java library behaves, which is why I didn't understand what the problem with timeouts was.
I'd suggest that, for the moment, you decrease the batch timeout to time.Nanosecond so the library doesn't throttle the flows.
Then, in another PR, consider one of the following options:
- Use async communication by default.
- Create our own batch accumulator on top of the current client (a rough sketch follows after this comment).
- Migrate to Confluent's Kafka library (with the inconvenience of having to enable CGO).
I'll accordingly modify the eBPF agent Kafka producer.
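As an illustration of the second option, here is a minimal hedged sketch of a batch accumulator built on top of the current client; the names and values are hypothetical, not FLP code. Messages are buffered and WriteMessages is only called for full batches or when a ticker fires, so the library's own BatchTimeout never applies to partially filled batches:

```go
package main

import (
	"context"
	"log"
	"time"

	kafkago "github.com/segmentio/kafka-go"
)

// batchingProducer buffers messages and only calls WriteMessages for a full
// batch or on a periodic flush tick.
type batchingProducer struct {
	writer     *kafkago.Writer
	input      chan kafkago.Message
	batchSize  int
	flushEvery time.Duration
}

func (p *batchingProducer) run(ctx context.Context) {
	batch := make([]kafkago.Message, 0, p.batchSize)
	ticker := time.NewTicker(p.flushEvery)
	defer ticker.Stop()
	flush := func() {
		if len(batch) == 0 {
			return
		}
		if err := p.writer.WriteMessages(ctx, batch...); err != nil {
			log.Printf("kafka batch write failed: %v", err)
		}
		batch = batch[:0]
	}
	for {
		select {
		case msg := <-p.input:
			batch = append(batch, msg)
			if len(batch) >= p.batchSize {
				flush()
			}
		case <-ticker.C:
			flush()
		case <-ctx.Done():
			flush()
			return
		}
	}
}

func main() {
	p := &batchingProducer{
		writer: &kafkago.Writer{
			Addr:  kafkago.TCP("localhost:9092"), // illustrative address
			Topic: "network-flows",               // illustrative topic
		},
		input:      make(chan kafkago.Message, 1000),
		batchSize:  500,
		flushEvery: 500 * time.Millisecond,
	}
	go p.run(context.Background())
	p.input <- kafkago.Message{Value: []byte("flow record")}
	time.Sleep(time.Second) // give the ticker a chance to flush in this toy example
}
```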
Force-pushed from 5626cf0 to 7c96d50
3 different things in this patch: