Skip to content

Commit

Permalink
Share filter predicate code (prom+transform)
Browse files Browse the repository at this point in the history
Move prom-encode predicates filtering code to its own package and share
it with "keep_entry" transforms
  • Loading branch information
jotak committed Nov 13, 2024
1 parent 417769d commit 76131e0
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 218 deletions.
63 changes: 12 additions & 51 deletions pkg/api/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,14 @@

package api

import (
"errors"
"regexp"
)

type TransformFilter struct {
Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
}

func (tf *TransformFilter) Preprocess() error {
var errs []error
func (tf *TransformFilter) Preprocess() {
for i := range tf.Rules {
if err := tf.Rules[i].preprocess(); err != nil {
errs = append(errs, err)
}
tf.Rules[i].preprocess()
}
return errors.Join(errs...)
}

type TransformFilterEnum string
Expand Down Expand Up @@ -92,35 +83,22 @@ type TransformFilterRule struct {
ConditionalSampling []*SamplingCondition `yaml:"conditionalSampling,omitempty" json:"conditionalSampling,omitempty" doc:"sampling configuration rules"`
}

func (r *TransformFilterRule) preprocess() error {
var errs []error
func (r *TransformFilterRule) preprocess() {
if r.RemoveField != nil {
if err := r.RemoveField.preprocess(false); err != nil {
errs = append(errs, err)
}
r.RemoveField.preprocess()
}
if r.RemoveEntry != nil {
if err := r.RemoveEntry.preprocess(false); err != nil {
errs = append(errs, err)
}
r.RemoveEntry.preprocess()
}
for i := range r.RemoveEntryAllSatisfied {
if err := r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess(false); err != nil {
errs = append(errs, err)
}
r.RemoveEntryAllSatisfied[i].RemoveEntry.preprocess()
}
for i := range r.KeepEntryAllSatisfied {
err := r.KeepEntryAllSatisfied[i].KeepEntry.preprocess(r.KeepEntryAllSatisfied[i].Type == KeepEntryIfRegexMatch || r.KeepEntryAllSatisfied[i].Type == KeepEntryIfNotRegexMatch)
if err != nil {
errs = append(errs, err)
}
r.KeepEntryAllSatisfied[i].KeepEntry.preprocess()
}
for i := range r.ConditionalSampling {
if err := r.ConditionalSampling[i].preprocess(); err != nil {
errs = append(errs, err)
}
r.ConditionalSampling[i].preprocess()
}
return errors.Join(errs...)
}

type TransformFilterGenericRule struct {
Expand All @@ -129,25 +107,12 @@ type TransformFilterGenericRule struct {
CastInt bool `yaml:"castInt,omitempty" json:"castInt,omitempty" doc:"set true to cast the value field as an int (numeric values are float64 otherwise)"`
}

func (r *TransformFilterGenericRule) preprocess(isRegex bool) error {
if isRegex {
if s, ok := r.Value.(string); ok {
v, err := regexp.Compile(s)
if err != nil {
r.Value = nil
return err
}
r.Value = v
} else {
r.Value = nil
return errors.New("regex filter expects string value")
}
} else if r.CastInt {
func (r *TransformFilterGenericRule) preprocess() {
if r.CastInt {
if f, ok := r.Value.(float64); ok {
r.Value = int(f)
}
}
return nil
}

type TransformFilterRuleWithAssignee struct {
Expand All @@ -172,12 +137,8 @@ type SamplingCondition struct {
Rules []*RemoveEntryRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"rules to be satisfied for this sampling configuration"`
}

func (s *SamplingCondition) preprocess() error {
var errs []error
func (s *SamplingCondition) preprocess() {
for i := range s.Rules {
if err := s.Rules[i].RemoveEntry.preprocess(false); err != nil {
errs = append(errs, err)
}
s.Rules[i].RemoveEntry.preprocess()
}
return errors.Join(errs...)
}
109 changes: 12 additions & 97 deletions pkg/pipeline/encode/encode_prom_metric.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,15 @@
package encode

import (
"fmt"
"regexp"
"strings"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/utils/filters"
)

type Predicate func(flow config.GenericMap) bool

var variableExtractor = regexp.MustCompile(`\$\(([^\)]+)\)`)

type MetricInfo struct {
*api.MetricsItem
FilterPredicates []Predicate
FilterPredicates []filters.Predicate
MappedLabels []MappedLabel
}

Expand All @@ -32,104 +26,25 @@ func (m *MetricInfo) TargetLabels() []string {
return targetLabels
}

func Presence(filter api.MetricsFilter) Predicate {
return func(flow config.GenericMap) bool {
_, found := flow[filter.Key]
return found
}
}

func Absence(filter api.MetricsFilter) Predicate {
return func(flow config.GenericMap) bool {
_, found := flow[filter.Key]
return !found
}
}

func Equal(filter api.MetricsFilter) Predicate {
varLookups := extractVarLookups(filter.Value)
return func(flow config.GenericMap) bool {
if val, found := flow[filter.Key]; found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
value := filter.Value
if len(varLookups) > 0 {
value = injectVars(flow, value, varLookups)
}
return sVal == value
}
return false
}
}

func NotEqual(filter api.MetricsFilter) Predicate {
pred := Equal(filter)
return func(flow config.GenericMap) bool { return !pred(flow) }
}

func Regex(filter api.MetricsFilter) Predicate {
r, _ := regexp.Compile(filter.Value)
return func(flow config.GenericMap) bool {
if val, found := flow[filter.Key]; found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
return r.MatchString(sVal)
}
return false
}
}

func NotRegex(filter api.MetricsFilter) Predicate {
pred := Regex(filter)
return func(flow config.GenericMap) bool { return !pred(flow) }
}

func filterToPredicate(filter api.MetricsFilter) Predicate {
func filterToPredicate(filter api.MetricsFilter) filters.Predicate {
switch filter.Type {
case api.MetricFilterEqual:
return Equal(filter)
return filters.Equal(filter.Key, filter.Value, true)
case api.MetricFilterNotEqual:
return NotEqual(filter)
return filters.NotEqual(filter.Key, filter.Value, true)
case api.MetricFilterPresence:
return Presence(filter)
return filters.Presence(filter.Key)
case api.MetricFilterAbsence:
return Absence(filter)
return filters.Absence(filter.Key)
case api.MetricFilterRegex:
return Regex(filter)
r, _ := regexp.Compile(filter.Value)
return filters.Regex(filter.Key, r)
case api.MetricFilterNotRegex:
return NotRegex(filter)
r, _ := regexp.Compile(filter.Value)
return filters.NotRegex(filter.Key, r)
}
// Default = Exact
return Equal(filter)
}

func extractVarLookups(value string) [][]string {
// Extract list of variables to lookup
// E.g: filter "$(SrcAddr):$(SrcPort)" would return [SrcAddr,SrcPort]
if len(value) > 0 {
return variableExtractor.FindAllStringSubmatch(value, -1)
}
return nil
}

func injectVars(flow config.GenericMap, filterValue string, varLookups [][]string) string {
injected := filterValue
for _, matchGroup := range varLookups {
var value string
if rawVal, found := flow[matchGroup[1]]; found {
if sVal, ok := rawVal.(string); ok {
value = sVal
} else {
value = fmt.Sprint(rawVal)
}
}
injected = strings.ReplaceAll(injected, matchGroup[0], value)
}
return injected
return filters.Equal(filter.Key, filter.Value, true)
}

func CreateMetricInfo(def *api.MetricsItem) *MetricInfo {
Expand Down
9 changes: 0 additions & 9 deletions pkg/pipeline/encode/encode_prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,12 +754,3 @@ func Test_MultipleProm(t *testing.T) {

// TODO: Add test for different addresses, but need to deal with StartPromServer (ListenAndServe)
}

func Test_Filters_extractVarLookups(t *testing.T) {
variables := extractVarLookups("$(abc)--$(def)")

require.Equal(t, [][]string{{"$(abc)", "abc"}, {"$(def)", "def"}}, variables)

variables = extractVarLookups("")
require.Empty(t, variables)
}
Loading

0 comments on commit 76131e0

Please sign in to comment.