Skip to content

Commit

Permalink
NETOBSERV-1692: allow KEEP filtering logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jotak committed Nov 13, 2024
1 parent 5bb2146 commit 1c8c864
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 23 deletions.
14 changes: 14 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ Following is the supported API format for filter transformations:
remove_entry_if_equal: removes the entry if the field value equals specified value
remove_entry_if_not_equal: removes the entry if the field value does not equal specified value
remove_entry_all_satisfied: removes the entry if all of the defined rules are satisfied
keep_entry: keeps the entry if the set of rules are all satisfied
add_field: adds (input) field to the entry; overrides previous value if present (key=input, value=value)
add_field_if_doesnt_exist: adds a field to the entry if the field does not exist
add_field_if: add output field set to assignee if input field satisfies criteria from parameters field
Expand All @@ -186,6 +187,19 @@ Following is the supported API format for filter transformations:
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
keepEntryAllSatisfied: configuration for keep_entry rule
type: (enum) one of the following:
keep_entry_if_exists: keeps the entry if the field exists
keep_entry_if_doesnt_exist: keeps the entry if the field does not exist
keep_entry_if_equal: keeps the entry if the field value equals specified value
keep_entry_if_not_equal: keeps the entry if the field value does not equal specified value
keep_entry_if_regex_match: keeps the entry if the field value matches the specified regex
keep_entry_if_not_regex_match: keeps the entry if the field value does not match the specified regex
keepEntry: configuration for keep_entry_* rules
input: entry input field
value: specified value of input field:
castInt: set true to cast the value field as an int (numeric values are float64 otherwise)
keepEntrySampling: sampling value for keep_entry type: 1 flow on <sampling> is kept
addField: configuration for add_field rule
input: entry input field
value: specified value of input field:
Expand Down
83 changes: 72 additions & 11 deletions pkg/api/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,23 @@

package api

import (
"errors"
"regexp"
)

// TransformFilter is the configuration of a filter transformation; it holds
// the list of filter rules.
type TransformFilter struct {
Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
}

