Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1692: allow KEEP filtering logic #740

Merged
merged 4 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ Following is the supported API format for filter transformations:
remove_entry_if_equal: removes the entry if the field value equals specified value
remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
remove_entry_all_satisfied: removes the entry if all of the defined rules are satisfied
keep_entry_all_satisfied: keeps the entry if the set of rules are all satisfied
add_field: adds (input) field to the entry; overrides previous value if present (key=input, value=value)
add_field_if_doesnt_exist: adds a field to the entry if the field does not exist
add_field_if: add output field set to assignee if input field satisfies criteria from parameters field
Expand All @@ -187,6 +188,19 @@ Following is the supported API format for filter transformations:
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
keepEntryAllSatisfied: configuration for keep_entry rule
type: (enum) one of the following:
keep_entry_if_exists: keeps the entry if the field exists
keep_entry_if_doesnt_exist: keeps the entry if the field does not exist
keep_entry_if_equal: keeps the entry if the field value equals specified value
keep_entry_if_not_equal: keeps the entry if the field value does not equal specified value
keep_entry_if_regex_match: keeps the entry if the field value matches the specified regex
keep_entry_if_not_regex_match: keeps the entry if the field value does not match the specified regex
keepEntry: configuration for keep_entry_* rules
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
keepEntrySampling: sampling value for keep_entry type: 1 flow on <sampling> is kept
addField: configuration for add_field rule
input: entry input field
value: specified value of input field:
Expand Down
22 changes: 22 additions & 0 deletions pkg/api/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
RemoveEntryIfEqual TransformFilterEnum = "remove_entry_if_equal" // removes the entry if the field value equals specified value
RemoveEntryIfNotEqual TransformFilterEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value
RemoveEntryAllSatisfied TransformFilterEnum = "remove_entry_all_satisfied" // removes the entry if all of the defined rules are satisfied
KeepEntryAllSatisfied TransformFilterEnum = "keep_entry_all_satisfied" // keeps the entry if the set of rules are all satisfied
AddField TransformFilterEnum = "add_field" // adds (input) field to the entry; overrides previous value if present (key=input, value=value)
AddFieldIfDoesntExist TransformFilterEnum = "add_field_if_doesnt_exist" // adds a field to the entry if the field does not exist
AddFieldIf TransformFilterEnum = "add_field_if" // add output field set to assignee if input field satisfies criteria from parameters field
Expand All @@ -55,11 +56,24 @@ const (
RemoveEntryIfNotEqualD TransformFilterRemoveEntryEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value
)

// TransformFilterKeepEntryEnum names the condition type of a single KeepEntryRule,
// i.e. one of the conditions listed under a keep_entry_all_satisfied filter rule.
type TransformFilterKeepEntryEnum string

// Supported condition types; each string value is what appears in the rule's "type" field.
const (
KeepEntryIfExists TransformFilterKeepEntryEnum = "keep_entry_if_exists" // keeps the entry if the field exists
KeepEntryIfDoesntExist TransformFilterKeepEntryEnum = "keep_entry_if_doesnt_exist" // keeps the entry if the field does not exist
KeepEntryIfEqual TransformFilterKeepEntryEnum = "keep_entry_if_equal" // keeps the entry if the field value equals specified value
KeepEntryIfNotEqual TransformFilterKeepEntryEnum = "keep_entry_if_not_equal" // keeps the entry if the field value does not equal specified value
KeepEntryIfRegexMatch TransformFilterKeepEntryEnum = "keep_entry_if_regex_match" // keeps the entry if the field value matches the specified regex
KeepEntryIfNotRegexMatch TransformFilterKeepEntryEnum = "keep_entry_if_not_regex_match" // keeps the entry if the field value does not match the specified regex
)

