Skip to content

Commit

Permalink
addressed feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Apr 3, 2023
1 parent ba3488b commit 34e0453
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 212 deletions.
4 changes: 2 additions & 2 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ Following is the supported API format for specifying connection tracking:
count: count
min: min
max: max
copyFirst: copyFirst
copyLast: copyLast
first: first
last: last
splitAB: When true, 2 output fields will be created. One for A->B and one for B->A flows.
input: The input field to base the operation on. When omitted, 'name' is used
scheduling: list of timeouts and intervals to apply per selector
Expand Down
14 changes: 11 additions & 3 deletions docs/operational-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,18 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
| **Labels** | classification |


### conntrack_memory_connections
| **Name** | conntrack_memory_connections |
### conntrack_memory_connections_expired
| **Name** | conntrack_memory_connections_expired |
|:---|:---|
| **Description** | The total number of tracked connections in memory. |
| **Description** | The total number of tracked expired connections in memory. |
| **Type** | gauge |
| **Labels** | group |


### conntrack_memory_connections_running
| **Name** | conntrack_memory_connections_running |
|:---|:---|
| **Description** | The total number of tracked running connections in memory. |
| **Type** | gauge |
| **Labels** | group |

Expand Down
14 changes: 7 additions & 7 deletions pkg/api/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ type OutputField struct {
}

type ConnTrackOperationEnum struct {
Sum string `yaml:"sum" json:"sum" doc:"sum"`
Count string `yaml:"count" json:"count" doc:"count"`
Min string `yaml:"min" json:"min" doc:"min"`
Max string `yaml:"max" json:"max" doc:"max"`
CopyFirst string `yaml:"copyFirst" json:"copyFirst" doc:"copyFirst"`
CopyLast string `yaml:"copyLast" json:"copyLast" doc:"copyLast"`
Sum string `yaml:"sum" json:"sum" doc:"sum"`
Count string `yaml:"count" json:"count" doc:"count"`
Min string `yaml:"min" json:"min" doc:"min"`
Max string `yaml:"max" json:"max" doc:"max"`
First string `yaml:"first" json:"first" doc:"first"`
Last string `yaml:"last" json:"last" doc:"last"`
}

type ConnTrackSchedulingGroup struct {
Expand Down Expand Up @@ -259,7 +259,7 @@ func isOperationValid(value string, splitAB bool) bool {
case ConnTrackOperationName("Count"):
case ConnTrackOperationName("Min"):
case ConnTrackOperationName("Max"):
case ConnTrackOperationName("CopyFirst"), ConnTrackOperationName("CopyLast"):
case ConnTrackOperationName("First"), ConnTrackOperationName("Last"):
valid = !splitAB
default:
valid = false
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/conntrack_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestConnTrack(t *testing.T) {
}, test2.Interval(10*time.Millisecond))

// Wait a moment to make the connections expired
time.Sleep(2 * time.Second)
time.Sleep(5 * time.Second)

// Send something to the pipeline to allow the connection tracking output end connection records
in <- config.GenericMap{"DstAddr": "1.2.3.4"}
Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/extract/conntrack/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type aggregator interface {
// addField adds an aggregate field to the connection
addField(conn connection)
// update updates the aggregate field in the connection based on the flow log.
update(conn connection, flowLog config.GenericMap, d direction, isNew bool)
update(conn connection, flowLog config.GenericMap, d direction, isFirst bool)
}

type aggregateBase struct {
Expand Down Expand Up @@ -78,10 +78,10 @@ func newAggregator(of api.OutputField) (aggregator, error) {
case api.ConnTrackOperationName("Max"):
aggBase.initVal = -math.MaxFloat64
agg = &aMax{aggBase}
case api.ConnTrackOperationName("CopyFirst"):
case api.ConnTrackOperationName("First"):
aggBase.initVal = nil
agg = &aFirst{aggBase}
case api.ConnTrackOperationName("CopyLast"):
case api.ConnTrackOperationName("Last"):
aggBase.initVal = nil
agg = &aLast{aggBase}
default:
Expand Down
135 changes: 32 additions & 103 deletions pkg/pipeline/extract/conntrack/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ func TestNewAggregator_Invalid(t *testing.T) {
Input: "Input",
})
require.NotNil(t, err)

// invalid first agg
_, err = newAggregator(api.OutputField{
Operation: "first",
SplitAB: true,
Input: "Input",
})
require.NotNil(t, err)
}

func TestNewAggregator_Valid(t *testing.T) {
Expand Down Expand Up @@ -88,6 +96,21 @@ func TestNewAggregator_Valid(t *testing.T) {
outputField: api.OutputField{Name: "MyAgg", Operation: "min"},
expected: &aMin{aggregateBase{"MyAgg", "MyAgg", false, math.MaxFloat64}},
},
{
name: "Default first",
outputField: api.OutputField{Name: "MyCp", Operation: "first"},
expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil}},
},
{
name: "Custom input first",
outputField: api.OutputField{Name: "MyCp", Operation: "first", Input: "MyInput"},
expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil}},
},
{
name: "Default last",
outputField: api.OutputField{Name: "MyCp", Operation: "last"},
expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil}},
},
}

