Skip to content

Commit

Permalink
addressed feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Apr 4, 2023
1 parent 6466008 commit 1b1cc51
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 118 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -631,9 +631,11 @@ parameters:
Proto: 17
endConnectionTimeout: 5s
heartbeatInterval: 40s
terminatingTimeout: 5s
- selector: {} # Default group
endConnectionTimeout: 10s
heartbeatInterval: 30s
terminatingTimeout: 5s
```
A possible output would look like:
Expand Down
1 change: 1 addition & 0 deletions contrib/kubernetes/flowlogs-pipeline.conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ parameters:
scheduling:
- endConnectionTimeout: 10s
heartbeatInterval: 30s
terminatingTimeout: 5s
- name: transform_network
transform:
type: network
Expand Down
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ Following is the supported API format for specifying connection tracking:
scheduling: list of timeouts and intervals to apply per selector
selector: key-value map to match against connection fields to apply this scheduling
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
terminatingTimeout: duration of time to wait from detected FIN flag to end a connection
heartbeatInterval: duration of time to wait between heartbeat reports of a connection
maxConnectionsTracked: maximum number of connections we keep in our cache (0 means no limit)
tcpFlags: settings for handling TCP flags
Expand Down
16 changes: 4 additions & 12 deletions docs/operational-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,12 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
| **Labels** | classification |


### conntrack_memory_connections_expired
| **Name** | conntrack_memory_connections_expired |
### conntrack_memory_connections
| **Name** | conntrack_memory_connections |
|:---|:---|
| **Description** | The total number of tracked expired connections in memory. |
| **Description** | The total number of tracked connections in memory per group and phase. |
| **Type** | gauge |
| **Labels** | group |


### conntrack_memory_connections_running
| **Name** | conntrack_memory_connections_running |
|:---|:---|
| **Description** | The total number of tracked running connections in memory. |
| **Type** | gauge |
| **Labels** | group |
| **Labels** | group, phase |


### conntrack_output_records
Expand Down
1 change: 1 addition & 0 deletions network_definitions/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ extract:
- selector: {}
heartbeatInterval: 30s
endConnectionTimeout: 10s
terminatingTimeout: 5s
outputRecordTypes:
- newConnection
- flowLog
Expand Down
1 change: 1 addition & 0 deletions pkg/api/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ConnTrackOperationEnum struct {
type ConnTrackSchedulingGroup struct {
Selector map[string]interface{} `yaml:"selector,omitempty" json:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"`
EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" json:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
TerminatingTimeout Duration `yaml:"terminatingTimeout,omitempty" json:"terminatingTimeout,omitempty" doc:"duration of time to wait from detected FIN flag to end a connection"`
HeartbeatInterval Duration `yaml:"heartbeatInterval,omitempty" json:"heartbeatInterval,omitempty" doc:"duration of time to wait between heartbeat reports of a connection"`
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/pipeline/conntrack_integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ parameters:
scheduling:
- selector: {}
endConnectionTimeout: 1s
heartbeatInterval: 10s
terminatingTimeout: 5s
outputRecordTypes:
- newConnection
- flowLog
Expand Down
14 changes: 3 additions & 11 deletions pkg/pipeline/extract/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericM
builder := NewConnBuilder(ct.metrics)
conn = builder.
Hash(computedHash).
ShouldSwapAB(ct.config.TCPFlags.SwapAB && ct.shouldSwapAB(fl)).
ShouldSwapAB(ct.config.TCPFlags.SwapAB && ct.containsTcpFlag(fl, SYN_ACK_FLAG)).
KeysFrom(fl, ct.config.KeyDefinition, ct.endpointAFields, ct.endpointBFields).
Aggregators(ct.aggregators).
Build()
Expand Down Expand Up @@ -172,22 +172,14 @@ func (ct *conntrackImpl) updateConnection(conn connection, flowLog config.Generi
agg.update(conn, flowLog, d, isNew)
}

if ct.config.TCPFlags.DetectEndConnection && ct.isLastFlowLogOfConnection(flowLog) {
if ct.config.TCPFlags.DetectEndConnection && ct.containsTcpFlag(flowLog, FIN_FLAG) {
ct.metrics.tcpFlags.WithLabelValues("detectEndConnection").Inc()
ct.connStore.expireConnection(flowLogHash.hashTotal)
ct.connStore.setConnectionTerminating(flowLogHash.hashTotal)
} else {
ct.connStore.updateConnectionExpiryTime(flowLogHash.hashTotal)
}
}

func (ct *conntrackImpl) isLastFlowLogOfConnection(flowLog config.GenericMap) bool {
return ct.containsTcpFlag(flowLog, FIN_FLAG)
}

func (ct *conntrackImpl) shouldSwapAB(flowLog config.GenericMap) bool {
return ct.containsTcpFlag(flowLog, SYN_ACK_FLAG)
}

func (ct *conntrackImpl) containsTcpFlag(flowLog config.GenericMap, queryFlag uint32) bool {
tcpFlagsRaw, ok := flowLog[ct.config.TCPFlags.FieldName]
if ok {
Expand Down
Loading

0 comments on commit 1b1cc51

Please sign in to comment.