diff --git a/pkg/pipeline/encode/encode_kafka.go b/pkg/pipeline/encode/encode_kafka.go index 694350b20..457bdbb89 100644 --- a/pkg/pipeline/encode/encode_kafka.go +++ b/pkg/pipeline/encode/encode_kafka.go @@ -106,6 +106,9 @@ func NewEncodeKafka(params config.StageParam) (Encoder, error) { WriteTimeout: time.Duration(writeTimeoutSecs) * time.Second, BatchSize: jsonEncodeKafka.BatchSize, BatchBytes: jsonEncodeKafka.BatchBytes, + // Temporary fix; maybe we should implement a batching system + // https://github.com/segmentio/kafka-go/issues/326#issuecomment-519375403 + BatchTimeout: time.Nanosecond, } return &encodeKafka{ diff --git a/pkg/pipeline/ingest/ingest_collector.go b/pkg/pipeline/ingest/ingest_collector.go index c72150777..44c8130f5 100644 --- a/pkg/pipeline/ingest/ingest_collector.go +++ b/pkg/pipeline/ingest/ingest_collector.go @@ -98,6 +98,10 @@ func (w *TransportWrapper) Send(_, data []byte) error { message := goflowpb.FlowMessage{} err := proto.Unmarshal(data, &message) if err != nil { + // Temporary fix + // A PR was submitted to log this error from goflow2: + // https://github.com/netsampler/goflow2/pull/86 + log.Error(err) return err } renderedMsg, err := RenderMessage(&message) @@ -167,7 +171,7 @@ func (ingestC *ingestCollector) processLogLines(out chan<- []config.GenericMap) case record := <-ingestC.in: records = append(records, record) if len(records) >= ingestC.batchMaxLength { - log.Debugf("ingestCollector sending %d entries", len(records)) + log.Debugf("ingestCollector sending %d entries, %d entries waiting", len(records), len(ingestC.in)) linesProcessed.Add(float64(len(records))) queueLength.Set(float64(len(out))) out <- records @@ -176,7 +180,14 @@ case <-flushRecords.C: // Process batch of records (if not empty) if len(records) > 0 { - log.Debugf("ingestCollector sending %d entries", len(records)) + if len(ingestC.in) > 0 { + for len(records) < 
ingestC.batchMaxLength && len(ingestC.in) > 0 { + record := <-ingestC.in + recordAsBytes, _ := json.Marshal(record) + records = append(records, string(recordAsBytes)) + } + } + log.Debugf("ingestCollector sending %d entries, %d entries waiting", len(records), len(ingestC.in)) linesProcessed.Add(float64(len(records))) queueLength.Set(float64(len(out))) out <- records