type TransformFilterRule struct {
Type TransformFilterEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
RemoveField *TransformFilterGenericRule `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"`
RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
RemoveEntryAllSatisfied []*RemoveEntryRule `yaml:"removeEntryAllSatisfied,omitempty" json:"removeEntryAllSatisfied,omitempty" doc:"configuration for remove_entry_all_satisfied rule"`
KeepEntryAllSatisfied []*KeepEntryRule `yaml:"keepEntryAllSatisfied,omitempty" json:"keepEntryAllSatisfied,omitempty" doc:"configuration for keep_entry rule"`
KeepEntrySampling uint16 `yaml:"keepEntrySampling,omitempty" json:"keepEntrySampling,omitempty" doc:"sampling value for keep_entry type: 1 flow on <sampling> is kept"`
AddField *TransformFilterGenericRule `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"`
AddFieldIfDoesntExist *TransformFilterGenericRule `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"`
AddFieldIf *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"`
Expand All @@ -79,6 +93,9 @@ func (r *TransformFilterRule) preprocess() {
for i := range r.RemoveEntryAllSatisfied {
r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess()
}
for i := range r.KeepEntryAllSatisfied {
r.KeepEntryAllSatisfied[i].KeepEntry.preprocess()
}
for i := range r.ConditionalSampling {
r.ConditionalSampling[i].preprocess()
}
Expand Down Expand Up @@ -110,6 +127,11 @@ type RemoveEntryRule struct {
RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
}

// KeepEntryRule is a single condition of a keep_entry_all_satisfied rule: Type selects the
// keep_entry_* check and KeepEntry holds the generic rule configuration it is evaluated with.
type KeepEntryRule struct {
Type TransformFilterKeepEntryEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
KeepEntry *TransformFilterGenericRule `yaml:"keepEntry,omitempty" json:"keepEntry,omitempty" doc:"configuration for keep_entry_* rules"`
}

type SamplingCondition struct {
Value uint16 `yaml:"value,omitempty" json:"value,omitempty" doc:"sampling value: 1 flow on <sampling> is kept"`
Rules []*RemoveEntryRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"rules to be satisfied for this sampling configuration"`
Expand Down
107 changes: 12 additions & 95 deletions pkg/pipeline/encode/metrics/preprocess.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package metrics

import (
"fmt"
"regexp"
"strings"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/utils/filters"
)

// Predicate evaluates a flow and reports whether it satisfies a filter condition.
type Predicate func(config.GenericMap) bool

// variableExtractor matches $(name) placeholders and captures the name in its first group.
var variableExtractor = regexp.MustCompile(`\$\(([^\)]+)\)`)

type Preprocessed struct {
*api.MetricsItem
filters []preprocessedFilter
Expand All @@ -27,7 +21,7 @@ type MappedLabel struct {
}

type preprocessedFilter struct {
predicate Predicate
predicate filters.Predicate
useFlat bool
}

Expand All @@ -42,102 +36,25 @@ func (p *Preprocessed) TargetLabels() []string {
return targetLabels
}

// Presence builds a predicate that reports whether the flow contains the filter's key,
// regardless of the value stored under it.
func Presence(filter api.MetricsFilter) Predicate {
	key := filter.Key
	return func(flow config.GenericMap) bool {
		_, ok := flow[key]
		return ok
	}
}

// Absence builds a predicate that reports whether the filter's key is missing from the flow.
func Absence(filter api.MetricsFilter) Predicate {
	return func(flow config.GenericMap) bool {
		_, present := flow[filter.Key]
		return !present
	}
}

// Equal builds a predicate that reports whether the value stored under the filter's key
// equals filter.Value. Non-string values are compared through their fmt.Sprint form.
// Any $(name) placeholders in filter.Value are resolved against the flow before comparing.
// Flows that do not contain the key never match.
func Equal(filter api.MetricsFilter) Predicate {
	// Placeholders are extracted once, when the predicate is built.
	lookups := extractVarLookups(filter.Value)
	return func(flow config.GenericMap) bool {
		raw, present := flow[filter.Key]
		if !present {
			return false
		}
		actual, isString := raw.(string)
		if !isString {
			actual = fmt.Sprint(raw)
		}
		expected := filter.Value
		if len(lookups) > 0 {
			expected = injectVars(flow, expected, lookups)
		}
		return actual == expected
	}
}

// NotEqual builds the logical negation of Equal for the same filter; as a consequence,
// flows that do not contain the key satisfy it.
func NotEqual(filter api.MetricsFilter) Predicate {
	isEqual := Equal(filter)
	return func(flow config.GenericMap) bool {
		return !isEqual(flow)
	}
}

// Regex builds a predicate that reports whether the value stored under the filter's key
// matches the regular expression given in filter.Value. Non-string values are matched
// against their fmt.Sprint form; flows that do not contain the key never match.
//
// If filter.Value is not a valid regular expression, the returned predicate never
// matches, instead of dereferencing a nil *regexp.Regexp on the first evaluated flow.
func Regex(filter api.MetricsFilter) Predicate {
	r, err := regexp.Compile(filter.Value)
	if err != nil {
		// Predicate has no error channel, so an unusable pattern is treated as "no match".
		return func(config.GenericMap) bool { return false }
	}
	return func(flow config.GenericMap) bool {
		val, found := flow[filter.Key]
		if !found {
			return false
		}
		sVal, ok := val.(string)
		if !ok {
			sVal = fmt.Sprint(val)
		}
		return r.MatchString(sVal)
	}
}

// NotRegex builds the logical negation of Regex for the same filter; as a consequence,
// flows that do not contain the key satisfy it.
func NotRegex(filter api.MetricsFilter) Predicate {
	matches := Regex(filter)
	return func(flow config.GenericMap) bool {
		return !matches(flow)
	}
}

func filterToPredicate(filter api.MetricsFilter) Predicate {
func filterToPredicate(filter api.MetricsFilter) filters.Predicate {
switch filter.Type {
case api.MetricFilterEqual:
return Equal(filter)
return filters.Equal(filter.Key, filter.Value, true)
case api.MetricFilterNotEqual:
return NotEqual(filter)
return filters.NotEqual(filter.Key, filter.Value, true)
case api.MetricFilterPresence:
return Presence(filter)
return filters.Presence(filter.Key)
case api.MetricFilterAbsence:
return Absence(filter)
return filters.Absence(filter.Key)
case api.MetricFilterRegex:
return Regex(filter)
r, _ := regexp.Compile(filter.Value)
return filters.Regex(filter.Key, r)
case api.MetricFilterNotRegex:
return NotRegex(filter)
r, _ := regexp.Compile(filter.Value)
return filters.NotRegex(filter.Key, r)
}
// Default = Exact
return Equal(filter)
}

// extractVarLookups returns one submatch group per $(name) placeholder found in value:
// element 0 is the full placeholder text and element 1 is the variable name.
// E.g. "$(SrcAddr):$(SrcPort)" yields [[$(SrcAddr) SrcAddr] [$(SrcPort) SrcPort]].
// An empty value yields nil.
func extractVarLookups(value string) [][]string {
	if value == "" {
		return nil
	}
	return variableExtractor.FindAllStringSubmatch(value, -1)
}

func injectVars(flow config.GenericMap, filterValue string, varLookups [][]string) string {
injected := filterValue
for _, matchGroup := range varLookups {
var value string
if rawVal, found := flow[matchGroup[1]]; found {
if sVal, ok := rawVal.(string); ok {
value = sVal
} else {
value = utils.ConvertToString(rawVal)
}
}
injected = strings.ReplaceAll(injected, matchGroup[0], value)
}
return injected
return filters.Equal(filter.Key, filter.Value, true)
}

func Preprocess(def *api.MetricsItem) *Preprocessed {
Expand Down
9 changes: 0 additions & 9 deletions pkg/pipeline/encode/metrics/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@ import (
"github.com/stretchr/testify/assert"
)

// Test_Filters_extractVarLookups checks that every $(name) placeholder is returned as a
// [full match, name] pair and that an empty value yields no lookups.
func Test_Filters_extractVarLookups(t *testing.T) {
variables := extractVarLookups("$(abc)--$(def)")

assert.Equal(t, [][]string{{"$(abc)", "abc"}, {"$(def)", "def"}}, variables)

variables = extractVarLookups("")
assert.Empty(t, variables)
}

func Test_Flatten(t *testing.T) {
pp := Preprocess(&api.MetricsItem{Flatten: []string{"interfaces", "events"}})
fl := pp.GenerateFlatParts(config.GenericMap{
Expand Down
63 changes: 59 additions & 4 deletions pkg/pipeline/transform/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/utils/filters"
"github.com/sirupsen/logrus"
)

Expand All @@ -37,14 +38,32 @@ var (
)

type Filter struct {
Rules []api.TransformFilterRule
Rules []api.TransformFilterRule
KeepRules []predicatesRule
}

// predicatesRule is the precompiled form of one keep_entry_all_satisfied rule.
type predicatesRule struct {
// predicates must all return true for an entry to match the rule.
predicates []filters.Predicate
// sampling is passed to rollSampling when the rule is applied; 0 means no random draw is made.
sampling uint16
}

// Transform transforms a flow; if false is returned as a second argument, the entry is dropped
func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) {
tlog.Tracef("f = %v", f)
outputEntry := entry.Copy()
labels := make(map[string]string)
if len(f.KeepRules) > 0 {
keep := false
for _, r := range f.KeepRules {
if applyPredicates(outputEntry, r) {
keep = true
break
}
}
if !keep {
return nil, false
}
}
for i := range f.Rules {
tlog.Tracef("rule = %v", f.Rules[i])
if cont := applyRule(outputEntry, labels, &f.Rules[i]); !cont {
Expand Down Expand Up @@ -143,6 +162,9 @@ func applyRule(entry config.GenericMap, labels map[string]string, rule *api.Tran
return !isRemoveEntrySatisfied(entry, rule.RemoveEntryAllSatisfied)
case api.ConditionalSampling:
return sample(entry, rule.ConditionalSampling)
case api.KeepEntryAllSatisfied:
// This should be processed only in "applyPredicates". Failure to do so is a bug.
tlog.Panicf("unexpected KeepEntryAllSatisfied: %v", rule)
default:
tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule)
}
Expand All @@ -159,25 +181,58 @@ func isRemoveEntrySatisfied(entry config.GenericMap, rules []*api.RemoveEntryRul
return true
}

// applyPredicates reports whether the entry is retained by the given keep rule.
// The sampling roll is made first; only when it passes are the rule's predicates
// evaluated, and every one of them must accept the entry.
func applyPredicates(entry config.GenericMap, rule predicatesRule) bool {
	if sampled := rollSampling(rule.sampling); !sampled {
		return false
	}
	for idx := range rule.predicates {
		if accepted := rule.predicates[idx](entry); !accepted {
			return false
		}
	}
	return true
}

func sample(entry config.GenericMap, rules []*api.SamplingCondition) bool {
for _, r := range rules {
if isRemoveEntrySatisfied(entry, r.Rules) {
return r.Value == 0 || (rndgen.Intn(int(r.Value)) == 0)
return rollSampling(r.Value)
}
}
return true
}

// rollSampling decides whether an entry passes a 1-in-value sampling draw.
// A value of 0 always passes without drawing; otherwise the entry passes
// when a random integer in [0, value) comes out as 0.
func rollSampling(value uint16) bool {
	if value == 0 {
		return true
	}
	return rndgen.Intn(int(value)) == 0
}

// NewTransformFilter create a new filter transform
func NewTransformFilter(params config.StageParam) (Transformer, error) {
tlog.Debugf("entering NewTransformFilter")
keepRules := []predicatesRule{}
rules := []api.TransformFilterRule{}
if params.Transform != nil && params.Transform.Filter != nil {
params.Transform.Filter.Preprocess()
rules = params.Transform.Filter.Rules
for i := range params.Transform.Filter.Rules {
baseRules := &params.Transform.Filter.Rules[i]
if baseRules.Type == api.KeepEntryAllSatisfied {
pr := predicatesRule{sampling: baseRules.KeepEntrySampling}
for _, keepRule := range baseRules.KeepEntryAllSatisfied {
pred, err := filters.FromKeepEntry(keepRule)
if err != nil {
return nil, err
}
pr.predicates = append(pr.predicates, pred)
}
keepRules = append(keepRules, pr)
} else {
rules = append(rules, *baseRules)
}
}
}
transformFilter := &Filter{
Rules: rules,
Rules: rules,
KeepRules: keepRules,
}
return transformFilter, nil
}
Loading
Loading