// Preprocess calls preprocess on every rule in tf.Rules, collects the errors
// they return and returns them combined with errors.Join (nil when no rule
// reported an error).
//
// NOTE(review): this span interleaves two versions of the function (the diff
// markers were lost). The first signature line and the bare
// `tf.Rules[i].preprocess()` call look like the pre-change code; confirm
// against the repository before compiling.
func (tf *TransformFilter) Preprocess() {
func (tf *TransformFilter) Preprocess() error {
var errs []error
for i := range tf.Rules {
tf.Rules[i].preprocess()
if err := tf.Rules[i].preprocess(); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}

type TransformFilterEnum string
Expand All @@ -37,6 +46,7 @@ const (
RemoveEntryIfEqual TransformFilterEnum = "remove_entry_if_equal" // removes the entry if the field value equals specified value
RemoveEntryIfNotEqual TransformFilterEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value
RemoveEntryAllSatisfied TransformFilterEnum = "remove_entry_all_satisfied" // removes the entry if all of the defined rules are satisfied
KeepEntry TransformFilterEnum = "keep_entry" // keeps the entry if the set of rules are all satisfied
AddField TransformFilterEnum = "add_field" // adds (input) field to the entry; overrides previous value if present (key=input, value=value)
AddFieldIfDoesntExist TransformFilterEnum = "add_field_if_doesnt_exist" // adds a field to the entry if the field does not exist
AddFieldIf TransformFilterEnum = "add_field_if" // add output field set to assignee if input field satisfies criteria from parameters field
Expand All @@ -55,11 +65,24 @@ const (
RemoveEntryIfNotEqualD TransformFilterRemoveEntryEnum = "remove_entry_if_not_equal" // removes the entry if the field value does not equal specified value
)

// TransformFilterKeepEntryEnum names the condition checked by a single
// keep_entry_* rule.
type TransformFilterKeepEntryEnum string

// Conditions available for keep_entry_* rules. The two regex variants are
// the only ones whose value is a regular expression.
const (
KeepEntryIfExists TransformFilterKeepEntryEnum = "keep_entry_if_exists" // keeps the entry if the field exists
KeepEntryIfDoesntExist TransformFilterKeepEntryEnum = "keep_entry_if_doesnt_exist" // keeps the entry if the field does not exist
KeepEntryIfEqual TransformFilterKeepEntryEnum = "keep_entry_if_equal" // keeps the entry if the field value equals specified value
KeepEntryIfNotEqual TransformFilterKeepEntryEnum = "keep_entry_if_not_equal" // keeps the entry if the field value does not equal specified value
KeepEntryIfRegexMatch TransformFilterKeepEntryEnum = "keep_entry_if_regex_match" // keeps the entry if the field value matches the specified regex
KeepEntryIfNotRegexMatch TransformFilterKeepEntryEnum = "keep_entry_if_not_regex_match" // keeps the entry if the field value does not match the specified regex
)

type TransformFilterRule struct {
Type TransformFilterEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
RemoveField *TransformFilterGenericRule `yaml:"removeField,omitempty" json:"removeField,omitempty" doc:"configuration for remove_field rule"`
RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
RemoveEntryAllSatisfied []*RemoveEntryRule `yaml:"removeEntryAllSatisfied,omitempty" json:"removeEntryAllSatisfied,omitempty" doc:"configuration for remove_entry_all_satisfied rule"`
KeepEntryAllSatisfied []*KeepEntryRule `yaml:"keepEntryAllSatisfied,omitempty" json:"keepEntryAllSatisfied,omitempty" doc:"configuration for keep_entry rule"`
KeepEntrySampling uint16 `yaml:"keepEntrySampling,omitempty" json:"keepEntrySampling,omitempty" doc:"sampling value for keep_entry type: 1 flow on <sampling> is kept"`
AddField *TransformFilterGenericRule `yaml:"addField,omitempty" json:"addField,omitempty" doc:"configuration for add_field rule"`
AddFieldIfDoesntExist *TransformFilterGenericRule `yaml:"addFieldIfDoesntExist,omitempty" json:"addFieldIfDoesntExist,omitempty" doc:"configuration for add_field_if_doesnt_exist rule"`
AddFieldIf *TransformFilterRuleWithAssignee `yaml:"addFieldIf,omitempty" json:"addFieldIf,omitempty" doc:"configuration for add_field_if rule"`
Expand All @@ -69,19 +92,35 @@ type TransformFilterRule struct {
ConditionalSampling []*SamplingCondition `yaml:"conditionalSampling,omitempty" json:"conditionalSampling,omitempty" doc:"sampling configuration rules"`
}

// preprocess runs preprocess on the RemoveField, RemoveEntry,
// RemoveEntryAllSatisfied, KeepEntryAllSatisfied and ConditionalSampling
// configurations of the rule. Errors are collected rather than returned on
// first failure, and are combined with errors.Join.
//
// NOTE(review): this span interleaves two versions of the function (the diff
// markers were lost). The first signature line and the bare `.preprocess()`
// calls without arguments look like the pre-change code; confirm against the
// repository before compiling.
func (r *TransformFilterRule) preprocess() {
func (r *TransformFilterRule) preprocess() error {
var errs []error
if r.RemoveField != nil {
r.RemoveField.preprocess()
if err := r.RemoveField.preprocess(false); err != nil {
errs = append(errs, err)
}
}
if r.RemoveEntry != nil {
r.RemoveEntry.preprocess()
if err := r.RemoveEntry.preprocess(false); err != nil {
errs = append(errs, err)
}
}
// NOTE(review): the nested RemoveEntry / KeepEntry pointers below are not
// nil-checked, unlike RemoveField and RemoveEntry above; a list item without
// its nested configuration would presumably panic inside preprocess. Confirm.
for i := range r.RemoveEntryAllSatisfied {
r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess()
if err := r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess(false); err != nil {
errs = append(errs, err)
}
}
for i := range r.KeepEntryAllSatisfied {
// The value is treated as a regex only for the two regex-match rule types.
err := r.KeepEntryAllSatisfied[i].KeepEntry.preprocess(r.KeepEntryAllSatisfied[i].Type == KeepEntryIfRegexMatch || r.KeepEntryAllSatisfied[i].Type == KeepEntryIfNotRegexMatch)
if err != nil {
errs = append(errs, err)
}
}
for i := range r.ConditionalSampling {
r.ConditionalSampling[i].preprocess()
if err := r.ConditionalSampling[i].preprocess(); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}

type TransformFilterGenericRule struct {
Expand All @@ -90,12 +129,25 @@ type TransformFilterGenericRule struct {
CastInt bool `yaml:"castInt,omitempty" json:"castInt,omitempty" doc:"set true to cast the value field as an int (numeric values are float64 otherwise)"`
}

// preprocess normalizes r.Value in place.
//
// When isRegex is true, a string Value is compiled with regexp.Compile and
// replaced by the resulting *regexp.Regexp. If compilation fails, or if Value
// is not a string, Value is set to nil and an error is returned. CastInt is
// not consulted in that case.
//
// When isRegex is false and CastInt is set, a float64 Value is converted to
// int; any other Value is left untouched.
//
// NOTE(review): this span interleaves two versions of the function (the diff
// markers were lost). The first signature line and the `if r.CastInt {` line
// right after it look like the pre-change code; confirm against the
// repository before compiling.
func (r *TransformFilterGenericRule) preprocess() {
if r.CastInt {
func (r *TransformFilterGenericRule) preprocess(isRegex bool) error {
if isRegex {
if s, ok := r.Value.(string); ok {
v, err := regexp.Compile(s)
if err != nil {
r.Value = nil
return err
}
r.Value = v
} else {
r.Value = nil
return errors.New("regex filter expects string value")
}
} else if r.CastInt {
if f, ok := r.Value.(float64); ok {
r.Value = int(f)
}
}
return nil
}

type TransformFilterRuleWithAssignee struct {
Expand All @@ -110,13 +162,22 @@ type RemoveEntryRule struct {
RemoveEntry *TransformFilterGenericRule `yaml:"removeEntry,omitempty" json:"removeEntry,omitempty" doc:"configuration for remove_entry_* rules"`
}

// KeepEntryRule is a single keep_entry_* condition: Type selects the check
// and KeepEntry carries the configuration it is evaluated against.
type KeepEntryRule struct {
Type TransformFilterKeepEntryEnum `yaml:"type,omitempty" json:"type,omitempty" doc:"(enum) one of the following:"`
KeepEntry *TransformFilterGenericRule `yaml:"keepEntry,omitempty" json:"keepEntry,omitempty" doc:"configuration for keep_entry_* rules"`
}

// SamplingCondition associates a sampling value with the rules that must be
// satisfied for that sampling configuration to apply.
type SamplingCondition struct {
Value uint16 `yaml:"value,omitempty" json:"value,omitempty" doc:"sampling value: 1 flow on <sampling> is kept"`
Rules []*RemoveEntryRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"rules to be satisfied for this sampling configuration"`
}

// preprocess runs preprocess (in non-regex mode) on the RemoveEntry
// configuration of every rule in s.Rules and returns the collected errors
// combined with errors.Join.
//
// NOTE(review): this span interleaves two versions of the function (the diff
// markers were lost). The first signature line and the bare
// `.preprocess()` call look like the pre-change code; confirm against the
// repository before compiling.
func (s *SamplingCondition) preprocess() {
func (s *SamplingCondition) preprocess() error {
var errs []error
for i := range s.Rules {
s.Rules[i].RemoveEntry.preprocess()
if err := s.Rules[i].RemoveEntry.preprocess(false); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}
92 changes: 87 additions & 5 deletions pkg/pipeline/transform/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,28 @@ var (
)

// Filter holds the rules applied by the filter transformer.
// NewTransformFilter stores rules of type api.KeepEntry in KeepRules and
// every other rule in Rules.
//
// NOTE(review): the Rules field appears twice because this span interleaves
// the pre- and post-change lines of a diff; only one declaration can remain
// in compilable code.
type Filter struct {
Rules []api.TransformFilterRule
Rules []api.TransformFilterRule
KeepRules []api.TransformFilterRule
}

// Transform transforms a flow; if false is returned as a second argument, the entry is dropped
func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) {
tlog.Tracef("f = %v", f)
outputEntry := entry.Copy()
labels := make(map[string]string)
if len(f.KeepRules) > 0 {
keep := false
for i := range f.KeepRules {
tlog.Tracef("keep rule = %v", f.KeepRules[i])
if applyRule(outputEntry, labels, &f.KeepRules[i]) {
keep = true
break
}
}
if !keep {
return nil, false
}
}
for i := range f.Rules {
tlog.Tracef("rule = %v", f.Rules[i])
if cont := applyRule(outputEntry, labels, &f.Rules[i]); !cont {
Expand Down Expand Up @@ -143,6 +157,8 @@ func applyRule(entry config.GenericMap, labels map[string]string, rule *api.Tran
return !isRemoveEntrySatisfied(entry, rule.RemoveEntryAllSatisfied)
case api.ConditionalSampling:
return sample(entry, rule.ConditionalSampling)
case api.KeepEntry:
return rollSampling(rule.KeepEntrySampling) && isKeepEntrySatisfied(entry, rule.KeepEntryAllSatisfied)
default:
tlog.Panicf("unknown type %s for transform.Filter rule: %v", rule.Type, rule)
}
Expand All @@ -159,25 +175,91 @@ func isRemoveEntrySatisfied(entry config.GenericMap, rules []*api.RemoveEntryRul
return true
}

// isKeepEntrySatisfied reports whether entry satisfies every rule in rules.
// It returns false as soon as one rule is not satisfied and true when all
// rules pass, including when rules is empty. Rule types not listed in the
// switch are ignored.
//
// For both regex rule types, a missing field or a rule value that is not a
// compiled regular expression makes the rule unsatisfied.
//
// Each rule and its KeepEntry are dereferenced without a nil check.
func isKeepEntrySatisfied(entry config.GenericMap, rules []*api.KeepEntryRule) bool {
	for _, r := range rules {
		val, ok := entry[r.KeepEntry.Input]
		switch r.Type {
		case api.KeepEntryIfExists:
			if !ok {
				return false
			}
		case api.KeepEntryIfDoesntExist:
			if ok {
				return false
			}
		case api.KeepEntryIfEqual:
			if !ok || val != r.KeepEntry.Value {
				return false
			}
		case api.KeepEntryIfNotEqual:
			// A missing field counts as "not equal".
			if ok && val == r.KeepEntry.Value {
				return false
			}
		case api.KeepEntryIfRegexMatch:
			if !ok {
				return false
			}
			// checkRegex returns (valid, match), in that order.
			valid, match := checkRegex(r.KeepEntry.Value, val)
			if !valid || !match {
				return false
			}
		case api.KeepEntryIfNotRegexMatch:
			if !ok {
				return false
			}
			// Binding the results in the wrong order here would make this
			// rule unsatisfiable: a valid regex would be read as "matched".
			valid, match := checkRegex(r.KeepEntry.Value, val)
			if !valid || match {
				return false
			}
		}
	}
	return true
}

// checkRegex applies a compiled regular expression to a value.
// It returns (valid, match), in that order: valid is false when maybeReg is
// not a *regexp.Regexp, in which case match is false as well; otherwise match
// is the result of matching the regex against utils.ConvertToString(value).
func checkRegex(maybeReg any, value any) (bool, bool) {
	if re, isRegex := maybeReg.(*regexp.Regexp); isRegex {
		return true, re.MatchString(utils.ConvertToString(value))
	}
	return false, false
}

// sample walks rules in order and, for the first condition whose Rules are
// satisfied by entry (per isRemoveEntrySatisfied), returns that condition's
// sampling decision. It returns true when no condition is satisfied.
//
// NOTE(review): the two consecutive return statements come from a diff whose
// markers were lost. The inline expression looks like the pre-change line and
// the rollSampling call like its replacement; confirm against the repository.
func sample(entry config.GenericMap, rules []*api.SamplingCondition) bool {
for _, r := range rules {
if isRemoveEntrySatisfied(entry, r.Rules) {
return r.Value == 0 || (rndgen.Intn(int(r.Value)) == 0)
return rollSampling(r.Value)
}
}
return true
}

// rollSampling makes one sampling decision for the given sampling value.
// A value of 0 always passes; otherwise the decision passes when
// rndgen.Intn(int(value)) yields 0.
func rollSampling(value uint16) bool {
	if value == 0 {
		return true
	}
	return rndgen.Intn(int(value)) == 0
}

// NewTransformFilter creates a new filter transform from params.
//
// When params carries a filter configuration, the configuration is
// preprocessed first and a preprocessing error is returned to the caller.
// The rules are then split: rules of type api.KeepEntry go to
// Filter.KeepRules and every other rule goes to Filter.Rules. Without a
// filter configuration both lists are empty.
//
// NOTE(review): this span interleaves two versions of the function (the diff
// markers were lost). The bare `Preprocess()` call, the direct
// `rules = params.Transform.Filter.Rules` assignment and the first
// `Rules: rules,` line look like the pre-change code; confirm against the
// repository before compiling.
func NewTransformFilter(params config.StageParam) (Transformer, error) {
tlog.Debugf("entering NewTransformFilter")
keepRules := []api.TransformFilterRule{}
rules := []api.TransformFilterRule{}
if params.Transform != nil && params.Transform.Filter != nil {
params.Transform.Filter.Preprocess()
rules = params.Transform.Filter.Rules
if err := params.Transform.Filter.Preprocess(); err != nil {
return nil, err
}
for i := range params.Transform.Filter.Rules {
if params.Transform.Filter.Rules[i].Type == api.KeepEntry {
keepRules = append(keepRules, params.Transform.Filter.Rules[i])
} else {
rules = append(rules, params.Transform.Filter.Rules[i])
}
}
}
transformFilter := &Filter{
Rules: rules,
Rules: rules,
KeepRules: keepRules,
}
return transformFilter, nil
}
Loading

0 comments on commit 1c8c864

Please sign in to comment.