Skip to content

Commit

Permalink
update otel metric to changes in prom metric
Browse files Browse the repository at this point in the history
  • Loading branch information
KalmanMeth committed Nov 22, 2023
1 parent 879b890 commit 1d4010a
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 60 deletions.
14 changes: 7 additions & 7 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ const defaultExpiryTime = time.Duration(2 * time.Minute)

type gaugeInfo struct {
gauge *prometheus.GaugeVec
info *metricInfo
info *MetricInfo
}

type counterInfo struct {
counter *prometheus.CounterVec
info *metricInfo
info *MetricInfo
}

type histoInfo struct {
histo *prometheus.HistogramVec
info *metricInfo
info *MetricInfo
}

type EncodeProm struct {
Expand Down Expand Up @@ -165,7 +165,7 @@ func (e *EncodeProm) Encode(metricRecord config.GenericMap) {
}
}

func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *metricInfo, m *prometheus.MetricVec) (map[string]string, float64) {
func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *MetricInfo, m *prometheus.MetricVec) (map[string]string, float64) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, 0
Expand All @@ -189,7 +189,7 @@ func (e *EncodeProm) prepareMetric(flow config.GenericMap, info *metricInfo, m *
return entryLabels, floatVal
}

func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *metricInfo, m *prometheus.MetricVec) (map[string]string, []float64) {
func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *MetricInfo, m *prometheus.MetricVec) (map[string]string, []float64) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, nil
Expand All @@ -210,8 +210,8 @@ func (e *EncodeProm) prepareAggHisto(flow config.GenericMap, info *metricInfo, m
return entryLabels, values
}

