diff --git a/pkg/pipeline/extract/conntrack/conn.go b/pkg/pipeline/extract/conntrack/conn.go index 203cbb46d..e5afc3576 100644 --- a/pkg/pipeline/extract/conntrack/conn.go +++ b/pkg/pipeline/extract/conntrack/conn.go @@ -180,6 +180,9 @@ func NewConnBuilder(metrics *metricsType) *connBuilder { } func (cb *connBuilder) Hash(h totalHashType) *connBuilder { + if cb.shouldSwapAB { + h.hashA, h.hashB = h.hashB, h.hashA + } cb.conn.hash = h return cb } diff --git a/pkg/pipeline/extract/conntrack/conntrack.go b/pkg/pipeline/extract/conntrack/conntrack.go index 0a96d1120..158261539 100644 --- a/pkg/pipeline/extract/conntrack/conntrack.go +++ b/pkg/pipeline/extract/conntrack/conntrack.go @@ -75,10 +75,10 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM } else { builder := NewConnBuilder(ct.metrics) conn = builder. - Hash(computedHash). ShouldSwapAB(ct.config.TCPFlags.SwapAB && ct.shouldSwapAB(fl)). KeysFrom(fl, ct.config.KeyDefinition, ct.endpointAFields, ct.endpointBFields). Aggregators(ct.aggregators). + Hash(computedHash). Build() ct.connStore.addConnection(computedHash.hashTotal, conn) ct.connStore.updateNextHeartbeatTime(computedHash.hashTotal) diff --git a/pkg/pipeline/extract/conntrack/conntrack_test.go b/pkg/pipeline/extract/conntrack/conntrack_test.go index 3adb12344..49d118916 100644 --- a/pkg/pipeline/extract/conntrack/conntrack_test.go +++ b/pkg/pipeline/extract/conntrack/conntrack_test.go @@ -1094,7 +1094,7 @@ func TestSwapAB(t *testing.T) { startTime.Add(0 * time.Second), []config.GenericMap{flTCP1}, []config.GenericMap{ - newMockRecordNewConnAB(ipA, portA, ipB, portB, protocolTCP, 111, 0, 11, 0, 1).withHash(hashIdTCP).markFirst().get(), + newMockRecordNewConnAB(ipA, portA, ipB, portB, protocolTCP, 0, 111, 0, 11, 1).withHash(hashIdTCP).markFirst().get(), }, }, }