for _, test := range table {
Expand All @@ -106,6 +129,8 @@ func TestAddField_and_Update(t *testing.T) {
{Name: "numFlowLogs", Operation: "count"},
{Name: "minFlowLogBytes", Operation: "min", Input: "Bytes"},
{Name: "maxFlowLogBytes", Operation: "max", Input: "Bytes"},
{Name: "FirstFlowDirection", Operation: "first", Input: "FlowDirection"},
{Name: "LastFlowDirection", Operation: "last", Input: "FlowDirection"},
}
var aggs []aggregator
for _, of := range ofs {
Expand All @@ -119,7 +144,8 @@ func TestAddField_and_Update(t *testing.T) {
portA := 1
portB := 9002
protocolA := 6
flowDir := 0
flowDirA := 0
flowDirB := 1

table := []struct {
name string
Expand All @@ -129,23 +155,23 @@ func TestAddField_and_Update(t *testing.T) {
}{
{
name: "flowLog 1",
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 100, 10, false),
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirA, 100, 10, false),
direction: dirAB,
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(0), "Packets": float64(10), "maxFlowLogBytes": float64(100), "minFlowLogBytes": float64(100), "numFlowLogs": float64(1)},
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(0), "Packets": float64(10), "maxFlowLogBytes": float64(100), "minFlowLogBytes": float64(100), "numFlowLogs": float64(1), "FirstFlowDirection": 0, "LastFlowDirection": 0},
},
{
name: "flowLog 2",
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDir, 200, 20, false),
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 200, 20, false),
direction: dirBA,
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(200), "Packets": float64(30), "maxFlowLogBytes": float64(200), "minFlowLogBytes": float64(100), "numFlowLogs": float64(2)},
expected: map[string]interface{}{"Bytes_AB": float64(100), "Bytes_BA": float64(200), "Packets": float64(30), "maxFlowLogBytes": float64(200), "minFlowLogBytes": float64(100), "numFlowLogs": float64(2), "FirstFlowDirection": 0, "LastFlowDirection": 1},
},
}

conn := NewConnBuilder(nil).Build()
for _, agg := range aggs {
agg.addField(conn)
}
expectedInits := map[string]interface{}{"Bytes_AB": float64(0), "Bytes_BA": float64(0), "Packets": float64(0), "maxFlowLogBytes": float64(-math.MaxFloat64), "minFlowLogBytes": float64(math.MaxFloat64), "numFlowLogs": float64(0)}
expectedInits := map[string]interface{}{"Bytes_AB": float64(0), "Bytes_BA": float64(0), "Packets": float64(0), "maxFlowLogBytes": float64(-math.MaxFloat64), "minFlowLogBytes": float64(math.MaxFloat64), "numFlowLogs": float64(0), "FirstFlowDirection": nil, "LastFlowDirection": nil}
require.Equal(t, expectedInits, conn.(*connType).aggFields)

for i, test := range table {
Expand All @@ -157,100 +183,3 @@ func TestAddField_and_Update(t *testing.T) {
})
}
}

func TestNewCopy_Invalid(t *testing.T) {
var err error

_, err = newAggregator(api.OutputField{
Operation: "copyFirst",
SplitAB: true,
Input: "Input",
})
require.NotNil(t, err)
}

func TestNewCopy_Valid(t *testing.T) {
table := []struct {
name string
outputField api.OutputField
expected aggregator
}{
{
name: "Default first",
outputField: api.OutputField{Name: "MyCp", Operation: "copyFirst"},
expected: &aFirst{aggregateBase{"MyCp", "MyCp", false, nil}},
},
{
name: "Custom input first",
outputField: api.OutputField{Name: "MyCp", Operation: "copyFirst", Input: "MyInput"},
expected: &aFirst{aggregateBase{"MyInput", "MyCp", false, nil}},
},
{
name: "Default last",
outputField: api.OutputField{Name: "MyCp", Operation: "copyLast"},
expected: &aLast{aggregateBase{"MyCp", "MyCp", false, nil}},
},
}

for _, test := range table {
t.Run(test.name, func(t *testing.T) {
c, err := newAggregator(test.outputField)
require.NoError(t, err)
require.Equal(t, test.expected, c)
})
}
}