func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *metricInfo) interface{} {
for _, pred := range info.filterPredicates {
func (e *EncodeProm) extractGenericValue(flow config.GenericMap, info *MetricInfo) interface{} {
for _, pred := range info.FilterPredicates {
if !pred(flow) {
return nil
}
Expand Down
32 changes: 16 additions & 16 deletions pkg/pipeline/encode/encode_prom_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,28 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

type predicate func(flow config.GenericMap) bool
type Predicate func(flow config.GenericMap) bool

type metricInfo struct {
type MetricInfo struct {
api.PromMetricsItem
filterPredicates []predicate
FilterPredicates []Predicate
}

func presence(filter api.PromMetricsFilter) predicate {
func Presence(filter api.PromMetricsFilter) Predicate {
return func(flow config.GenericMap) bool {
_, found := flow[filter.Key]
return found
}
}

func absence(filter api.PromMetricsFilter) predicate {
func Absence(filter api.PromMetricsFilter) Predicate {
return func(flow config.GenericMap) bool {
_, found := flow[filter.Key]
return !found
}
}

func exact(filter api.PromMetricsFilter) predicate {
func Exact(filter api.PromMetricsFilter) Predicate {
return func(flow config.GenericMap) bool {
if val, found := flow[filter.Key]; found {
sVal, ok := val.(string)
Expand All @@ -42,7 +42,7 @@ func exact(filter api.PromMetricsFilter) predicate {
}
}

func regex(filter api.PromMetricsFilter) predicate {
func regex(filter api.PromMetricsFilter) Predicate {
r, _ := regexp.Compile(filter.Value)
return func(flow config.GenericMap) bool {
if val, found := flow[filter.Key]; found {
Expand All @@ -56,27 +56,27 @@ func regex(filter api.PromMetricsFilter) predicate {
}
}

func filterToPredicate(filter api.PromMetricsFilter) predicate {
func filterToPredicate(filter api.PromMetricsFilter) Predicate {
switch filter.Type {
case api.PromFilterExact:
return exact(filter)
return Exact(filter)
case api.PromFilterPresence:
return presence(filter)
return Presence(filter)
case api.PromFilterAbsence:
return absence(filter)
return Absence(filter)
case api.PromFilterRegex:
return regex(filter)
}
// Default = exact
return exact(filter)
// Default = Exact
return Exact(filter)
}

func CreateMetricInfo(def api.PromMetricsItem) *metricInfo {
mi := metricInfo{
func CreateMetricInfo(def api.PromMetricsItem) *MetricInfo {
mi := MetricInfo{
PromMetricsItem: def,
}
for _, f := range def.GetFilters() {
mi.filterPredicates = append(mi.filterPredicates, filterToPredicate(f))
mi.FilterPredicates = append(mi.FilterPredicates, filterToPredicate(f))
}
return &mi
}
54 changes: 17 additions & 37 deletions pkg/pipeline/encode/opentelemetry/encode_otlpmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package opentelemetry

import (
"context"
"fmt"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand All @@ -36,7 +34,6 @@ import (
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"k8s.io/utils/strings/slices"
)

// TODO: Refactor the code that is common with encode_prom
Expand All @@ -47,20 +44,20 @@ const (

type counterInfo struct {
counter *metric.Float64Counter
info api.PromMetricsItem
info *encode.MetricInfo
}

type gaugeInfo struct {
gauge *metric.Float64ObservableGauge
info api.PromMetricsItem
info *encode.MetricInfo
obs Float64Gauge
}

// TBD: Handle histograms
/*
type histoInfo struct {
histo *metric.Float64Histogram
info api.PromMetricsItem
info *encode.MetricInfo
}
*/

Expand All @@ -87,7 +84,7 @@ func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) {

// Process counters
for _, mInfo := range e.counters {
labels, value, _ := e.prepareMetric(metricRecord, &mInfo.info)
labels, value, _ := e.prepareMetric(metricRecord, mInfo.info)
if labels == nil {
continue
}
Expand All @@ -99,7 +96,7 @@ func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) {

// Process gauges
for _, mInfo := range e.gauges {
labels, value, key := e.prepareMetric(metricRecord, &mInfo.info)
labels, value, key := e.prepareMetric(metricRecord, mInfo.info)
if labels == nil {
continue
}
Expand All @@ -111,7 +108,7 @@ func (e *EncodeOtlpMetrics) Encode(metricRecord config.GenericMap) {
// TBD: Process histograms
}

func (e *EncodeOtlpMetrics) prepareMetric(flow config.GenericMap, info *api.PromMetricsItem) (map[string]string, float64, string) {
func (e *EncodeOtlpMetrics) prepareMetric(flow config.GenericMap, info *encode.MetricInfo) (map[string]string, float64, string) {
val := e.extractGenericValue(flow, info)
if val == nil {
return nil, 0, ""
Expand All @@ -121,7 +118,7 @@ func (e *EncodeOtlpMetrics) prepareMetric(flow config.GenericMap, info *api.Prom
return nil, 0, ""
}

entryLabels, key := encode.ExtractLabelsAndKey(flow, info)
entryLabels, key := encode.ExtractLabelsAndKey(flow, &info.PromMetricsItem)
// Update entry for expiry mechanism (the entry itself is its own cleanup function)
_, ok := e.mCache.UpdateCacheEntry(key, entryLabels)
if !ok {
Expand All @@ -131,28 +128,10 @@ func (e *EncodeOtlpMetrics) prepareMetric(flow config.GenericMap, info *api.Prom
return entryLabels, floatVal, key
}

func (e *EncodeOtlpMetrics) extractGenericValue(flow config.GenericMap, info *api.PromMetricsItem) interface{} {
for _, filter := range info.GetFilters() {
val, found := flow[filter.Key]
switch filter.Value {
case "nil":
if found {
return nil
}
case "!nil":
if !found {
return nil
}
default:
if found {
sVal, ok := val.(string)
if !ok {
sVal = fmt.Sprint(val)
}
if !slices.Contains(strings.Split(filter.Value, "|"), sVal) {
return nil
}
}
func (e *EncodeOtlpMetrics) extractGenericValue(flow config.GenericMap, info *encode.MetricInfo) interface{} {
for _, pred := range info.FilterPredicates {
if !pred(flow) {
return nil
}
}
if info.ValueKey == "" {
Expand Down Expand Up @@ -194,12 +173,13 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
counters := []counterInfo{}
gauges := []gaugeInfo{}

for _, mInfo := range cfg.Metrics {
fullMetricName := cfg.Prefix + mInfo.Name
labels := mInfo.Labels
for _, mCfg := range cfg.Metrics {
fullMetricName := cfg.Prefix + mCfg.Name
labels := mCfg.Labels
log.Debugf("fullMetricName = %v", fullMetricName)
log.Debugf("Labels = %v", labels)
switch mInfo.Type {
mInfo := encode.CreateMetricInfo(mCfg)
switch mCfg.Type {
case api.PromEncodeOperationName("Counter"):
counter, err := meter.Float64Counter(fullMetricName)
if err != nil {
Expand Down Expand Up @@ -229,7 +209,7 @@ func NewEncodeOtlpMetrics(opMetrics *operational.Metrics, params config.StagePar
gauges = append(gauges, gInfo)
// TBD: handle histograms
case "default":
log.Errorf("invalid metric type = %v, skipping", mInfo.Type)
log.Errorf("invalid metric type = %v, skipping", mCfg.Type)
continue
}
}
Expand Down

0 comments on commit 1d4010a

Please sign in to comment.