Skip to content

Commit

Permalink
NETOBSERV-1275: Introduce new "INNER" direction for inner-node traffic (
Browse files Browse the repository at this point in the history
#378)

* Introduce new "INNER" direction for inner-node traffic

Related PR: netobserv/flowlogs-pipeline#483

The flows (and duplicates) generated for inner-node traffic differs compared to node-to-node traffic, and reinterpret direction isn't able to decide between ingress or egress. This is causing discrepancies with the dedup mechanism that filters out flows where Duplicate=true and also favors ingress over egress, since the Duplicate flag can be set randomly on ingress or on egress.

To fix that, the proposed solution is to create this new INNER direction specifically for this kind of traffic. Deduping this INNER traffic can then rely solely on the Duplicate flag, since that flag was set from a single Agent (single node) there will always be only one Duplicate=false.

* fix fmt

* Fix test

* Update texts & API for doc

* fix linter
  • Loading branch information
jotak authored Sep 8, 2023
1 parent 2ec3c5d commit bffb860
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 24 deletions.
4 changes: 2 additions & 2 deletions pkg/model/filters/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ func SplitForReportersMerge(q SingleQuery) (SingleQuery, SingleQuery) {
// (Note that we use DstOwnerName both as an optimization as it's a Loki index,
// and as convenience because looking for empty fields won't work if they aren't indexed)
q1 := SingleQuery{
NewMatch(fields.FlowDirection, `"`+constants.Ingress+`"`),
NewMatch(fields.FlowDirection, `"`+string(constants.Ingress)+`","`+string(constants.Inner)+`"`),
}
q2 := SingleQuery{
NewMatch(fields.FlowDirection, `"`+constants.Egress+`"`),
NewMatch(fields.FlowDirection, `"`+string(constants.Egress)+`"`),
NewMatch(fields.DstOwnerName, `""`),
}
for _, m := range q {
Expand Down
8 changes: 4 additions & 4 deletions pkg/model/filters/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ func TestParseCommon(t *testing.T) {
}

func TestSplitForReportersMerge_NoSplit(t *testing.T) {
q1, q2 := SplitForReportersMerge(SingleQuery{NewMatch("srcns", "a"), NewMatch("FlowDirection", constants.Ingress)})
q1, q2 := SplitForReportersMerge(SingleQuery{NewMatch("srcns", "a"), NewMatch("FlowDirection", string(constants.Ingress))})
assert.Nil(t, q2)
assert.Len(t, q1, 2)
assert.Equal(t, SingleQuery{
NewMatch("srcns", "a"),
NewMatch("FlowDirection", constants.Ingress),
NewMatch("FlowDirection", string(constants.Ingress)),
}, q1)
}

Expand All @@ -92,13 +92,13 @@ func TestSplitForReportersMerge(t *testing.T) {

assert.Len(t, q1, 3)
assert.Equal(t, SingleQuery{
NewMatch("FlowDirection", `"`+constants.Ingress+`"`),
NewMatch("FlowDirection", `"`+string(constants.Ingress)+`","`+string(constants.Inner)+`"`),
NewMatch("srcns", "a"),
NewMatch("dstns", "b"),
}, q1)
assert.Len(t, q2, 4)
assert.Equal(t, SingleQuery{
NewMatch("FlowDirection", `"`+constants.Egress+`"`),
NewMatch("FlowDirection", `"`+string(constants.Egress)+`"`),
NewMatch("DstK8S_OwnerName", `""`),
NewMatch("srcns", "a"),
NewMatch("dstns", "b"),
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func TestLokiConfigurationForTopology(t *testing.T) {
req2 := lokiMock.Calls[1].Arguments[1].(*http.Request)
queries := []string{req1.URL.Query().Get("query"), req2.URL.Query().Get("query")}
expected := []string{
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (rate({app="netobserv-flowcollector",FlowDirection="0"}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (rate({app="netobserv-flowcollector",FlowDirection=~"^0$|^2$"}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (rate({app="netobserv-flowcollector",FlowDirection="1",DstK8S_OwnerName=""}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
}
// We don't predict the order so sort both actual and expected
Expand Down
6 changes: 4 additions & 2 deletions pkg/utils/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package constants
type MetricType string
type RecordType string
type PacketLoss string
type Direction string

const (
AppLabel = "app"
Expand All @@ -28,8 +29,9 @@ const (
PacketLossSent PacketLoss = "sent"
PacketLossAll PacketLoss = "all"
DefaultPacketLoss PacketLoss = PacketLossAll
Ingress = "0"
Egress = "1"
Ingress Direction = "0"
Egress Direction = "1"
Inner Direction = "2"
)

var AnyConnectionType = []string{
Expand Down
7 changes: 4 additions & 3 deletions web/locales/en/plugin__netobserv-plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"Log type": "Log type",
"Only available when FlowCollector.processor.logTypes option equals \"CONNECTIONS\", \"ENDED_CONNECTIONS\" or \"ALL\"": "Only available when FlowCollector.processor.logTypes option equals \"CONNECTIONS\", \"ENDED_CONNECTIONS\" or \"ALL\"",
"Only available when FlowCollector.processor.logTypes option equals \"FLOWS\" or \"ALL\"": "Only available when FlowCollector.processor.logTypes option equals \"FLOWS\" or \"ALL\"",
"A flow might be reported from several interfaces, and from both source and destination nodes, making it appear several times. By default, duplicates are hidden. Showing duplicates is not possible in Overview and Topology tabs to avoid altering metric calculations. Use the Direction filter to switch between ingress / egress traffic.": "A flow might be reported from several interfaces, and from both source and destination nodes, making it appear several times. By default, duplicates are hidden. Showing duplicates is not possible in Overview and Topology tabs to avoid altering metric calculations. Use the Direction filter to switch between ingress / egress traffic.",
"A flow might be reported from several interfaces, and from both source and destination nodes, making it appear several times. By default, duplicates are hidden. Showing duplicates is not possible in Overview and Topology tabs to avoid altering metric calculations. Use the Direction filter to switch between ingress, egress and inner-node traffic.": "A flow might be reported from several interfaces, and from both source and destination nodes, making it appear several times. By default, duplicates are hidden. Showing duplicates is not possible in Overview and Topology tabs to avoid altering metric calculations. Use the Direction filter to switch between ingress, egress and inner-node traffic.",
"Duplicated flows": "Duplicated flows",
"Show duplicates": "Show duplicates",
"Whether each query result has to match all the filters or just any of them": "Whether each query result has to match all the filters or just any of them",
Expand Down Expand Up @@ -230,6 +230,7 @@
"reporting": "reporting",
"Ingress": "Ingress",
"Egress": "Egress",
"Inner": "Inner",
"dropped": "dropped",
"dropped by": "dropped by",
"sent": "sent",
Expand Down Expand Up @@ -369,7 +370,7 @@
"Protocol": "Protocol",
"The value of the protocol number in the IP packet header": "The value of the protocol number in the IP packet header",
"Direction": "Direction",
"The direction of the Flow observed at the Observation Point.": "The direction of the Flow observed at the Observation Point.",
"The direction of the Flow observed at the Node observation point.": "The direction of the Flow observed at the Node observation point.",
"Interface": "Interface",
"The network interface of the Flow.": "The network interface of the Flow.",
"The total aggregated number of bytes.": "The total aggregated number of bytes.",
Expand Down Expand Up @@ -450,7 +451,7 @@
"A IANA name like TCP, UDP": "A IANA name like TCP, UDP",
"Empty double quotes \"\" for undefined protocol": "Empty double quotes \"\" for undefined protocol",
"Unknown direction": "Unknown direction",
"Specify the direction of the Flow observed at the Observation Point.": "Specify the direction of the Flow observed at the Observation Point.",
"Specify the direction of the Flow observed at the Node observation point.": "Specify the direction of the Flow observed at the Node observation point.",
"Network interface": "Network interface",
"Specify a network interface.": "Specify a network interface.",
"Specify a single conversation hash Id.": "Specify a single conversation hash Id.",
Expand Down
12 changes: 11 additions & 1 deletion web/src/api/ipfix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { RecordType } from '../model/flow-query';

// Please keep this file documented: it is used in doc generation
// To regenerate doc, run `make generate-doc` - and also check this page:
// eslint-disable-next-line max-len
// https://github.com/netobserv/network-observability-operator/blob/main/docs/GeneratingAsciidocAPI.md#generate-asciidoc-for-flows-json-format-reference

export interface Record {
Expand Down Expand Up @@ -32,6 +31,15 @@ export enum FlowDirection {
/** Incoming traffic, from node observation point */
Ingress = '0',
/** Outgoing traffic, from node observation point */
Egress = '1',
/** Inner traffic, ie. same source and destination node */
Inner = '2'
}

export enum InterfaceDirection {
/** Incoming traffic, from network interface observation point */
Ingress = '0',
/** Outgoing traffic, from network interface observation point */
Egress = '1'
}

Expand Down Expand Up @@ -72,6 +80,8 @@ export interface Fields {
Proto: number;
/** Network interface */
Interface?: string;
/** Flow direction from the network interface observation point */
IfDirection?: InterfaceDirection;
/** TCP flags */
Flags?: number;
/** Number of packets in this flow */
Expand Down
2 changes: 1 addition & 1 deletion web/src/components/dropdowns/query-options-dropdown.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ export const QueryOptionsPanel: React.FC<QueryOptionsDropdownProps> = ({
<Tooltip
content={t(
// eslint-disable-next-line max-len
'A flow might be reported from several interfaces, and from both source and destination nodes, making it appear several times. By default, duplicates are hidden. Showing duplicates is not possible in Overview and Topology tabs to avoid altering metric calculations. Use the Direction filter to switch between ingress / egress traffic.'
'A flow might be reported from several interfaces, and from both source and destination nodes, making it appear several times. By default, duplicates are hidden. Showing duplicates is not possible in Overview and Topology tabs to avoid altering metric calculations. Use the Direction filter to switch between ingress, egress and inner-node traffic.'
)}
>
<div className="pf-c-select__menu-group-title">
Expand Down
8 changes: 7 additions & 1 deletion web/src/components/netflow-record/record-field.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,13 @@ export const RecordField: React.FC<{
case ColumnsId.flowdir:
return singleContainer(
simpleTextWithTooltip(
value === FlowDirection.Ingress ? t('Ingress') : value === FlowDirection.Egress ? t('Egress') : t('n/a')
value === FlowDirection.Ingress
? t('Ingress')
: value === FlowDirection.Egress
? t('Egress')
: value === FlowDirection.Inner
? t('Inner')
: t('n/a')
)
);
case ColumnsId.packets:
Expand Down
2 changes: 1 addition & 1 deletion web/src/utils/columns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ export const getExtraColumns = (t: TFunction): Column[] => {
{
id: ColumnsId.flowdir,
name: t('Direction'),
tooltip: t('The direction of the Flow observed at the Observation Point.'),
tooltip: t('The direction of the Flow observed at the Node observation point.'),
fieldName: 'FlowDirection',
quickFilter: 'direction',
isSelected: false,
Expand Down
2 changes: 1 addition & 1 deletion web/src/utils/filter-definitions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ export const getFilterDefinitions = (
}
return invalid(t('Unknown direction'));
},
hint: t('Specify the direction of the Flow observed at the Observation Point.'),
hint: t('Specify the direction of the Flow observed at the Node observation point.'),
encoder: simpleFiltersEncoder('FlowDirection'),
overlap: false
},
Expand Down
3 changes: 2 additions & 1 deletion web/src/utils/filter-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ export const getProtocolOptions = (value: string): Promise<FilterOption[]> => {
export const getDirectionOptions = (t: TFunction): FilterOption[] => {
return [
{ name: t('Ingress'), value: String(FlowDirection.Ingress) },
{ name: t('Egress'), value: String(FlowDirection.Egress) }
{ name: t('Egress'), value: String(FlowDirection.Egress) },
{ name: t('Inner'), value: String(FlowDirection.Inner) }
];
};

Expand Down
16 changes: 10 additions & 6 deletions web/src/utils/flows.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import * as _ from 'lodash';
import { FlowDirection, Record } from '../api/ipfix';
import { Record } from '../api/ipfix';
import { get5Tuple } from './ids';

export const mergeFlowReporters = (flows: Record[]): Record[] => {
// The purpose of this function is to determine if, for a given 5 tuple, we'll look at INGRESS or EGRESS reporter
// The assumption is that INGRESS alone, or EGRESS alone always provide a complete visiblity however
// Ingress traffic will also contains pktDrop and DNS responses
// The logic is to index flows by 5 tuples, then for each indexed set, keep only the INGRESS if present, otherwise EGRESS
// The purpose of this function is to determine if, for a given 5 tuple, we'll look at INGRESS, EGRESS or INNER reporter
// The assumption is that INGRESS alone, EGRESS alone or INNER alone always provide a complete visiblity
// Favor whichever contains pktDrop and/or DNS responses
const grouped = _.groupBy(flows, get5Tuple);
const filtersIndex = _.mapValues(
grouped,
(recs: Record[]) =>
(recs.find(r => r.labels.FlowDirection === FlowDirection.Ingress) || recs[0]).labels.FlowDirection
(
recs.find(
r =>
r.fields.DnsId !== undefined || r.fields.PktDropBytes !== undefined || r.fields.PktDropPackets !== undefined
) || recs[0]
).labels.FlowDirection
);
return flows.filter((r: Record) => r.labels.FlowDirection === filtersIndex[get5Tuple(r)]);
};

0 comments on commit bffb860

Please sign in to comment.