Skip to content

Commit

Permalink
change overflowsince to time.time
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Jan 16, 2025
1 parent 41e9f47 commit cb5512b
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 23 deletions.
24 changes: 11 additions & 13 deletions pkg/costattribution/active_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ type ActiveSeriesTracker struct {
maxCardinality int
activeSeriesPerUserAttribution *prometheus.Desc
overflowLabels []string
observed map[string]*atomic.Int64
logger log.Logger
observedMtx sync.RWMutex
overflowSince atomic.Int64
observed map[string]*atomic.Int64
overflowSince time.Time
overflowCounter atomic.Int64
cooldownDuration time.Duration
logger log.Logger
}

func newActiveSeriesTracker(userID string, trackedLabels []string, limit int, cooldownDuration time.Duration, logger log.Logger) *ActiveSeriesTracker {
Expand Down Expand Up @@ -80,12 +80,12 @@ func (at *ActiveSeriesTracker) Increment(lbls labels.Labels, now time.Time) {
at.observedMtx.RUnlock()
return
}
at.observedMtx.RUnlock()

if at.overflowSince.Load() > 0 {
if !at.overflowSince.IsZero() {
at.overflowCounter.Inc()
return
}
at.observedMtx.RUnlock()

at.observedMtx.Lock()
defer at.observedMtx.Unlock()
Expand All @@ -95,13 +95,13 @@ func (at *ActiveSeriesTracker) Increment(lbls labels.Labels, now time.Time) {
return
}

if at.overflowSince.Load() > 0 {
if !at.overflowSince.IsZero() {
at.overflowCounter.Inc()
return
}

if len(at.observed) >= at.maxCardinality {
at.overflowSince.Store(now.Unix())
at.overflowSince = now
at.overflowCounter.Inc()
return
}
Expand Down Expand Up @@ -138,20 +138,19 @@ func (at *ActiveSeriesTracker) Decrement(lbls labels.Labels) {
}
at.observedMtx.RUnlock()

if at.overflowSince.Load() > 0 {
at.observedMtx.RLock()
if !at.overflowSince.IsZero() {
at.overflowCounter.Dec()
return
}

at.observedMtx.RLock()
defer at.observedMtx.RUnlock()
panic(fmt.Errorf("decrementing non-existent active series: labels=%v, cost attribution keys: %v, the current observation map length: %d, the current cost attribution key: %s", lbls, at.labels, len(at.observed), buf.String()))
}

func (at *ActiveSeriesTracker) Collect(out chan<- prometheus.Metric) {
if at.overflowSince.Load() > 0 {
at.observedMtx.RLock()
if !at.overflowSince.IsZero() {
var activeSeries int64
at.observedMtx.RLock()
for _, as := range at.observed {
activeSeries += as.Load()
}
Expand All @@ -161,7 +160,6 @@ func (at *ActiveSeriesTracker) Collect(out chan<- prometheus.Metric) {
}
// We don't know the performance of out receiver, so we don't want to hold the lock for too long
var prometheusMetrics []prometheus.Metric
at.observedMtx.RLock()
for key, as := range at.observed {
keys := strings.Split(key, string(sep))
keys = append(keys, at.userID)
Expand Down
14 changes: 7 additions & 7 deletions pkg/costattribution/active_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ func TestActiveTracker_IncrementDecrement(t *testing.T) {
lbls3 := labels.FromStrings("department", "baz", "service", "foo")

ast.Increment(lbls1, time.Unix(1, 0))
assert.Equal(t, int64(0), ast.overflowSince.Load(), "First observation, should not overflow")
assert.Equal(t, time.Unix(0, 0), ast.overflowSince, "First observation, should not overflow")
assert.Equal(t, 1, len(ast.observed))

ast.Decrement(lbls1)
assert.Equal(t, int64(0), ast.overflowSince.Load(), "First observation decremented, should not overflow")
assert.Equal(t, time.Unix(0, 0), ast.overflowSince, "First observation decremented, should not overflow")
assert.Equal(t, 0, len(ast.observed), "First observation decremented, should be removed since it reached 0")

ast.Increment(lbls1, time.Unix(2, 0))
ast.Increment(lbls2, time.Unix(2, 0))
assert.Equal(t, int64(0), ast.overflowSince.Load(), "Second observation, should not overflow")
assert.Equal(t, time.Unix(0, 0), ast.overflowSince, "Second observation, should not overflow")
assert.Equal(t, 2, len(ast.observed))

ast.Increment(lbls3, time.Unix(3, 0))
assert.Equal(t, int64(3), ast.overflowSince.Load(), "Third observation, should overflow")
assert.Equal(t, time.Unix(3, 0), ast.overflowSince, "Third observation, should overflow")
assert.Equal(t, 2, len(ast.observed))

ast.Increment(lbls3, time.Unix(4, 0))
assert.Equal(t, int64(3), ast.overflowSince.Load(), "Fourth observation, should stay overflow")
assert.Equal(t, time.Unix(3, 0), ast.overflowSince, "Fourth observation, should stay overflow")
assert.Equal(t, 2, len(ast.observed))
}

Expand All @@ -65,7 +65,7 @@ func TestActiveTracker_Concurrency(t *testing.T) {
// Verify no data races or inconsistencies
assert.True(t, len(ast.observed) > 0, "Observed set should not be empty after concurrent updates")
assert.LessOrEqual(t, len(ast.observed), ast.maxCardinality, "Observed count should not exceed max cardinality")
assert.NotEqual(t, 0, ast.overflowSince.Load(), "Expected state to be Overflow")
assert.False(t, ast.overflowSince.IsZero(), "Expected state to be Overflow")

expectedMetrics := `
# HELP cortex_ingester_attributed_active_series The total number of active series per user and attribution.
Expand All @@ -85,5 +85,5 @@ func TestActiveTracker_Concurrency(t *testing.T) {
wg.Wait()

assert.Equal(t, 0, len(ast.observed), "Observed set should be empty after all decrements")
assert.NotEqual(t, 0, ast.overflowSince.Load(), "Expected state still to be Overflow")
assert.False(t, ast.overflowSince.IsZero(), "Expected state still to be Overflow")
}
6 changes: 5 additions & 1 deletion pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,13 @@ func (m *Manager) purgeInactiveAttributionsUntil(deadline time.Time) error {
m.deleteSampleTracker(userID)
}

at.observedMtx.RLock()
// if the activeseries tracker has been in overflow for more than the cooldown duration, delete it
if at.overflowSince.Load() > 0 && time.Unix(at.overflowSince.Load(), 0).Add(at.cooldownDuration).Before(deadline) {
if !at.overflowSince.IsZero() && at.overflowSince.Add(at.cooldownDuration).Before(deadline) {
at.observedMtx.RUnlock()
m.deleteActiveTracker(userID)
} else {
at.observedMtx.RUnlock()
}
}
return nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/costattribution/sample_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ type SampleTracker struct {
receivedSamplesAttribution *prometheus.Desc
discardedSampleAttribution *prometheus.Desc
overflowLabels []string
observed map[string]*observation
logger log.Logger
observedMtx sync.RWMutex
observed map[string]*observation
// overflowSince is also protected by observedMtx, it is set when the max cardinality is exceeded
overflowSince time.Time
overflowCounter observation
cooldownDuration time.Duration
logger log.Logger
}

func newSampleTracker(userID string, trackedLabels []string, limit int, cooldown time.Duration, logger log.Logger) *SampleTracker {
Expand Down Expand Up @@ -93,6 +93,7 @@ func (st *SampleTracker) Collect(out chan<- prometheus.Metric) {
st.observedMtx.RLock()

if !st.overflowSince.IsZero() {
st.observedMtx.RUnlock()
out <- prometheus.MustNewConstMetric(st.receivedSamplesAttribution, prometheus.CounterValue, st.overflowCounter.receivedSample.Load(), st.overflowLabels[:len(st.overflowLabels)-1]...)
out <- prometheus.MustNewConstMetric(st.discardedSampleAttribution, prometheus.CounterValue, st.overflowCounter.totalDiscarded.Load(), st.overflowLabels...)
return
Expand Down

0 comments on commit cb5512b

Please sign in to comment.