Skip to content

Commit

Permalink
fix duplicates
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Mar 30, 2023
1 parent ffe215e commit 015cea3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
13 changes: 8 additions & 5 deletions pkg/pipeline/extract/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM

var outputRecords []config.GenericMap
for _, fl := range flowLogs {
if fl.IsDuplicate() {
log.Debugf("skipping duplicated flow log %v", fl)
ct.metrics.inputRecords.WithLabelValues("duplicate").Inc()
continue
}
computedHash, err := ComputeHash(fl, ct.config.KeyDefinition, ct.hashProvider())
if err != nil {
log.Warningf("skipping flow log %v: %v", fl, err)
Expand Down Expand Up @@ -164,11 +169,9 @@ func (ct *conntrackImpl) prepareHeartbeatRecords() []config.GenericMap {
}

func (ct *conntrackImpl) updateConnection(conn connection, flowLog config.GenericMap, flowLogHash totalHashType, isNew bool) {
if !flowLog.IsDuplicate() {
d := ct.getFlowLogDirection(conn, flowLogHash)
for _, agg := range ct.aggregators {
agg.update(conn, flowLog, d)
}
d := ct.getFlowLogDirection(conn, flowLogHash)
for _, agg := range ct.aggregators {
agg.update(conn, flowLog, d)
}

for _, cp := range ct.copiers {
Expand Down
22 changes: 18 additions & 4 deletions pkg/pipeline/extract/conntrack/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestTrack(t *testing.T) {
hashIdBA := "cc40f571f40f3111"

flAB1 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, false)
// duplicates should be ignored
flAB1Duplicated := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 111, 11, true)
flAB2 := newMockFlowLog(ipA, portA, ipB, portB, protocol, flowDir, 222, 22, false)
flBA3 := newMockFlowLog(ipB, portB, ipA, portA, protocol, flowDir, 333, 33, false)
Expand All @@ -121,6 +122,12 @@ func TestTrack(t *testing.T) {
inputFlowLogs []config.GenericMap
expected []config.GenericMap
}{
{
"duplicates, doesn't output connection events",
buildMockConnTrackConfig(true, []string{"newConnection", "heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout),
[]config.GenericMap{flAB1Duplicated},
[]config.GenericMap(nil),
},
{
"bidirectional, output new connection",
buildMockConnTrackConfig(true, []string{"newConnection"}, heartbeatInterval, endConnectionTimeout),
Expand All @@ -136,7 +143,6 @@ func TestTrack(t *testing.T) {
[]config.GenericMap{
newMockRecordNewConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1).withHash(hashId).get(),
newMockRecordFromFlowLog(flAB1).withHash(hashId).get(),
newMockRecordFromFlowLog(flAB1Duplicated).withHash(hashIdAB).get(),
newMockRecordFromFlowLog(flAB2).withHash(hashId).get(),
newMockRecordFromFlowLog(flBA3).withHash(hashId).get(),
newMockRecordFromFlowLog(flBA4).withHash(hashId).get(),
Expand All @@ -158,7 +164,6 @@ func TestTrack(t *testing.T) {
[]config.GenericMap{
newMockRecordNewConn(ipA, portA, ipB, portB, protocol, 111, 11, 1).withHash(hashIdAB).get(),
newMockRecordFromFlowLog(flAB1).withHash(hashIdAB).get(),
newMockRecordFromFlowLog(flAB1Duplicated).withHash(hashIdAB).get(),
newMockRecordFromFlowLog(flAB2).withHash(hashIdAB).get(),
newMockRecordNewConn(ipB, portB, ipA, portA, protocol, 333, 33, 1).withHash(hashIdBA).get(),
newMockRecordFromFlowLog(flBA3).withHash(hashIdBA).get(),
Expand Down Expand Up @@ -1017,6 +1022,9 @@ func TestDetectEndConnection(t *testing.T) {
hashIdTCP := "705baa5149302fa1"
flTCP1 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, flowDir, 111, 11, false)
flTCP2 := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, flowDir, 222, 22, false)
// duplicates should be ignored
flTCP2Duplicated := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, flowDir, 222, 22, true)

flTCP2[tcpFlagsFieldName] = FIN_ACK_FLAG

startTime := clk.Now()
Expand All @@ -1043,8 +1051,14 @@ func TestDetectEndConnection(t *testing.T) {
},
},
{
"16s: no end connection",
startTime.Add(16 * time.Second),
"10s: end connection duplicated",
startTime.Add(10 * time.Second),
[]config.GenericMap{flTCP2Duplicated},
[]config.GenericMap(nil),
},
{
"21s: no end connection",
startTime.Add(21 * time.Second),
[]config.GenericMap{},
nil,
},
Expand Down

0 comments on commit 015cea3

Please sign in to comment.