Skip to content

Commit

Permalink
Added key to kafka message for connection tracking consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierCazade committed Apr 4, 2023
1 parent c62173a commit 915de15
Showing 1 changed file with 22 additions and 1 deletion.
23 changes: 22 additions & 1 deletion pkg/exporter/kafka_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package exporter

import (
"context"
"strconv"

"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
kafkago "github.com/segmentio/kafka-go"
Expand All @@ -28,6 +29,26 @@ func (kp *KafkaProto) ExportFlows(input <-chan []*flow.Record) {
}
}

func getIPKey(ip [16]uint8) string {
key := ""
for _, v := range ip {
key += strconv.Itoa(int(v))
}
return key
}

func getFlowKey(record *flow.Record) []byte {
key := ""
key1 := getIPKey(record.Id.SrcIp)
key2 := getIPKey(record.Id.DstIp)
if key1 > key2 {
key = key1 + key2
} else {
key = key2 + key1
}
return []byte(key)
}

func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) {
klog.Debugf("sending %d records", len(records))
msgs := make([]kafkago.Message, 0, len(records))
Expand All @@ -37,7 +58,7 @@ func (kp *KafkaProto) batchAndSubmit(records []*flow.Record) {
klog.WithError(err).Debug("can't encode protobuf message. Ignoring")
continue
}
msgs = append(msgs, kafkago.Message{Value: pbBytes})
msgs = append(msgs, kafkago.Message{Value: pbBytes, Key: getFlowKey(record)})
}

if err := kp.Writer.WriteMessages(context.TODO(), msgs...); err != nil {
Expand Down

0 comments on commit 915de15

Please sign in to comment.