func TestCopy_UpdateFirst(t *testing.T) {
ofs := []api.OutputField{
{Input: "FlowDirection", Name: "FirstFlowDirection", Operation: "copyFirst"},
{Input: "FlowDirection", Name: "LastFlowDirection", Operation: "copyLast"},
}
var cs []aggregator
for _, of := range ofs {
c, err := newAggregator(of)
require.NoError(t, err)
cs = append(cs, c)
}

ipA := "10.0.0.1"
ipB := "10.0.0.2"
portA := 1
portB := 9002
protocolA := 6
flowDirA := 0
flowDirB := 1

table := []struct {
name string
flowLog config.GenericMap
expected map[string]interface{}
}{
{
name: "flowLog 1",
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirA, 100, 10, false),
expected: map[string]interface{}{"FirstFlowDirection": 0, "LastFlowDirection": 0},
},
{
name: "flowLog 2",
flowLog: newMockFlowLog(ipA, portA, ipB, portB, protocolA, flowDirB, 200, 20, false),
expected: map[string]interface{}{"FirstFlowDirection": 0, "LastFlowDirection": 1},
},
}

conn := NewConnBuilder(nil).Build()
for _, agg := range cs {
agg.addField(conn)
}
expectedInits := map[string]interface{}{"FirstFlowDirection": nil, "LastFlowDirection": nil}
require.Equal(t, expectedInits, conn.(*connType).aggFields)

for i, test := range table {
t.Run(test.name, func(t *testing.T) {
for _, cp := range cs {
cp.update(conn, test.flowLog, 0, i == 0)
}
require.Equal(t, test.expected, conn.(*connType).aggFields)
})
}
}
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM
ct.metrics.inputRecords.WithLabelValues("rejected").Inc()
continue
}
conn, exists := ct.connStore.getConnection(computedHash.hashTotal)
conn, exists, _ := ct.connStore.getConnection(computedHash.hashTotal)
if !exists {
if (ct.config.MaxConnectionsTracked > 0) && (ct.connStore.len() >= ct.config.MaxConnectionsTracked) {
log.Warningf("too many connections; skipping flow log %v: ", fl)
Expand Down
9 changes: 5 additions & 4 deletions pkg/pipeline/extract/conntrack/conntrack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,8 +875,9 @@ func assertStoreConsistency(t *testing.T, extractor extract.Extractor) {
groupsLenSlice := make([]int, 0)
sumGroupsLen := 0
for _, g := range store.groups {
sumGroupsLen += g.mom.Len()
groupsLenSlice = append(groupsLenSlice, g.mom.Len())
sumGroupsLen += g.expiredMom.Len()
sumGroupsLen += g.runningMom.Len()
groupsLenSlice = append(groupsLenSlice, g.runningMom.Len())
}
require.Equal(t, hashLen, sumGroupsLen, "hashLen(=%v) != sum(%v)", hashLen, groupsLenSlice)
}
Expand Down Expand Up @@ -1055,8 +1056,8 @@ func TestDetectEndConnection(t *testing.T) {
[]config.GenericMap(nil),
},
{
"15s: end connection without duplicated",
startTime.Add(21 * time.Second),
"20s: end connection without duplicated",
startTime.Add(20 * time.Second),
[]config.GenericMap{},
[]config.GenericMap{
newMockRecordEndConnAB(ipA, portA, ipB, portB, protocolTCP, 111, 222, 11, 22, 2).withHash(hashIdTCP).get(),
Expand Down
31 changes: 20 additions & 11 deletions pkg/pipeline/extract/conntrack/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ import (
)

var (
connStoreLengthDef = operational.DefineMetric(
"conntrack_memory_connections",
"The total number of tracked connections in memory.",
connStoreRunningLengthDef = operational.DefineMetric(
"conntrack_memory_connections_running",
"The total number of tracked running connections in memory.",
operational.TypeGauge,
"group",
)

connStoreExpiredLengthDef = operational.DefineMetric(
"conntrack_memory_connections_expired",
"The total number of tracked expired connections in memory.",
operational.TypeGauge,
"group",
)
Expand Down Expand Up @@ -53,17 +60,19 @@ var (
)

type metricsType struct {
connStoreLength *prometheus.GaugeVec
inputRecords *prometheus.CounterVec
outputRecords *prometheus.CounterVec
tcpFlags *prometheus.CounterVec
runningConnStoreLength *prometheus.GaugeVec
expiredConnStoreLength *prometheus.GaugeVec
inputRecords *prometheus.CounterVec
outputRecords *prometheus.CounterVec
tcpFlags *prometheus.CounterVec
}

func newMetrics(opMetrics *operational.Metrics) *metricsType {
return &metricsType{
connStoreLength: opMetrics.NewGaugeVec(&connStoreLengthDef),
inputRecords: opMetrics.NewCounterVec(&inputRecordsDef),
outputRecords: opMetrics.NewCounterVec(&outputRecordsDef),
tcpFlags: opMetrics.NewCounterVec(&tcpFlagsDef),
runningConnStoreLength: opMetrics.NewGaugeVec(&connStoreRunningLengthDef),
expiredConnStoreLength: opMetrics.NewGaugeVec(&connStoreExpiredLengthDef),
inputRecords: opMetrics.NewCounterVec(&inputRecordsDef),
outputRecords: opMetrics.NewCounterVec(&outputRecordsDef),
tcpFlags: opMetrics.NewCounterVec(&tcpFlagsDef),
}
}
Loading

0 comments on commit 34e0453

Please sign in to comment.