From 1b1cc51f8ecc0b1f9cdde1c1a4f934fb18a267fd Mon Sep 17 00:00:00 2001 From: Julien Pinsonneau Date: Tue, 4 Apr 2023 16:27:34 +0200 Subject: [PATCH] addressed feedback --- README.md | 2 + .../kubernetes/flowlogs-pipeline.conf.yaml | 1 + docs/api.md | 1 + docs/operational-metrics.md | 16 +- network_definitions/config.yaml | 1 + pkg/api/conntrack.go | 1 + pkg/pipeline/conntrack_integ_test.go | 2 + pkg/pipeline/extract/conntrack/conntrack.go | 14 +- .../extract/conntrack/conntrack_test.go | 167 +++++++++++++++--- pkg/pipeline/extract/conntrack/metrics.go | 33 ++-- pkg/pipeline/extract/conntrack/store.go | 108 +++++------ 11 files changed, 228 insertions(+), 118 deletions(-) diff --git a/README.md b/README.md index d330d1dfc..d618133cb 100644 --- a/README.md +++ b/README.md @@ -631,9 +631,11 @@ parameters: Proto: 17 endConnectionTimeout: 5s heartbeatInterval: 40s + terminatingTimeout: 5s - selector: {} # Default group endConnectionTimeout: 10s heartbeatInterval: 30s + terminatingTimeout: 5s ``` A possible output would look like: diff --git a/contrib/kubernetes/flowlogs-pipeline.conf.yaml b/contrib/kubernetes/flowlogs-pipeline.conf.yaml index 3db0bdeb0..9288d5d34 100644 --- a/contrib/kubernetes/flowlogs-pipeline.conf.yaml +++ b/contrib/kubernetes/flowlogs-pipeline.conf.yaml @@ -104,6 +104,7 @@ parameters: scheduling: - endConnectionTimeout: 10s heartbeatInterval: 30s + terminatingTimeout: 5s - name: transform_network transform: type: network diff --git a/docs/api.md b/docs/api.md index 6befde0bf..cff511754 100644 --- a/docs/api.md +++ b/docs/api.md @@ -232,6 +232,7 @@ Following is the supported API format for specifying connection tracking: scheduling: list of timeouts and intervals to apply per selector selector: key-value map to match against connection fields to apply this scheduling endConnectionTimeout: duration of time to wait from the last flow log to end a connection + terminatingTimeout: duration of time to wait from detected FIN flag to end a connection heartbeatInterval: duration of time to wait between heartbeat reports of a connection maxConnectionsTracked: maximum number of connections we keep in our cache (0 means no limit) tcpFlags: settings for handling TCP flags diff --git a/docs/operational-metrics.md b/docs/operational-metrics.md index 65ab3c22f..2c21a6201 100644 --- a/docs/operational-metrics.md +++ b/docs/operational-metrics.md @@ -15,20 +15,12 @@ Each table below provides documentation for an exported flowlogs-pipeline operat | **Labels** | classification | -### conntrack_memory_connections_expired -| **Name** | conntrack_memory_connections_expired | +### conntrack_memory_connections +| **Name** | conntrack_memory_connections | |:---|:---| -| **Description** | The total number of tracked expired connections in memory. | +| **Description** | The total number of tracked connections in memory per group and phase. | | **Type** | gauge | -| **Labels** | group | - - -### conntrack_memory_connections_running -| **Name** | conntrack_memory_connections_running | -|:---|:---| -| **Description** | The total number of tracked running connections in memory. | -| **Type** | gauge | -| **Labels** | group | +| **Labels** | group, phase | ### conntrack_output_records diff --git a/network_definitions/config.yaml b/network_definitions/config.yaml index 8e82d797b..9ca8a6e7b 100644 --- a/network_definitions/config.yaml +++ b/network_definitions/config.yaml @@ -41,6 +41,7 @@ extract: - selector: {} heartbeatInterval: 30s endConnectionTimeout: 10s + terminatingTimeout: 5s outputRecordTypes: - newConnection - flowLog diff --git a/pkg/api/conntrack.go b/pkg/api/conntrack.go index ea250f738..01bf30779 100644 --- a/pkg/api/conntrack.go +++ b/pkg/api/conntrack.go @@ -88,6 +88,7 @@ type ConnTrackOperationEnum struct { type ConnTrackSchedulingGroup struct { Selector map[string]interface{} `yaml:"selector,omitempty" json:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"` EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" json:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"` + TerminatingTimeout Duration `yaml:"terminatingTimeout,omitempty" json:"terminatingTimeout,omitempty" doc:"duration of time to wait from detected FIN flag to end a connection"` HeartbeatInterval Duration `yaml:"heartbeatInterval,omitempty" json:"heartbeatInterval,omitempty" doc:"duration of time to wait between heartbeat reports of a connection"` } diff --git a/pkg/pipeline/conntrack_integ_test.go b/pkg/pipeline/conntrack_integ_test.go index 6d26fbc62..ae1fea284 100644 --- a/pkg/pipeline/conntrack_integ_test.go +++ b/pkg/pipeline/conntrack_integ_test.go @@ -51,6 +51,8 @@ parameters: scheduling: - selector: {} endConnectionTimeout: 1s + heartbeatInterval: 10s + terminatingTimeout: 5s outputRecordTypes: - newConnection - flowLog diff --git a/pkg/pipeline/extract/conntrack/conntrack.go b/pkg/pipeline/extract/conntrack/conntrack.go index 7eb5dbfe9..b39181c4f 100644 --- a/pkg/pipeline/extract/conntrack/conntrack.go +++ b/pkg/pipeline/extract/conntrack/conntrack.go @@ -81,7 +81,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM builder := NewConnBuilder(ct.metrics) conn = builder. Hash(computedHash). - ShouldSwapAB(ct.config.TCPFlags.SwapAB && ct.shouldSwapAB(fl)). + ShouldSwapAB(ct.config.TCPFlags.SwapAB && ct.containsTcpFlag(fl, SYN_ACK_FLAG)). KeysFrom(fl, ct.config.KeyDefinition, ct.endpointAFields, ct.endpointBFields). Aggregators(ct.aggregators). Build() @@ -172,22 +172,14 @@ func (ct *conntrackImpl) updateConnection(conn connection, flowLog config.Generi agg.update(conn, flowLog, d, isNew) } - if ct.config.TCPFlags.DetectEndConnection && ct.isLastFlowLogOfConnection(flowLog) { + if ct.config.TCPFlags.DetectEndConnection && ct.containsTcpFlag(flowLog, FIN_FLAG) { ct.metrics.tcpFlags.WithLabelValues("detectEndConnection").Inc() - ct.connStore.expireConnection(flowLogHash.hashTotal) + ct.connStore.setConnectionTerminating(flowLogHash.hashTotal) } else { ct.connStore.updateConnectionExpiryTime(flowLogHash.hashTotal) } } -func (ct *conntrackImpl) isLastFlowLogOfConnection(flowLog config.GenericMap) bool { - return ct.containsTcpFlag(flowLog, FIN_FLAG) -} - -func (ct *conntrackImpl) shouldSwapAB(flowLog config.GenericMap) bool { - return ct.containsTcpFlag(flowLog, SYN_ACK_FLAG) -} - func (ct *conntrackImpl) containsTcpFlag(flowLog config.GenericMap, queryFlag uint32) bool { tcpFlagsRaw, ok := flowLog[ct.config.TCPFlags.FieldName] if ok { diff --git a/pkg/pipeline/extract/conntrack/conntrack_test.go b/pkg/pipeline/extract/conntrack/conntrack_test.go index af6b7c73d..bb331ec30 100644 --- a/pkg/pipeline/extract/conntrack/conntrack_test.go +++ b/pkg/pipeline/extract/conntrack/conntrack_test.go @@ -36,7 +36,7 @@ import ( var opMetrics = operational.NewMetrics(&config.MetricsSettings{}) func buildMockConnTrackConfig(isBidirectional bool, outputRecordType []string, - heartbeatInterval, endConnectionTimeout time.Duration) *config.StageParam { + heartbeatInterval, endConnectionTimeout time.Duration, terminatingTimeout time.Duration) *config.StageParam { splitAB := isBidirectional var hash api.ConnTrackHash if isBidirectional { @@ -90,6 +90,7 @@ func buildMockConnTrackConfig(isBidirectional bool, outputRecordType []string, Selector: map[string]interface{}{}, HeartbeatInterval: api.Duration{Duration: heartbeatInterval}, EndConnectionTimeout: api.Duration{Duration: endConnectionTimeout}, + TerminatingTimeout: api.Duration{Duration: terminatingTimeout}, }, }, }, // end of api.ConnTrack @@ -100,6 +101,8 @@ func buildMockConnTrackConfig(isBidirectional bool, outputRecordType []string, func TestTrack(t *testing.T) { heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second + terminatingTimeout := 5 * time.Second + ipA := "10.0.0.1" ipB := "10.0.0.2" portA := 9001 @@ -124,13 +127,15 @@ func TestTrack(t *testing.T) { }{ { "duplicates, doesn't output connection events", - buildMockConnTrackConfig(true, []string{"newConnection", "heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout), + buildMockConnTrackConfig(true, []string{"newConnection", "heartbeat", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1Duplicated}, []config.GenericMap(nil), }, { "bidirectional, output new connection", - buildMockConnTrackConfig(true, []string{"newConnection"}, heartbeatInterval, endConnectionTimeout), + buildMockConnTrackConfig(true, []string{"newConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ newMockRecordNewConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1).withHash(hashId).get(), @@ -138,7 +143,8 @@ func TestTrack(t *testing.T) { }, { "bidirectional, output new connection and flow log", - buildMockConnTrackConfig(true, []string{"newConnection", "flowLog"}, heartbeatInterval, endConnectionTimeout), + buildMockConnTrackConfig(true, []string{"newConnection", "flowLog"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ newMockRecordNewConnAB(ipA, portA, ipB, portB, protocol, 111, 0, 11, 0, 1).withHash(hashId).get(), @@ -150,7 +156,8 @@ func TestTrack(t *testing.T) { }, { "unidirectional, output new connection", - buildMockConnTrackConfig(false, []string{"newConnection"}, heartbeatInterval, endConnectionTimeout), + buildMockConnTrackConfig(false, []string{"newConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ newMockRecordNewConn(ipA, portA, ipB, portB, protocol, 111, 11, 1).withHash(hashIdAB).get(), @@ -159,7 +166,8 @@ func TestTrack(t *testing.T) { }, { "unidirectional, output new connection and flow log", - buildMockConnTrackConfig(false, []string{"newConnection", "flowLog"}, heartbeatInterval, endConnectionTimeout), + buildMockConnTrackConfig(false, []string{"newConnection", "flowLog"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout), []config.GenericMap{flAB1, flAB1Duplicated, flAB2, flBA3, flBA4}, []config.GenericMap{ newMockRecordNewConn(ipA, portA, ipB, portB, protocol, 111, 11, 1).withHash(hashIdAB).get(), @@ -193,7 +201,10 @@ func TestEndConn_Bidirectional(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -280,7 +291,10 @@ func TestEndConn_Unidirectional(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "endConnection"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -383,7 +397,10 @@ func TestHeartbeat_Unidirectional(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(false, []string{"newConnection", "flowLog", "heartbeat", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -532,7 +549,10 @@ func TestIsFirst_LongConnection(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(false, []string{"heartbeat", "endConnection"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(false, []string{"heartbeat", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -621,8 +641,10 @@ func TestIsFirst_ShortConnection(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 5 * time.Second + terminatingTimeout := 5 * time.Second + conf := buildMockConnTrackConfig(false, []string{"heartbeat", "endConnection"}, - heartbeatInterval, endConnectionTimeout) + heartbeatInterval, endConnectionTimeout, terminatingTimeout) ct, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -685,7 +707,10 @@ func TestPrepareUpdateConnectionRecords(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(false, []string{"heartbeat"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(false, []string{"heartbeat"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) interval := 10 * time.Second extract, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -744,8 +769,10 @@ func TestScheduling(t *testing.T) { clk := clock.NewMock() defaultHeartbeatInterval := 20 * time.Second defaultEndConnectionTimeout := 15 * time.Second + defaultTerminatingTimeout := 5 * time.Second + conf := buildMockConnTrackConfig(true, []string{"heartbeat", "endConnection"}, - defaultHeartbeatInterval, defaultEndConnectionTimeout) + defaultHeartbeatInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) // Insert a scheduling group before the default group. // https://github.com/golang/go/wiki/SliceTricks#push-frontunshift conf.Extract.ConnTrack.Scheduling = append( @@ -754,6 +781,7 @@ func TestScheduling(t *testing.T) { Selector: map[string]interface{}{"Proto": 1}, // ICMP HeartbeatInterval: api.Duration{Duration: 30 * time.Second}, EndConnectionTimeout: api.Duration{Duration: 20 * time.Second}, + TerminatingTimeout: api.Duration{Duration: 10 * time.Second}, }, }, conf.Extract.ConnTrack.Scheduling...) @@ -875,9 +903,9 @@ func assertStoreConsistency(t *testing.T, extractor extract.Extractor) { groupsLenSlice := make([]int, 0) sumGroupsLen := 0 for _, g := range store.groups { - sumGroupsLen += g.expiredMom.Len() - sumGroupsLen += g.runningMom.Len() - groupsLenSlice = append(groupsLenSlice, g.runningMom.Len()) + sumGroupsLen += g.terminatingMom.Len() + sumGroupsLen += g.activeMom.Len() + groupsLenSlice = append(groupsLenSlice, g.terminatingMom.Len()+g.activeMom.Len()) } require.Equal(t, hashLen, sumGroupsLen, "hashLen(=%v) != sum(%v)", hashLen, groupsLenSlice) } @@ -906,7 +934,10 @@ func TestMaxConnections(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 10 * time.Second endConnectionTimeout := 30 * time.Second - conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(true, []string{"newConnection", "flowLog", "endConnection"}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) conf.Extract.ConnTrack.MaxConnectionsTracked = maxConnections extract, err := NewConnectionTrack(opMetrics, *conf, clk) require.NoError(t, err) @@ -932,7 +963,10 @@ func TestIsLastFlowLogOfConnection(t *testing.T) { clk := clock.NewMock() heartbeatInterval := 30 * time.Second endConnectionTimeout := 10 * time.Second - conf := buildMockConnTrackConfig(true, []string{}, heartbeatInterval, endConnectionTimeout) + terminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(true, []string{}, + heartbeatInterval, endConnectionTimeout, terminatingTimeout) tcpFlagsFieldName := "TCPFlags" conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ FieldName: tcpFlagsFieldName, @@ -1004,8 +1038,10 @@ func TestDetectEndConnection(t *testing.T) { clk := clock.NewMock() defaultUpdateConnectionInterval := 30 * time.Second defaultEndConnectionTimeout := 10 * time.Second + defaultTerminatingTimeout := 5 * time.Second + conf := buildMockConnTrackConfig(true, []string{"newConnection", "endConnection"}, - defaultUpdateConnectionInterval, defaultEndConnectionTimeout) + defaultUpdateConnectionInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) tcpFlagsFieldName := "TCPFlags" conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ FieldName: tcpFlagsFieldName, @@ -1078,6 +1114,7 @@ func TestDetectEndConnection(t *testing.T) { } exposed := test.ReadExposedMetrics(t) require.Contains(t, exposed, `conntrack_tcp_flags{action="detectEndConnection"} 1`) + require.Contains(t, exposed, `conntrack_input_records{classification="duplicate"} 1`) } func TestSwapAB(t *testing.T) { @@ -1085,8 +1122,10 @@ func TestSwapAB(t *testing.T) { clk := clock.NewMock() defaultUpdateConnectionInterval := 30 * time.Second defaultEndConnectionTimeout := 10 * time.Second + defaultTerminatingTimeout := 5 * time.Second + conf := buildMockConnTrackConfig(true, []string{"newConnection", "endConnection"}, - defaultUpdateConnectionInterval, defaultEndConnectionTimeout) + defaultUpdateConnectionInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) tcpFlagsFieldName := "TCPFlags" conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ FieldName: tcpFlagsFieldName, @@ -1136,3 +1175,91 @@ func TestSwapAB(t *testing.T) { exposed := test.ReadExposedMetrics(t) require.Contains(t, exposed, `conntrack_tcp_flags{action="swapAB"} 1`) } + +func TestExpiringConnection(t *testing.T) { + test.ResetPromRegistry() + clk := clock.NewMock() + defaultUpdateConnectionInterval := 30 * time.Second + defaultEndConnectionTimeout := 10 * time.Second + defaultTerminatingTimeout := 5 * time.Second + + conf := buildMockConnTrackConfig(true, []string{"newConnection", "endConnection"}, + defaultUpdateConnectionInterval, defaultEndConnectionTimeout, defaultTerminatingTimeout) + tcpFlagsFieldName := "TCPFlags" + conf.Extract.ConnTrack.TCPFlags = api.ConnTrackTCPFlags{ + FieldName: tcpFlagsFieldName, + DetectEndConnection: true, + } + ct, err := NewConnectionTrack(opMetrics, *conf, clk) + require.NoError(t, err) + + ipA := "10.0.0.1" + ipB := "10.0.0.2" + portA := 9001 + portB := 9002 + protocolTCP := 6 + flowDir := 0 + hashIdTCP := "705baa5149302fa1" + flTCP1 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, flowDir, 111, 11, false) + flTCP2 := newMockFlowLog(ipB, portB, ipA, portA, protocolTCP, flowDir, 222, 22, false) + flTCP2[tcpFlagsFieldName] = FIN_FLAG + flTCP3 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, flowDir, 333, 33, false) + flTCP4 := newMockFlowLog(ipA, portA, ipB, portB, protocolTCP, flowDir, 555, 55, false) + + startTime := clk.Now() + table := []struct { + name string + time time.Time + inputFlowLogs []config.GenericMap + expected []config.GenericMap + }{ + { + "start: new connection", + startTime.Add(0 * time.Second), + []config.GenericMap{flTCP1}, + []config.GenericMap{ + newMockRecordNewConnAB(ipA, portA, ipB, portB, protocolTCP, 111, 0, 11, 0, 1).withHash(hashIdTCP).markFirst().get(), + }, + }, + { + "5s: FIN flow log to end the connection", + startTime.Add(5 * time.Second), + []config.GenericMap{flTCP2}, + []config.GenericMap(nil), + }, + { + "10s: flow log after FIN is still part of the connection", + startTime.Add(10 * time.Second), + []config.GenericMap{flTCP3}, + []config.GenericMap(nil), + }, + { + "16s: end connection. The after-FIN-flow-log doesn't extend the connection's life", + startTime.Add(16 * time.Second), + []config.GenericMap{}, + []config.GenericMap{ + newMockRecordEndConnAB(ipA, portA, ipB, portB, protocolTCP, 444, 222, 44, 22, 3).withHash(hashIdTCP).get(), + }, + }, + { + "17s: another flow log will create a new connection", + startTime.Add(17 * time.Second), + []config.GenericMap{flTCP4}, + []config.GenericMap{ + newMockRecordNewConnAB(ipA, portA, ipB, portB, protocolTCP, 555, 0, 55, 0, 1).withHash(hashIdTCP).markFirst().get(), + }, + }, + } + + var prevTime time.Time + for _, tt := range table { + t.Run(tt.name, func(t *testing.T) { + require.Less(t, prevTime, tt.time) + prevTime = tt.time + clk.Set(tt.time) + actual := ct.Extract(tt.inputFlowLogs) + require.Equal(t, tt.expected, actual) + assertStoreConsistency(t, ct) + }) + } +} diff --git a/pkg/pipeline/extract/conntrack/metrics.go b/pkg/pipeline/extract/conntrack/metrics.go index cf5da280a..d91d1cf78 100644 --- a/pkg/pipeline/extract/conntrack/metrics.go +++ b/pkg/pipeline/extract/conntrack/metrics.go @@ -23,18 +23,11 @@ import ( ) var ( - connStoreRunningLengthDef = operational.DefineMetric( - "conntrack_memory_connections_running", - "The total number of tracked running connections in memory.", + connStoreLengthDef = operational.DefineMetric( + "conntrack_memory_connections", + "The total number of tracked connections in memory per group and phase.", operational.TypeGauge, - "group", - ) - - connStoreExpiredLengthDef = operational.DefineMetric( - "conntrack_memory_connections_expired", - "The total number of tracked expired connections in memory.", - operational.TypeGauge, - "group", + "group", "phase", ) inputRecordsDef = operational.DefineMetric( @@ -60,19 +53,17 @@ var ( ) type metricsType struct { - runningConnStoreLength *prometheus.GaugeVec - expiredConnStoreLength *prometheus.GaugeVec - inputRecords *prometheus.CounterVec - outputRecords *prometheus.CounterVec - tcpFlags *prometheus.CounterVec + connStoreLength *prometheus.GaugeVec + inputRecords *prometheus.CounterVec + outputRecords *prometheus.CounterVec + tcpFlags *prometheus.CounterVec } func newMetrics(opMetrics *operational.Metrics) *metricsType { return &metricsType{ - runningConnStoreLength: opMetrics.NewGaugeVec(&connStoreRunningLengthDef), - expiredConnStoreLength: opMetrics.NewGaugeVec(&connStoreExpiredLengthDef), - inputRecords: opMetrics.NewCounterVec(&inputRecordsDef), - outputRecords: opMetrics.NewCounterVec(&outputRecordsDef), - tcpFlags: opMetrics.NewCounterVec(&tcpFlagsDef), + connStoreLength: opMetrics.NewGaugeVec(&connStoreLengthDef), + inputRecords: opMetrics.NewCounterVec(&inputRecordsDef), + outputRecords: opMetrics.NewCounterVec(&outputRecordsDef), + tcpFlags: opMetrics.NewCounterVec(&tcpFlagsDef), } } diff --git a/pkg/pipeline/extract/conntrack/store.go b/pkg/pipeline/extract/conntrack/store.go index c5b535e0f..8d842543e 100644 --- a/pkg/pipeline/extract/conntrack/store.go +++ b/pkg/pipeline/extract/conntrack/store.go @@ -31,6 +31,8 @@ import ( const ( expiryOrder = utils.OrderID("expiryOrder") nextHeartbeatTimeOrder = utils.OrderID("nextHeartbeatTimeOrder") + activeLabel = "active" + terminatingLabel = "terminating" ) // connectionStore provides means to manage the connections such as retrieving a connection by its hash and organizing @@ -45,12 +47,12 @@ type connectionStore struct { type groupType struct { scheduling api.ConnTrackSchedulingGroup - // running connections - runningMom *utils.MultiOrderedMap - // connections that detected EndConnection from TCP flag. These will not trigger updates anymore until pop + // active connections + activeMom *utils.MultiOrderedMap + // connections that detected EndConnection from TCP FIN flag. These will not trigger updates anymore until pop // check expireConnection func - expiredMom *utils.MultiOrderedMap - labelValue string + terminatingMom *utils.MultiOrderedMap + labelValue string } func (cs *connectionStore) getGroupIdx(conn connection) (groupIdx int) { @@ -68,7 +70,7 @@ func (cs *connectionStore) getGroupIdx(conn connection) (groupIdx int) { func (cs *connectionStore) addConnection(hashId uint64, conn connection) { groupIdx := cs.getGroupIdx(conn) - mom := cs.groups[groupIdx].runningMom + mom := cs.groups[groupIdx].activeMom err := mom.AddRecord(utils.Key(hashId), conn) if err != nil { @@ -77,8 +79,8 @@ func (cs *connectionStore) addConnection(hashId uint64, conn connection) { cs.hashId2groupIdx[hashId] = groupIdx groupLabel := cs.groups[groupIdx].labelValue - groupLen := cs.groups[groupIdx].runningMom.Len() - cs.metrics.runningConnStoreLength.WithLabelValues(groupLabel).Set(float64(groupLen)) + activeLen := cs.groups[groupIdx].activeMom.Len() + cs.metrics.connStoreLength.WithLabelValues(groupLabel, activeLabel).Set(float64(activeLen)) } func (cs *connectionStore) getConnection(hashId uint64) (connection, bool, bool) { @@ -86,15 +88,15 @@ func (cs *connectionStore) getConnection(hashId uint64) (connection, bool, bool) if !found { return nil, false, false } - mom := cs.groups[groupIdx].runningMom + mom := cs.groups[groupIdx].activeMom - // get connection from running map + // get connection from active map isRunning := true record, ok := mom.GetRecord(utils.Key(hashId)) if !ok { - // fallback on expired map if not found + // fallback on terminating map if not found isRunning = false - mom := cs.groups[groupIdx].expiredMom + mom := cs.groups[groupIdx].terminatingMom record, ok = mom.GetRecord(utils.Key(hashId)) if !ok { return nil, false, false @@ -104,50 +106,46 @@ func (cs *connectionStore) getConnection(hashId uint64) (connection, bool, bool) return conn, true, isRunning } -func (cs *connectionStore) expireConnection(hashId uint64) { - conn, ok, running := cs.getConnection(hashId) +func (cs *connectionStore) setConnectionTerminating(hashId uint64) { + conn, ok, active := cs.getConnection(hashId) if !ok { log.Panicf("BUG. connection hash %x doesn't exist", hashId) return - } else if !running { - // connection is already expired + } else if !active { + // connection is terminating return } groupIdx := cs.hashId2groupIdx[hashId] groupLabel := cs.groups[groupIdx].labelValue - runningMom := cs.groups[groupIdx].runningMom - expiredMom := cs.groups[groupIdx].expiredMom - - // Set the expiry time to half of EndConnectionTimeout - timeoutMs := cs.groups[groupIdx].scheduling.EndConnectionTimeout.Duration.Milliseconds() - updatedExpiry := cs.now().Add(time.Duration(timeoutMs/2) * time.Millisecond) - conn.setExpiryTime(updatedExpiry) - - // Remove expired connection from running map - runningMom.RemoveRecord(utils.Key(hashId)) - runningLen := cs.groups[groupIdx].runningMom.Len() - cs.metrics.runningConnStoreLength.WithLabelValues(groupLabel).Set(float64(runningLen)) - - // Add expired connection to expired map - err := expiredMom.AddRecord(utils.Key(hashId), conn) + activeMom := cs.groups[groupIdx].activeMom + terminatingMom := cs.groups[groupIdx].terminatingMom + timeout := cs.groups[groupIdx].scheduling.TerminatingTimeout.Duration + newExpiryTime := cs.now().Add(timeout) + conn.setExpiryTime(newExpiryTime) + // Remove connection from active map + activeMom.RemoveRecord(utils.Key(hashId)) + activeLen := cs.groups[groupIdx].activeMom.Len() + cs.metrics.connStoreLength.WithLabelValues(groupLabel, activeLabel).Set(float64(activeLen)) + // Add connection to terminating map + err := terminatingMom.AddRecord(utils.Key(hashId), conn) if err != nil { log.Errorf("BUG. connection with hash %x already exists in store. %v", hashId, conn) } - expiredLen := cs.groups[groupIdx].expiredMom.Len() - cs.metrics.expiredConnStoreLength.WithLabelValues(groupLabel).Set(float64(expiredLen)) + terminatingLen := cs.groups[groupIdx].terminatingMom.Len() + cs.metrics.connStoreLength.WithLabelValues(groupLabel, terminatingLabel).Set(float64(terminatingLen)) } func (cs *connectionStore) updateConnectionExpiryTime(hashId uint64) { - conn, ok, running := cs.getConnection(hashId) + conn, ok, active := cs.getConnection(hashId) if !ok { log.Panicf("BUG. connection hash %x doesn't exist", hashId) return - } else if !running { - // connection is expired. expiry time can't be updated anymore + } else if !active { + // connection is terminating. expiry time can't be updated anymore return } groupIdx := cs.hashId2groupIdx[hashId] - mom := cs.groups[groupIdx].runningMom + mom := cs.groups[groupIdx].activeMom timeout := cs.groups[groupIdx].scheduling.EndConnectionTimeout.Duration newExpiryTime := cs.now().Add(timeout) conn.setExpiryTime(newExpiryTime) @@ -160,16 +158,16 @@ func (cs *connectionStore) updateConnectionExpiryTime(hashId uint64) { } func (cs *connectionStore) updateNextHeartbeatTime(hashId uint64) { - conn, ok, running := cs.getConnection(hashId) + conn, ok, active := cs.getConnection(hashId) if !ok { log.Panicf("BUG. connection hash %x doesn't exist", hashId) return - } else if !running { - // connection is expired. heartbeat are disabled + } else if !active { + // connection is terminating. heartbeat are disabled return } groupIdx := cs.hashId2groupIdx[hashId] - mom := cs.groups[groupIdx].runningMom + mom := cs.groups[groupIdx].activeMom timeout := cs.groups[groupIdx].scheduling.HeartbeatInterval.Duration newNextHeartbeatTime := cs.now().Add(timeout) conn.setNextHeartbeatTime(newNextHeartbeatTime) @@ -200,12 +198,14 @@ func (cs *connectionStore) popEndConnectionOfMap(mom *utils.MultiOrderedMap, gro }) groupLabel := group.labelValue momLen := mom.Len() + var phaseLabel string switch mom { - case group.runningMom: - cs.metrics.runningConnStoreLength.WithLabelValues(groupLabel).Set(float64(momLen)) - case group.expiredMom: - cs.metrics.expiredConnStoreLength.WithLabelValues(groupLabel).Set(float64(momLen)) + case group.activeMom: + phaseLabel = activeLabel + case group.terminatingMom: + phaseLabel = terminatingLabel } + cs.metrics.connStoreLength.WithLabelValues(groupLabel, phaseLabel).Set(float64(momLen)) return poppedConnections } @@ -215,10 +215,10 @@ func (cs *connectionStore) popEndConnections() []connection { // In each scheduling group iterate over them by their expiry time from old to new. var poppedConnections []connection for _, group := range cs.groups { - // Pop expired connection first - poppedConnections = append(poppedConnections, cs.popEndConnectionOfMap(group.expiredMom, group)...) - // Pop running connections that expired without TCP flag - poppedConnections = append(poppedConnections, cs.popEndConnectionOfMap(group.runningMom, group)...) + // Pop terminating connections first + poppedConnections = append(poppedConnections, cs.popEndConnectionOfMap(group.terminatingMom, group)...) + // Pop active connections that expired without TCP flag + poppedConnections = append(poppedConnections, cs.popEndConnectionOfMap(group.activeMom, group)...) } return poppedConnections } @@ -228,7 +228,7 @@ func (cs *connectionStore) prepareHeartbeats() []connection { // Iterate over the connections by scheduling groups. // In each scheduling group iterate over them by their next heartbeat time from old to new. for _, group := range cs.groups { - group.runningMom.IterateFrontToBack(nextHeartbeatTimeOrder, func(r utils.Record) (shouldDelete, shouldStop bool) { + group.activeMom.IterateFrontToBack(nextHeartbeatTimeOrder, func(r utils.Record) (shouldDelete, shouldStop bool) { conn := r.(connection) nextHeartbeat := conn.getNextHeartbeatTime() needToReport := cs.now().After(nextHeartbeat) @@ -272,10 +272,10 @@ func newConnectionStore(scheduling []api.ConnTrackSchedulingGroup, metrics *metr groups := make([]*groupType, len(scheduling)) for groupIdx, sg := range scheduling { groups[groupIdx] = &groupType{ - scheduling: sg, - runningMom: utils.NewMultiOrderedMap(expiryOrder, nextHeartbeatTimeOrder), - expiredMom: utils.NewMultiOrderedMap(expiryOrder, nextHeartbeatTimeOrder), - labelValue: schedulingGroupToLabelValue(groupIdx, sg), + scheduling: sg, + activeMom: utils.NewMultiOrderedMap(expiryOrder, nextHeartbeatTimeOrder), + terminatingMom: utils.NewMultiOrderedMap(expiryOrder, nextHeartbeatTimeOrder), + labelValue: schedulingGroupToLabelValue(groupIdx, sg), } }