Common cache with expiry time #204

Merged · 8 commits · Jun 9, 2022
Changes from all commits
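
This PR replaces the duplicated expiry bookkeeping (a map plus a `container/list` LRU and a per-stage mutex) in both the Prometheus encoder and the aggregator with a single shared `utils.TimedCache`. The cache implementation itself is not part of the diff shown below, so the following sketch reconstructs its core from the call sites in the changed files; the exported names come from the diff, while the struct layout and field names are assumptions.

```go
package utils

import (
	"container/list"
	"sync"
	"time"
)

// cacheEntry and the TimedCache internals are assumptions; only the
// exported method names and signatures below are taken from the call
// sites in this PR.
type cacheEntry struct {
	key        string
	value      interface{}
	lastUpdate int64         // unix seconds of the last UpdateCacheEntry
	element    *list.Element // position in the LRU list
}

type TimedCache struct {
	mu    sync.Mutex
	byKey map[string]*cacheEntry
	lru   *list.List // front = least recently updated
}

func NewTimedCache() *TimedCache {
	return &TimedCache{byKey: make(map[string]*cacheEntry), lru: list.New()}
}

// UpdateCacheEntry inserts key or refreshes an existing entry, marking it
// most recently updated in either case.
func (tc *TimedCache) UpdateCacheEntry(key string, value interface{}) {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	now := time.Now().Unix()
	if e, ok := tc.byKey[key]; ok {
		e.value = value
		e.lastUpdate = now
		tc.lru.MoveToBack(e.element)
		return
	}
	e := &cacheEntry{key: key, value: value, lastUpdate: now}
	e.element = tc.lru.PushBack(e)
	tc.byKey[key] = e
}

// GetCacheEntry returns the stored value and whether the key was present.
func (tc *TimedCache) GetCacheEntry(key string) (interface{}, bool) {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	if e, ok := tc.byKey[key]; ok {
		return e.value, true
	}
	return nil, false
}

// GetCacheLen reports the number of cached entries.
func (tc *TimedCache) GetCacheLen() int {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	return tc.lru.Len()
}
```

Moving the mutex inside the cache is what lets the encoder drop its own `sync.Mutex` and the explicit `Lock`/`Unlock` calls from `Encode`, as the first file below shows.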
109 changes: 21 additions & 88 deletions pkg/pipeline/encode/encode_prom.go
@@ -18,11 +18,9 @@
package encode

import (
"container/list"
"fmt"
"net/http"
"os"
"sync"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
@@ -62,26 +60,15 @@ type entrySignature struct {

type entryInfo struct {
eInfo entrySignature
}

type metricCacheEntry struct {
labels prometheus.Labels
timeStamp int64
e *list.Element
key string
PromMetric
}

type metricCache map[string]*metricCacheEntry

type EncodeProm struct {
mu sync.Mutex
port string
prefix string
metrics map[string]metricInfo
expiryTime int64
mList *list.List
mCache metricCache
mCache *utils.TimedCache
exitChan <-chan struct{}
PrevRecords []config.GenericMap
}
@@ -94,8 +81,6 @@ var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
// Encode encodes a metric before being stored
func (e *EncodeProm) Encode(metrics []config.GenericMap) {
log.Debugf("entering EncodeProm Encode")
e.mu.Lock()
defer e.mu.Unlock()
out := make([]config.GenericMap, 0)
for _, metric := range metrics {
// TODO: We may need different handling for histograms
@@ -106,7 +91,6 @@ func (e *EncodeProm) Encode(metrics []config.GenericMap) {
e.PrevRecords = out
log.Debugf("out = %v", out)
log.Debugf("cache = %v", e.mCache)
log.Debugf("list = %v", e.mList)
}

func (e *EncodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap {
@@ -134,9 +118,9 @@ func (e *EncodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap {
Labels: entryLabels,
},
}

cEntry := e.saveEntryInCache(entry, entryLabels)
cEntry.PromMetric.metricType = mInfo.PromMetric.metricType
key := generateCacheKey(&entry.eInfo)
e.mCache.UpdateCacheEntry(key, entry)
entry.PromMetric.metricType = mInfo.PromMetric.metricType
// push the metric record to prometheus
switch mInfo.PromMetric.metricType {
case api.PromEncodeOperationName("Gauge"):
@@ -146,15 +130,15 @@ func (e *EncodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap {
continue
}
mInfo.promGauge.With(entryLabels).Set(metricValueFloat)
cEntry.PromMetric.promGauge = mInfo.promGauge
entry.PromMetric.promGauge = mInfo.promGauge
case api.PromEncodeOperationName("Counter"):
metricValueFloat, err := utils.ConvertToFloat64(metricValue)
if err != nil {
log.Errorf("value cannot be converted to float64. err: %v, metric: %v, key: %v, value: %v", err, metricName, mInfo.input, metricValue)
continue
}
mInfo.promCounter.With(entryLabels).Add(metricValueFloat)
cEntry.PromMetric.promCounter = mInfo.promCounter
entry.PromMetric.promCounter = mInfo.promCounter
case api.PromEncodeOperationName("Histogram"):
metricValueSlice, ok := metricValue.([]float64)
if !ok {
@@ -164,7 +148,7 @@ func (e *EncodeProm) EncodeMetric(metricRecord config.GenericMap) []config.GenericMap {
for _, v := range metricValueSlice {
mInfo.promHist.With(entryLabels).Observe(v)
}
cEntry.PromMetric.promHist = mInfo.promHist
entry.PromMetric.promHist = mInfo.promHist
}

entryMap := map[string]interface{}{
Expand All @@ -184,31 +168,19 @@ func generateCacheKey(sig *entrySignature) string {
return eInfoString
}

func (e *EncodeProm) saveEntryInCache(entry entryInfo, entryLabels map[string]string) *metricCacheEntry {
// save item in cache; use eInfo as key to the cache
var cEntry *metricCacheEntry
nowInSecs := time.Now().Unix()
eInfoString := generateCacheKey(&entry.eInfo)
cEntry, ok := e.mCache[eInfoString]
if ok {
// item already exists in cache; update the element and move to end of list
cEntry.timeStamp = nowInSecs
// move to end of list
e.mList.MoveToBack(cEntry.e)
} else {
// create new entry for cache
cEntry = &metricCacheEntry{
labels: entryLabels,
timeStamp: nowInSecs,
key: eInfoString,
}
// place at end of list
log.Debugf("adding entry = %v", cEntry)
cEntry.e = e.mList.PushBack(cEntry)
e.mCache[eInfoString] = cEntry
log.Debugf("mlist = %v", e.mList)
// Cleanup is the callback invoked by the cache when it evicts an expired entry
func (e *EncodeProm) Cleanup(sourceEntry interface{}) {
entry := sourceEntry.(entryInfo)
// clean up the entry
log.Debugf("deleting %v", entry)
switch entry.PromMetric.metricType {
case api.PromEncodeOperationName("Gauge"):
entry.PromMetric.promGauge.Delete(entry.eInfo.Labels)
case api.PromEncodeOperationName("Counter"):
entry.PromMetric.promCounter.Delete(entry.eInfo.Labels)
case api.PromEncodeOperationName("Histogram"):
entry.PromMetric.promHist.Delete(entry.eInfo.Labels)
}
return cEntry
}
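
`Cleanup` above is the eviction hook: when the shared cache expires an entry, it hands the stored value back so the encoder can delete the matching Prometheus series and stop exporting stale label sets. Both cache owners in this PR implement the same method; here is a sketch of the implicit contract, assuming it is declared in `pkg/pipeline/utils` (the interface name `CacheCallback` is a guess, the method signature comes from this diff):

```go
// Assumed shape of the callback contract that CleanupExpiredEntries
// expects; the concrete declaration in pkg/pipeline/utils is not shown
// in this diff.
type CacheCallback interface {
	Cleanup(entry interface{})
}
```

Because the cache stores values as `interface{}`, `Cleanup` must type-assert back to the concrete type that was stored (`entryInfo` here, `*GroupState` in aggregate.go); storing one type and asserting another would panic at eviction time.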

func (e *EncodeProm) cleanupExpiredEntriesLoop() {
@@ -219,46 +191,8 @@ func (e *EncodeProm) cleanupExpiredEntriesLoop() {
log.Debugf("exiting cleanupExpiredEntriesLoop because of signal")
return
case <-ticker.C:
e.cleanupExpiredEntries()
}
}
}

// cleanupExpiredEntries - any entry that has expired should be removed from the prometheus reporting and cache
func (e *EncodeProm) cleanupExpiredEntries() {
log.Debugf("entering cleanupExpiredEntries")
e.mu.Lock()
defer e.mu.Unlock()
log.Debugf("cache = %v", e.mCache)
log.Debugf("list = %v", e.mList)
nowInSecs := time.Now().Unix()
expireTime := nowInSecs - e.expiryTime
// go through the list until we reach recently used entries
for {
entry := e.mList.Front()
if entry == nil {
return
}
c := entry.Value.(*metricCacheEntry)
log.Debugf("timeStamp = %d, expireTime = %d", c.timeStamp, expireTime)
log.Debugf("c = %v", c)
if c.timeStamp > expireTime {
// no more expired items
return
}

// clean up the entry
log.Debugf("nowInSecs = %d, deleting %v", nowInSecs, c)
switch c.PromMetric.metricType {
case api.PromEncodeOperationName("Gauge"):
c.PromMetric.promGauge.Delete(c.labels)
case api.PromEncodeOperationName("Counter"):
c.PromMetric.promCounter.Delete(c.labels)
case api.PromEncodeOperationName("Histogram"):
c.PromMetric.promHist.Delete(c.labels)
e.mCache.CleanupExpiredEntries(e.expiryTime, e)
}
delete(e.mCache, c.key)
e.mList.Remove(entry)
}
}
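
The expiry walk deleted above now lives behind `CleanupExpiredEntries`. Continuing the `TimedCache` sketch from the top of this page, an implementation mirroring the deleted loop could look like this; only the method name and the `(expiryTime, callback)` call shape come from the call site, the body is an assumption:

```go
// CleanupExpiredEntries walks the LRU list from the oldest entry, stops
// at the first entry updated within the last expiry seconds, and invokes
// cb.Cleanup on each evicted value.
func (tc *TimedCache) CleanupExpiredEntries(expiry int64, cb CacheCallback) {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	cutoff := time.Now().Unix() - expiry
	for {
		front := tc.lru.Front()
		if front == nil {
			return
		}
		e := front.Value.(*cacheEntry)
		if e.lastUpdate > cutoff {
			return // the list is ordered by update time; the rest are fresh
		}
		cb.Cleanup(e.value)
		delete(tc.byKey, e.key)
		tc.lru.Remove(front)
	}
}
```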

@@ -347,8 +281,7 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) {
prefix: promPrefix,
metrics: metrics,
expiryTime: expiryTime,
mList: list.New(),
mCache: make(metricCache),
mCache: utils.NewTimedCache(),
exitChan: utils.ExitChannel(),
PrevRecords: make([]config.GenericMap, 0),
}
21 changes: 8 additions & 13 deletions pkg/pipeline/encode/encode_prom_test.go
@@ -18,11 +18,11 @@
package encode

import (
"container/list"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/flowlogs-pipeline/pkg/test"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/assert"
@@ -123,27 +123,23 @@ func Test_NewEncodeProm(t *testing.T) {
require.Equal(t, gEntryInfo1["value"], int(bytesA))

// verify entries are in cache; one for the gauge and one for the counter
entriesMap := encodeProm.mCache
require.Equal(t, 2, len(entriesMap))
entriesMapLen := encodeProm.mCache.GetCacheLen()
require.Equal(t, 2, entriesMapLen)

eInfo := entrySignature{
Name: "test_Bytes",
Labels: entryLabels1,
}

eInfoBytes := generateCacheKey(&eInfo)
encodeProm.mu.Lock()
_, found := encodeProm.mCache[string(eInfoBytes)]
encodeProm.mu.Unlock()
_, found := encodeProm.mCache.GetCacheEntry(string(eInfoBytes))
require.Equal(t, true, found)

// wait a couple seconds so that the entry will expire
time.Sleep(2 * time.Second)
encodeProm.cleanupExpiredEntries()
entriesMap = encodeProm.mCache
encodeProm.mu.Lock()
require.Equal(t, 0, len(entriesMap))
encodeProm.mu.Unlock()
encodeProm.mCache.CleanupExpiredEntries(encodeProm.expiryTime, encodeProm)
entriesMapLen = encodeProm.mCache.GetCacheLen()
require.Equal(t, 0, entriesMapLen)
}

func Test_EncodeAggregate(t *testing.T) {
Expand All @@ -170,8 +166,7 @@ func Test_EncodeAggregate(t *testing.T) {
labelNames: []string{"by", "aggregate"},
},
},
mList: list.New(),
mCache: make(metricCache),
mCache: utils.NewTimedCache(),
}

newEncode.Encode(metrics)
42 changes: 13 additions & 29 deletions pkg/pipeline/extract/aggregate/aggregate.go
@@ -19,17 +19,16 @@ package aggregate

import (
"container/heap"
"container/list"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
log "github.com/sirupsen/logrus"
)

@@ -47,8 +46,7 @@ type NormalizedValues string

type Aggregate struct {
Definition api.AggregateDefinition
GroupsMap map[NormalizedValues]*GroupState
GroupsList *list.List
cache *utils.TimedCache
mutex *sync.Mutex
expiryTime int64
}
@@ -60,8 +58,6 @@ type GroupState struct {
recentCount int
totalValue float64
totalCount int
lastUpdatedTime int64
listElement *list.Element
}

func (aggregate Aggregate) LabelsFromEntry(entry config.GenericMap) (Labels, bool) {
@@ -129,7 +125,8 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValues NormalizedValues) error {
aggregate.mutex.Lock()
defer aggregate.mutex.Unlock()

groupState, ok := aggregate.GroupsMap[normalizedValues]
var groupState *GroupState
oldEntry, ok := aggregate.cache.GetCacheEntry(string(normalizedValues))
if !ok {
groupState = &GroupState{normalizedValues: normalizedValues}
initVal := getInitValue(string(aggregate.Definition.Operation))
@@ -138,11 +135,10 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValues NormalizedValues) error {
if aggregate.Definition.Operation == OperationRawValues {
groupState.recentRawValues = make([]float64, 0)
}
aggregate.GroupsMap[normalizedValues] = groupState
groupState.listElement = aggregate.GroupsList.PushBack(groupState)
} else {
aggregate.GroupsList.MoveToBack(groupState.listElement)
groupState = oldEntry.(*GroupState)
}
aggregate.cache.UpdateCacheEntry(string(normalizedValues), groupState)

// update value
recordKey := aggregate.Definition.RecordKey
@@ -180,7 +176,6 @@ func (aggregate Aggregate) UpdateByEntry(entry config.GenericMap, normalizedValues NormalizedValues) error {
// update count
groupState.totalCount += 1
groupState.recentCount += 1
groupState.lastUpdatedTime = time.Now().Unix()

return nil
}
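
The rewritten lookup above is a get-or-create pattern. Note that `UpdateCacheEntry` is called unconditionally: that single call replaces the old map insert, the `GroupsList.MoveToBack` recency bump, and the `lastUpdatedTime` field. Condensed, under the assumption that the cache refreshes recency on every update:

```go
// Get-or-create against the shared cache; the cache stores *GroupState,
// so a hit must be type-asserted back.
var groupState *GroupState
if old, ok := aggregate.cache.GetCacheEntry(string(normalizedValues)); ok {
	groupState = old.(*GroupState)
} else {
	groupState = &GroupState{normalizedValues: normalizedValues}
}
// Inserts new groups and refreshes the timestamp/LRU position of
// existing ones in a single call.
aggregate.cache.UpdateCacheEntry(string(normalizedValues), groupState)
```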
@@ -209,7 +204,10 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap {
defer aggregate.mutex.Unlock()

var metrics []config.GenericMap
for _, group := range aggregate.GroupsMap {

// iterate over the items in the cache
aggregate.cache.Iterate(func(key string, value interface{}) {
group := value.(*GroupState)
metrics = append(metrics, config.GenericMap{
"name": aggregate.Definition.Name,
"operation": aggregate.Definition.Operation,
@@ -229,7 +227,7 @@ }
}
group.recentCount = 0
group.recentOpValue = getInitValue(string(aggregate.Definition.Operation))
}
})

if aggregate.Definition.TopK > 0 {
metrics = aggregate.computeTopK(metrics)
@@ -238,22 +236,8 @@ func (aggregate Aggregate) GetMetrics() []config.GenericMap {
return metrics
}
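
`GetMetrics` now visits the groups through a cache-provided iterator instead of ranging over `GroupsMap` directly, which keeps the locking inside the cache. Continuing the sketch, `Iterate` could be as simple as the following; only the callback signature `func(key string, value interface{})` is fixed by the call site:

```go
// Iterate invokes fn for every cached key/value pair while holding the
// cache lock; fn must not call back into the cache or it would deadlock.
func (tc *TimedCache) Iterate(fn func(key string, value interface{})) {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	for key, e := range tc.byKey {
		fn(key, e.value)
	}
}
```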

func (aggregate Aggregate) cleanupExpiredEntries() {
nowInSecs := time.Now().Unix()
expireTime := nowInSecs - aggregate.expiryTime

for {
listEntry := aggregate.GroupsList.Front()
if listEntry == nil {
return
}
pCacheInfo := listEntry.Value.(*GroupState)
if pCacheInfo.lastUpdatedTime > expireTime {
return
}
delete(aggregate.GroupsMap, pCacheInfo.normalizedValues)
aggregate.GroupsList.Remove(listEntry)
}
func (aggregate Aggregate) Cleanup(entry interface{}) {
// nothing special to do in this callback function
}
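
`Aggregate` registers nothing with Prometheus per group, so its `Cleanup` body is empty; the method exists only so `Aggregate` satisfies the same callback contract as `EncodeProm`. If the utils package exposes an interface like the `CacheCallback` sketched earlier, each owner package could assert conformance at compile time (hypothetical guards, not part of this PR):

```go
// In pkg/pipeline/encode:
var _ utils.CacheCallback = (*EncodeProm)(nil) // Cleanup has a pointer receiver

// In pkg/pipeline/extract/aggregate:
var _ utils.CacheCallback = Aggregate{} // Cleanup has a value receiver
```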

// functions to manipulate a heap to generate TopK entries