diff --git a/pkg/pipeline/extract/conntrack/conntrack.go b/pkg/pipeline/extract/conntrack/conntrack.go index b880065a9..344ff49fb 100644 --- a/pkg/pipeline/extract/conntrack/conntrack.go +++ b/pkg/pipeline/extract/conntrack/conntrack.go @@ -62,6 +62,11 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM var outputRecords []config.GenericMap for _, fl := range flowLogs { + if fl.IsDuplicate() { + log.Debugf("skipping duplicated flow log %v", fl) + ct.metrics.inputRecords.WithLabelValues("duplicate").Inc() + continue + } computedHash, err := ComputeHash(fl, ct.config.KeyDefinition, ct.hashProvider()) if err != nil { log.Warningf("skipping flow log %v: %v", fl, err) @@ -164,11 +169,9 @@ func (ct *conntrackImpl) prepareHeartbeatRecords() []config.GenericMap { } func (ct *conntrackImpl) updateConnection(conn connection, flowLog config.GenericMap, flowLogHash totalHashType, isNew bool) { - if !flowLog.IsDuplicate() { - d := ct.getFlowLogDirection(conn, flowLogHash) - for _, agg := range ct.aggregators { - agg.update(conn, flowLog, d) - } + d := ct.getFlowLogDirection(conn, flowLogHash) + for _, agg := range ct.aggregators { + agg.update(conn, flowLog, d) } for _, cp := range ct.copiers { diff --git a/pkg/pipeline/extract/conntrack/conntrack_test.go b/pkg/pipeline/extract/conntrack/conntrack_test.go index f44aaa255..d23310860 100644 --- a/pkg/pipeline/extract/conntrack/conntrack_test.go +++ b/pkg/pipeline/extract/conntrack/conntrack_test.go @@ -111,6 +111,7 @@ func TestTrack(t *testing.T) { hashIdBA := "cc40f571f40f3111" flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, false) + // duplicates should be ignored flAB1Duplicated := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, true) flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 222, 22, false) flBA3 := newMockFlowLog(ipB, portB, ipA, portA, protocol, flowDir, 333, 33, false) @@ -121,6 +122,12 @@ func TestTrack(t *testing.T) { inputFlowLogs []config.GenericMap expected []config.GenericMap }{ + { + "duplicates, doesn't output connection events", + buildMockConnTrackConfig(true, []string{"newConnection", "heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout), + []config.GenericMap{flAB1Duplicated}, + []config.GenericMap(nil), + }, { "bidirectional, output new connection", buildMockConnTrackConfig(true, []string{"newConnection"}, heartbeatInterval, endConnectionTimeout), @@ -136,7 +143,6 @@ func TestTrack(t *testing.T) { []config.GenericMap{ newMockRecordNewConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1).withHash(hashId).get(), newMockRecordFromFlowLog(flAB1).withHash(hashId).get(), - newMockRecordFromFlowLog(flAB1Duplicated).withHash(hashIdAB).get(), newMockRecordFromFlowLog(flAB2).withHash(hashId).get(), newMockRecordFromFlowLog(flBA3).withHash(hashId).get(), newMockRecordFromFlowLog(flBA4).withHash(hashId).get(), @@ -158,7 +164,6 @@ func TestTrack(t *testing.T) { []config.GenericMap{ newMockRecordNewConn(ipA, portA, ipB, portB, protocol, 111, 11, 1).withHash(hashIdAB).get(), newMockRecordFromFlowLog(flAB1).withHash(hashIdAB).get(), - newMockRecordFromFlowLog(flAB1Duplicated).withHash(hashIdAB).get(), newMockRecordFromFlowLog(flAB2).withHash(hashIdAB).get(), newMockRecordNewConn(ipB, portB, ipA, portA, protocol, 333, 33, 1).withHash(hashIdBA).get(), newMockRecordFromFlowLog(flBA3).withHash(hashIdBA).get(), @@ -1017,6 +1022,9 @@ func TestDetectEndConnection(t *testing.T) { hashIdTCP := "705baa5149302fa1" flTCP1 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, flowDir, 111, 11, false) flTCP2 := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, flowDir, 222, 22, false) + // duplicates should be ignored + flTCP2Duplicated := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, flowDir, 222, 22, true) + flTCP2[tcpFlagsFieldName] = FIN_ACK_FLAG startTime := clk.Now() @@ -1043,8 +1051,14 @@ func TestDetectEndConnection(t *testing.T) { }, }, { - "16s: no end connection", - startTime.Add(16 * time.Second), + "10s: end connection duplicated", + startTime.Add(10 * time.Second), + []config.GenericMap{flTCP2Duplicated}, + []config.GenericMap(nil), + }, + { + "21s: no end connection", + startTime.Add(21 * time.Second), []config.GenericMap{}, nil, },