Merge pull request #233 from OlivierCazade/NETOBSERV-389
Kafka ingestor fix
OlivierCazade authored Jun 17, 2022
2 parents f711714 + 7c96d50 commit cf65a90
Showing 2 changed files with 16 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pkg/pipeline/encode/encode_kafka.go
@@ -106,6 +106,9 @@ func NewEncodeKafka(params config.StageParam) (Encoder, error) {
 		WriteTimeout: time.Duration(writeTimeoutSecs) * time.Second,
 		BatchSize:    jsonEncodeKafka.BatchSize,
 		BatchBytes:   jsonEncodeKafka.BatchBytes,
+		// Temporary fix; maybe we should implement a batching system:
+		// https://github.com/segmentio/kafka-go/issues/326#issuecomment-519375403
+		BatchTimeout: time.Nanosecond,
 	}
 
 	return &encodeKafka{
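The `BatchTimeout` workaround matters because kafka-go's writer holds messages for up to `BatchTimeout` (one second by default) while waiting for `BatchSize` messages to accumulate; since this pipeline already batches records upstream, a near-zero timeout makes each write flush immediately. Below is a minimal standalone sketch of the same configuration; the broker address, topic, and batch size are placeholders, not the pipeline's actual settings.

```go
package main

import (
	"context"
	"time"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	// Writer configured like the fix above: kafka-go would otherwise
	// hold each batch for up to BatchTimeout (default 1s) waiting for
	// BatchSize messages, so a 1ns timeout effectively disables that
	// wait and lets the upstream batching dictate throughput.
	w := &kafka.Writer{
		Addr:         kafka.TCP("localhost:9092"), // placeholder broker
		Topic:        "network-flows",             // placeholder topic
		BatchSize:    100,
		BatchTimeout: time.Nanosecond,
		WriteTimeout: 10 * time.Second,
	}
	defer w.Close()

	// Each call now returns as soon as the broker acknowledges the
	// write, instead of stalling for the default batch window.
	if err := w.WriteMessages(context.Background(),
		kafka.Message{Value: []byte(`{"flow":"sample"}`)},
	); err != nil {
		panic(err)
	}
}
```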
15 changes: 13 additions & 2 deletions pkg/pipeline/ingest/ingest_collector.go
@@ -98,6 +98,10 @@ func (w *TransportWrapper) Send(_, data []byte) error {
 	message := goflowpb.FlowMessage{}
 	err := proto.Unmarshal(data, &message)
 	if err != nil {
+		// temporary fix
+		// A PR was submitted to log this error from goflow2:
+		// https://github.com/netsampler/goflow2/pull/86
+		log.Error(err)
 		return err
 	}
 	renderedMsg, err := RenderMessage(&message)
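Logging locally is needed here because, at the time of this fix, goflow2 discarded the error returned from the transport's `Send`, so the decode failure was silent; the linked PR adds that logging upstream. A minimal sketch of the resulting pattern, assuming goflow2's `FlowMessage` and a logrus logger as used in this repository; the wrapper's fields and the message rendering step are elided.

```go
package ingest

import (
	goflowpb "github.com/netsampler/goflow2/pb"
	log "github.com/sirupsen/logrus"
	"google.golang.org/protobuf/proto"
)

// TransportWrapper adapts the pipeline to goflow2's transport interface.
type TransportWrapper struct {
	// ... fields elided ...
}

func (w *TransportWrapper) Send(_, data []byte) error {
	message := goflowpb.FlowMessage{}
	if err := proto.Unmarshal(data, &message); err != nil {
		// Log as well as return: the caller (goflow2) dropped the
		// returned error until the referenced PR was merged.
		log.Error(err)
		return err
	}
	// ... render and forward the decoded message ...
	return nil
}
```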
@@ -167,7 +171,7 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []config.GenericMap)
 		case record := <-ingestC.in:
 			records = append(records, record)
 			if len(records) >= ingestC.batchMaxLength {
-				log.Debugf("ingestCollector sending %d entries", len(records))
+				log.Debugf("ingestCollector sending %d entries, %d entries waiting", len(records), len(ingestC.in))
 				linesProcessed.Add(float64(len(records)))
 				queueLength.Set(float64(len(out)))
 				out <- records
@@ -176,7 +180,14 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []config.GenericMap)
 		case <-flushRecords.C:
 			// Process batch of records (if not empty)
 			if len(records) > 0 {
-				log.Debugf("ingestCollector sending %d entries", len(records))
+				if len(ingestC.in) > 0 {
+					for len(records) < ingestC.batchMaxLength && len(ingestC.in) > 0 {
+						record := <-ingestC.in
+						recordAsBytes, _ := json.Marshal(record)
+						records = append(records, string(recordAsBytes))
+					}
+				}
+				log.Debugf("ingestCollector sending %d entries, %d entries waiting", len(records), len(ingestC.in))
 				linesProcessed.Add(float64(len(records)))
 				queueLength.Set(float64(len(out)))
 				out <- records
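The new flush branch tops the batch up from records already waiting in the input channel, so a timer-driven flush no longer ships a near-empty batch while the queue is backed up. A self-contained sketch of that drain-on-flush pattern follows, with illustrative names and plain strings standing in for flow records.

```go
package main

import (
	"fmt"
	"time"
)

func batchLoop(in <-chan string, out chan<- []string, maxLen int, flushEvery time.Duration) {
	ticker := time.NewTicker(flushEvery)
	defer ticker.Stop()
	var batch []string
	for {
		select {
		case rec := <-in:
			batch = append(batch, rec)
			if len(batch) >= maxLen {
				out <- batch // full batch: send without waiting for the timer
				batch = nil
			}
		case <-ticker.C:
			if len(batch) == 0 {
				continue
			}
			// Top up from records already waiting in the channel, so a
			// timer flush does not ship a tiny batch while input is backlogged.
			for len(batch) < maxLen && len(in) > 0 {
				batch = append(batch, <-in)
			}
			out <- batch
			batch = nil
		}
	}
}

func main() {
	in := make(chan string, 100)
	out := make(chan []string, 10)
	go batchLoop(in, out, 10, 100*time.Millisecond)
	for i := 0; i < 25; i++ {
		in <- fmt.Sprintf("record-%d", i)
	}
	// Typically prints 10, 10, 5: two size-triggered batches, then the
	// remainder flushed by the timer.
	fmt.Println("batch sizes:", len(<-out), len(<-out), len(<-out))
}
```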
