diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 79bd4bb05af..a7f34d88d92 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -3995,28 +3995,6 @@ "fieldType": "string", "fieldCategory": "experimental" }, - { - "kind": "field", - "name": "cost_attribution_label", - "required": false, - "desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'attrib' cost attribution's label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.", - "fieldValue": null, - "fieldDefaultValue": "", - "fieldFlag": "validation.cost-attribution-label", - "fieldType": "string", - "fieldCategory": "experimental" - }, - { - "kind": "field", - "name": "max_cost_attribution_per_user", - "required": false, - "desc": "The maximum number of cost attribution labels per user, across the cluster. 0 to disable cost attribution.", - "fieldValue": null, - "fieldDefaultValue": 0, - "fieldFlag": "validation.max-cost-attribution-per-user", - "fieldType": "int", - "fieldCategory": "experimental" - }, { "kind": "field", "name": "max_fetched_chunks_per_query", @@ -4325,6 +4303,28 @@ "fieldType": "int", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "cost_attribution_label", + "required": false, + "desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated by adding an 'attrib' label with the cost attribution label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.", + "fieldValue": null, + "fieldDefaultValue": "", + "fieldFlag": "validation.cost-attribution-label", + "fieldType": "string", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "max_cost_attribution_per_user", + "required": false, + "desc": "The maximum number of cost attribution labels per user, across the cluster. 0 to disable cost attribution.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "validation.max-cost-attribution-per-user", + "fieldType": "int", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "ruler_evaluation_delay_duration", @@ -18165,9 +18165,10 @@ "kind": "field", "name": "custom_registry_path", "required": false, - "desc": "", + "desc": "Defines a custom path for the registry. When specified, Mimir will expose cost attribution metrics through this custom path instead of using the default Prometheus registry.", "fieldValue": null, "fieldDefaultValue": "", + "fieldFlag": "custom-registry-path", "fieldType": "string", "fieldCategory": "advanced" }, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 122fa61a989..f07e7e8967c 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1135,6 +1135,8 @@ Usage of ./cmd/mimir/mimir: Configuration file to load. -cost-attribution-eviction-interval duration [experimental] Interval at which to evict inactive cost attributions.
(default 10m0s) + -custom-registry-path string + Defines a custom path for the registry. When specified, Mimir will expose cost attribution metrics through this custom path instead of using the default Prometheus registry. -debug.block-profile-rate int Fraction of goroutine blocking events that are reported in the blocking profile. 1 to include every blocking event in the profile, 0 to disable. -debug.mutex-profile-fraction int diff --git a/pkg/costattribution/caimpl/managerImpl.go b/pkg/costattribution/caimpl/managerImpl.go new file mode 100644 index 00000000000..53c01e466c3 --- /dev/null +++ b/pkg/costattribution/caimpl/managerImpl.go @@ -0,0 +1,141 @@ +package caimpl + +import ( + "context" + "fmt" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/mimir/pkg/util/validation" +) + +type ManagerImpl struct { + services.Service + logger log.Logger + attributionTracker *AttributionTrackerGroup + inactiveTimeout time.Duration + invalidValue string +} + +// NewManager creates a new cost attribution manager, which is responsible for managing the cost attribution of series. +// It cleans up inactive attributions at the configured cleanup interval. +func NewManager(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides) *ManagerImpl { + s := &ManagerImpl{ + attributionTracker: newAttributionTrackerGroup(limits), + inactiveTimeout: inactiveTimeout, + logger: logger, + invalidValue: "__unaccounted__", + } + + s.Service = services.NewTimerService(cleanupInterval, nil, s.iteration, nil).WithName("cost attribution manager") + return s +} + +func (m *ManagerImpl) iteration(_ context.Context) error { + m.attributionTracker.purgeInactiveAttributions(m.inactiveTimeout) + return nil +} + +// EnabledForUser returns true if the cost attribution is enabled for the user +func (m *ManagerImpl) EnabledForUser(userID string) bool { + return m.attributionTracker.limits.CostAttributionLabel(userID) != "" +} + +// GetUserAttributionLabel returns the cost attribution label for the user. It first tries to get the label from the cache; +// if not found, it falls back to the config +func (m *ManagerImpl) GetUserAttributionLabel(userID string) string { + if m.EnabledForUser(userID) { + return m.attributionTracker.getUserAttributionLabelFromCache(userID) + } + m.attributionTracker.deleteUserTracerFromCache(userID) + return "" +} + +// GetUserAttributionLimit returns the cost attribution limit for the user. It first tries to get the limit from the cache; +// if not found, it falls back to the config +func (m *ManagerImpl) GetUserAttributionLimit(userID string) int { + if m.EnabledForUser(userID) { + return m.attributionTracker.getUserAttributionLimitFromCache(userID) + } + m.attributionTracker.deleteUserTracerFromCache(userID) + return 0 +} + +func (m *ManagerImpl) UpdateAttributionTimestamp(user string, lbs labels.Labels, now time.Time) string { + // if cost attribution is not enabled for the user, return empty string + if !m.EnabledForUser(user) { + m.attributionTracker.deleteUserTracerFromCache(user) + return "" + } + + // when cost attribution is enabled, the label has to be set,
and the cache is updated with it + lb := m.attributionTracker.getUserAttributionLabelFromCache(user) + // this should not happen: if the user is enabled for cost attribution, the label has to be set + if lb == "" { + return "" + } + val := lbs.Get(lb) + + if m.attributionTracker.attributionLimitExceeded(user, val, now) { + val = m.invalidValue + level.Error(m.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since user has reached the limit of cost attribution labels", m.invalidValue)) + } + m.attributionTracker.updateAttributionCacheForUser(user, lb, val, now) + return val +} + +// SetActiveSeries adjusts the input attribution and sets the active series gauge for the given user and attribution +func (m *ManagerImpl) SetActiveSeries(userID, attribution string, value float64) { + attribution = m.adjustUserAttribution(userID, attribution) + + m.attributionTracker.mu.Lock() + defer m.attributionTracker.mu.Unlock() + if tracker, exists := m.attributionTracker.trackersByUserID[userID]; exists { + tracker.activeSeriesPerUserAttribution.WithLabelValues(userID, attribution).Set(value) + } +} + +// IncrementDiscardedSamples increments the discarded samples counter for a given user and attribution +func (m *ManagerImpl) IncrementDiscardedSamples(userID, attribution string, value float64) { + attribution = m.adjustUserAttribution(userID, attribution) + m.attributionTracker.mu.RLock() + defer m.attributionTracker.mu.RUnlock() + if tracker, exists := m.attributionTracker.trackersByUserID[userID]; exists { + tracker.discardedSampleAttribution.WithLabelValues(userID, attribution).Add(value) + } +} + +// IncrementReceivedSamples increments the received samples counter for a given user and attribution +func (m *ManagerImpl) IncrementReceivedSamples(userID, attribution string, value float64) { + attribution = m.adjustUserAttribution(userID, attribution) + m.attributionTracker.mu.RLock() + defer m.attributionTracker.mu.RUnlock() + if tracker, exists := m.attributionTracker.trackersByUserID[userID]; exists { + tracker.receivedSamplesAttribution.WithLabelValues(userID, attribution).Add(value) + } +} + +func (m *ManagerImpl) adjustUserAttribution(userID, attribution string) string { + if m.attributionTracker.attributionLimitExceeded(userID, attribution, time.Now()) { + return m.invalidValue + } + return attribution +} + +func (m *ManagerImpl) Collect(out chan<- prometheus.Metric) { + m.attributionTracker.mu.RLock() + defer m.attributionTracker.mu.RUnlock() + for _, tracker := range m.attributionTracker.trackersByUserID { + tracker.Collect(out) + } +} + +// Describe implements prometheus.Collector.
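+// Sending no descriptors on purpose makes this an unchecked collector in client_golang terms: the registry skips descriptor consistency checks at Register time, which is needed here because each tenant's metrics embed a dynamically configured attribution label name. A registry accepts the manager like any other collector, e.g. reg.Register(manager), which is how initCostAttributionService wires it up.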
+func (m *ManagerImpl) Describe(chan<- *prometheus.Desc) { + // this is an unchecked collector +} diff --git a/pkg/costattribution/caimpl/managerImpl_test.go b/pkg/costattribution/caimpl/managerImpl_test.go new file mode 100644 index 00000000000..f7e0a48a4a9 --- /dev/null +++ b/pkg/costattribution/caimpl/managerImpl_test.go @@ -0,0 +1,193 @@ +package caimpl + +import ( + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/mimir/pkg/util/validation" +) + +func newTestManager() *ManagerImpl { + logger := log.NewNopLogger() + limits, _ := validation.NewOverrides(validation.Limits{}, validation.NewMockTenantLimits(map[string]*validation.Limits{ + "user1": { + MaxCostAttributionPerUser: 5, + CostAttributionLabel: "team", + }, + "user2": { + MaxCostAttributionPerUser: 2, + CostAttributionLabel: "", + }, + "user3": { + MaxCostAttributionPerUser: 2, + CostAttributionLabel: "department", + }, + })) + inactiveTimeout := 2 * time.Minute + cleanupInterval := 1 * time.Minute + return NewManager(cleanupInterval, inactiveTimeout, logger, limits) +} + +func Test_NewManager(t *testing.T) { + manager := newTestManager() + assert.NotNil(t, manager, "Expected manager to be initialized") + assert.NotNil(t, manager.attributionTracker, "Expected attribution tracker to be initialized") + assert.Equal(t, "__unaccounted__", manager.invalidValue, "Expected invalidValue to be initialized") +} + +func Test_EnabledForUser(t *testing.T) { + manager := newTestManager() + assert.True(t, manager.EnabledForUser("user1"), "Expected cost attribution to be enabled for user1") + assert.False(t, manager.EnabledForUser("user2"), "Expected cost attribution to be disabled for user2") + assert.False(t, manager.EnabledForUser("user4"), "Expected cost attribution to be disabled for user4") +} + +func Test_GetUserAttributionLabel(t *testing.T) { + manager := newTestManager() + assert.Equal(t, "team", manager.GetUserAttributionLabel("user1")) + assert.Equal(t, "", manager.GetUserAttributionLabel("user2")) + assert.Equal(t, "department", manager.GetUserAttributionLabel("user3")) + assert.Equal(t, 2, len(manager.attributionTracker.trackersByUserID)) + assert.Equal(t, "team", manager.attributionTracker.trackersByUserID["user1"].trackedLabel) + assert.Equal(t, "department", manager.attributionTracker.trackersByUserID["user3"].trackedLabel) +} + +func Test_GetUserAttributionLimit(t *testing.T) { + manager := newTestManager() + assert.Equal(t, 5, manager.GetUserAttributionLimit("user1")) + assert.Equal(t, 0, manager.GetUserAttributionLimit("user2")) + assert.Equal(t, 0, manager.GetUserAttributionLimit("user4")) +} + +func Test_UpdateAttributionTimestamp(t *testing.T) { + manager := newTestManager() + + lbls := labels.NewBuilder(labels.EmptyLabels()) + tm1, tm2, tm3 := "bar", "foo", "baz" + t.Run("Should update the timestamp when limit not reached for the user attribution", func(t *testing.T) { + lbls.Set("department", tm1) + result := manager.UpdateAttributionTimestamp("user3", lbls.Labels(), time.Unix(0, 0)) + assert.Equal(t, tm1, result, "Expected attribution to be returned since user is enabled for cost attribution, and limit is not reached") + assert.NotNil(t, manager.attributionTracker.trackersByUserID["user3"].attributionTimestamps[tm1]) + assert.Equal(t, int64(0), 
manager.attributionTracker.trackersByUserID["user3"].attributionTimestamps[tm1].Load()) + + lbls.Set("department", tm2) + result = manager.UpdateAttributionTimestamp("user3", lbls.Labels(), time.Unix(1, 0)) + assert.Equal(t, tm2, result, "Expected attribution to be returned since user is enabled for cost attribution, and limit is not reached") + assert.NotNil(t, manager.attributionTracker.trackersByUserID["user3"].attributionTimestamps[tm2]) + assert.Equal(t, int64(1), manager.attributionTracker.trackersByUserID["user3"].attributionTimestamps[tm2].Load()) + }) + + t.Run("Should only update the timestamp of the invalid value when the limit is reached for the user attribution", func(t *testing.T) { + lbls.Set("department", tm3) + result := manager.UpdateAttributionTimestamp("user3", lbls.Labels(), time.Unix(2, 0)) + assert.Equal(t, manager.invalidValue, result, "Expected invalidValue to be returned since user has reached the limit of cost attribution labels") + assert.NotNil(t, manager.attributionTracker.trackersByUserID["user3"].attributionTimestamps[manager.invalidValue]) + assert.Equal(t, int64(2), manager.attributionTracker.trackersByUserID["user3"].attributionTimestamps[manager.invalidValue].Load()) + + lbls.Set("department", tm1) + result = manager.UpdateAttributionTimestamp("user3", lbls.Labels(), time.Unix(3, 0)) + assert.Equal(t, manager.invalidValue, result, "Expected invalidValue to be returned since user has reached the limit of cost attribution labels") + assert.Equal(t, int64(3), manager.attributionTracker.trackersByUserID["user3"].attributionTimestamps[manager.invalidValue].Load()) + }) +} + +func Test_SetActiveSeries(t *testing.T) { + manager := newTestManager() + reg := prometheus.NewRegistry() + err := reg.Register(manager) + require.NoError(t, err) + userID := "user1" + + lbls := labels.NewBuilder(labels.EmptyLabels()) + + t.Run("Should set the active series gauge for the given user and attribution", func(t *testing.T) { + lbls.Set("team", "foo") + val := manager.UpdateAttributionTimestamp(userID, lbls.Labels(), time.Unix(0, 0)) + manager.SetActiveSeries(userID, val, 1.0) + expectedMetrics := ` + # HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution. + # TYPE cortex_ingester_active_series_attribution gauge + cortex_ingester_active_series_attribution{team="foo",user="user1"} 1 + ` + metricNames := []string{ + "cortex_ingester_active_series_attribution", + } + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) + }) + + t.Run("Should set the active series gauge for all users and attributions enabled and ignore disabled user", func(t *testing.T) { + userID = "user3" + lbls.Set("department", "bar") + val := manager.UpdateAttributionTimestamp(userID, lbls.Labels(), time.Unix(0, 0)) + manager.SetActiveSeries(userID, val, 2.0) + + lbls.Set("department", "baz") + val = manager.UpdateAttributionTimestamp(userID, lbls.Labels(), time.Unix(0, 0)) + manager.SetActiveSeries(userID, val, 3.0) + + expectedMetrics := ` + # HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution.
+ # TYPE cortex_ingester_active_series_attribution gauge + cortex_ingester_active_series_attribution{department="bar",user="user3"} 2 + cortex_ingester_active_series_attribution{department="baz",user="user3"} 3 + cortex_ingester_active_series_attribution{team="foo",user="user1"} 1 + ` + metricNames := []string{ + "cortex_ingester_active_series_attribution", + } + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) + }) + + t.Run("Cleanup the active series gauge for the given user and attribution when cost attribution disabled", func(t *testing.T) { + limits := manager.attributionTracker.limits + defer func() { manager.attributionTracker.limits = limits }() + userID = "user3" + lbls.Set("department", "baz") + + overrides, _ := validation.NewOverrides(validation.Limits{}, validation.NewMockTenantLimits(map[string]*validation.Limits{ + userID: { + MaxCostAttributionPerUser: 2, + CostAttributionLabel: "", + }, + })) + manager.attributionTracker.limits = overrides + val := manager.UpdateAttributionTimestamp(userID, lbls.Labels(), time.Unix(5, 0)) + manager.SetActiveSeries(userID, val, 3.0) + + expectedMetrics := ` + # HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution. + # TYPE cortex_ingester_active_series_attribution gauge + cortex_ingester_active_series_attribution{team="foo",user="user1"} 1 + ` + metricNames := []string{ + "cortex_ingester_active_series_attribution", + } + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) + }) + + t.Run("Should ignore setting the active series gauge for disabled user", func(t *testing.T) { + userID = "user2" + lbls.Set("department", "bar") + val := manager.UpdateAttributionTimestamp(userID, lbls.Labels(), time.Unix(0, 0)) + manager.SetActiveSeries(userID, val, 4.0) + + expectedMetrics := ` + # HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution. 
+ # TYPE cortex_ingester_active_series_attribution gauge + cortex_ingester_active_series_attribution{team="foo",user="user1"} 1 + ` + metricNames := []string{ + "cortex_ingester_active_series_attribution", + } + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) + }) +} diff --git a/pkg/costattribution/caimpl/tracker.go b/pkg/costattribution/caimpl/tracker.go new file mode 100644 index 00000000000..255b6413907 --- /dev/null +++ b/pkg/costattribution/caimpl/tracker.go @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package caimpl + +import ( + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/atomic" +) + +type Tracker struct { + trackedLabel string + attributionLimit int + activeSeriesPerUserAttribution *prometheus.GaugeVec + receivedSamplesAttribution *prometheus.CounterVec + discardedSampleAttribution *prometheus.CounterVec + attributionTimestamps map[string]*atomic.Int64 + coolDownDeadline *atomic.Int64 +} + +func (t *Tracker) cleanupTrackerAttribution(userID, attribution string) { + t.activeSeriesPerUserAttribution.DeleteLabelValues(userID, attribution) + t.receivedSamplesAttribution.DeleteLabelValues(userID, attribution) + t.discardedSampleAttribution.DeleteLabelValues(userID, attribution) +} + +func (t *Tracker) cleanupTracker(userID string) { + filter := prometheus.Labels{"user": userID} + t.activeSeriesPerUserAttribution.DeletePartialMatch(filter) + t.receivedSamplesAttribution.DeletePartialMatch(filter) + t.discardedSampleAttribution.DeletePartialMatch(filter) +} + +func newTracker(trackedLabel string, limit int) (*Tracker, error) { + m := &Tracker{ + trackedLabel: trackedLabel, + attributionLimit: limit, + attributionTimestamps: map[string]*atomic.Int64{}, + coolDownDeadline: atomic.NewInt64(0), + discardedSampleAttribution: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_discarded_samples_attribution_total", + Help: "The total number of samples that were discarded per attribution.", + }, []string{"user", trackedLabel}), + receivedSamplesAttribution: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_received_samples_attribution_total", + Help: "The total number of samples that were received per attribution.", + }, []string{"user", trackedLabel}), + activeSeriesPerUserAttribution: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_series_attribution", + Help: "The total number of active series per user and attribution.", + }, []string{"user", trackedLabel}), + } + return m, nil +} + +func (t *Tracker) Collect(out chan<- prometheus.Metric) { + t.activeSeriesPerUserAttribution.Collect(out) + t.receivedSamplesAttribution.Collect(out) + t.discardedSampleAttribution.Collect(out) +} + +// Describe implements prometheus.Collector. 
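+// The tracker's metric vectors are built with prometheus.NewCounterVec and prometheus.NewGaugeVec rather than promauto, because trackers come and go per tenant: they are exposed through the Manager's Collect instead of being registered individually, so Describe stays empty here as well.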
+func (t *Tracker) Describe(chan<- *prometheus.Desc) { + // this is an unchecked collector +} diff --git a/pkg/costattribution/caimpl/tracker_group.go b/pkg/costattribution/caimpl/tracker_group.go new file mode 100644 index 00000000000..f6fcc78e9ee --- /dev/null +++ b/pkg/costattribution/caimpl/tracker_group.go @@ -0,0 +1,189 @@ +package caimpl + +import ( + "sync" + "time" + + "go.uber.org/atomic" + + "github.com/grafana/mimir/pkg/util/validation" +) + +type AttributionTrackerGroup struct { + mu sync.RWMutex + trackersByUserID map[string]*Tracker + limits *validation.Overrides +} + +func newAttributionTrackerGroup(limits *validation.Overrides) *AttributionTrackerGroup { + return &AttributionTrackerGroup{ + trackersByUserID: make(map[string]*Tracker), + limits: limits, + mu: sync.RWMutex{}, + } +} + +// getUserAttributionLabelFromCache reads the user's attribution label through the cache; if not found, it falls back to the config +func (atg *AttributionTrackerGroup) getUserAttributionLabelFromCache(userID string) string { + // take the write lock: a missing tracker is created below, which mutates the map + atg.mu.Lock() + defer atg.mu.Unlock() + // if the user is not enabled for cost attribution, we don't need to track the attribution + if atg.limits.CostAttributionLabel(userID) == "" { + return "" + } + if _, exists := atg.trackersByUserID[userID]; !exists { + atg.trackersByUserID[userID], _ = newTracker(atg.limits.CostAttributionLabel(userID), atg.limits.MaxCostAttributionPerUser(userID)) + } + return atg.trackersByUserID[userID].trackedLabel +} + +// getUserAttributionLimitFromCache reads the per-user attribution limit through the cache; if not found, it falls back to the config. +// Call it only when the user is enabled for cost attribution +func (atg *AttributionTrackerGroup) getUserAttributionLimitFromCache(userID string) int { + atg.mu.Lock() + defer atg.mu.Unlock() + if _, exists := atg.trackersByUserID[userID]; !exists { + atg.trackersByUserID[userID], _ = newTracker(atg.limits.CostAttributionLabel(userID), atg.limits.MaxCostAttributionPerUser(userID)) + } + return atg.trackersByUserID[userID].attributionLimit +} + +// deleteUserTracerFromCache deletes the user from the cache, since the user is disabled for cost attribution +func (atg *AttributionTrackerGroup) deleteUserTracerFromCache(userID string) { + atg.mu.Lock() + defer atg.mu.Unlock() + if _, exists := atg.trackersByUserID[userID]; !exists { + return + } + // clean up tracker metrics and delete the tracker + atg.trackersByUserID[userID].cleanupTracker(userID) + delete(atg.trackersByUserID, userID) +} + +// updateAttributionCacheForUser is guaranteed to update the label and limit for the user in the cache: +// if the label has changed, we create a new tracker and don't update the timestamp; +// if the label has not changed, we update the attribution timestamp; +// if the limit is set to 0 or the label is empty, we skip the update +func (atg *AttributionTrackerGroup) updateAttributionCacheForUser(userID, label, attribution string, now time.Time) { + // If the limit is set to 0, we don't need to track the attribution; clean the cache if it exists + if atg.limits.CostAttributionLabel(userID) == "" || atg.limits.MaxCostAttributionPerUser(userID) <= 0 { + atg.deleteUserTracerFromCache(userID) + return + } + ts := now.Unix() + + // if the cache is not up to date, we reset it + if atg.getUserAttributionLabelFromCache(userID) != atg.limits.CostAttributionLabel(userID) || atg.getUserAttributionLimitFromCache(userID) != atg.limits.MaxCostAttributionPerUser(userID) { + atg.trackersByUserID[userID], _ =
newTracker(atg.limits.CostAttributionLabel(userID), atg.limits.MaxCostAttributionPerUser(userID)) + } + + if label != atg.getUserAttributionLabelFromCache(userID) { + return + } + + // update attribution timestamp + if groupTs := atg.trackersByUserID[userID].attributionTimestamps[attribution]; groupTs != nil { + groupTs.Store(ts) + return + } + + // if the attribution does not exist yet, add an attribution timestamp + atg.mu.Lock() + defer atg.mu.Unlock() + atg.trackersByUserID[userID].attributionTimestamps[attribution] = atomic.NewInt64(ts) +} + +func (atg *AttributionTrackerGroup) purgeInactiveAttributionsForUser(userID string, deadline int64) []string { + atg.mu.RLock() + var inactiveAttributions []string + if atg.trackersByUserID[userID] == nil { + // release the read lock before the early return, otherwise it would be leaked + atg.mu.RUnlock() + return nil + } + atg.mu.RUnlock() + + atg.mu.Lock() + if atg.trackersByUserID[userID].trackedLabel != atg.limits.CostAttributionLabel(userID) { + // reset everything if the label has changed + atg.trackersByUserID[userID], _ = newTracker(atg.limits.CostAttributionLabel(userID), atg.limits.MaxCostAttributionPerUser(userID)) + } + atg.mu.Unlock() + + atg.mu.RLock() + attributionTimestamps := atg.trackersByUserID[userID].attributionTimestamps + if attributionTimestamps == nil { + atg.mu.RUnlock() + return nil + } + for attr, ts := range attributionTimestamps { + if ts.Load() <= deadline { + inactiveAttributions = append(inactiveAttributions, attr) + } + } + atg.mu.RUnlock() + if len(inactiveAttributions) == 0 { + return nil + } + + // Cleanup inactive groups + atg.mu.Lock() + defer atg.mu.Unlock() + + for i := 0; i < len(inactiveAttributions); { + inactiveAttribution := inactiveAttributions[i] + groupTs := atg.trackersByUserID[userID].attributionTimestamps[inactiveAttribution] + if groupTs != nil && groupTs.Load() <= deadline { + delete(atg.trackersByUserID[userID].attributionTimestamps, inactiveAttribution) + i++ + } else { + inactiveAttributions[i] = inactiveAttributions[len(inactiveAttributions)-1] + inactiveAttributions = inactiveAttributions[:len(inactiveAttributions)-1] + } + } + + return inactiveAttributions +} + +func (atg *AttributionTrackerGroup) purgeInactiveAttributions(inactiveTimeout time.Duration) { + atg.mu.RLock() + userIDs := make([]string, 0, len(atg.trackersByUserID)) + for userID := range atg.trackersByUserID { + userIDs = append(userIDs, userID) + } + atg.mu.RUnlock() + + currentTime := time.Now() + for _, userID := range userIDs { + // attribution timestamps are stored in seconds (now.Unix() in updateAttributionCacheForUser), so the deadline has to be in seconds as well + inactiveAttributions := atg.purgeInactiveAttributionsForUser(userID, currentTime.Add(-inactiveTimeout).Unix()) + for _, attribution := range inactiveAttributions { + atg.trackersByUserID[userID].cleanupTrackerAttribution(userID, attribution) + } + } +} + +func (atg *AttributionTrackerGroup) attributionLimitExceeded(userID, attribution string, now time.Time) bool { + // if we are still in the cooldown period, we consider the limit reached + atg.mu.RLock() + defer atg.mu.RUnlock() + // if the user does not exist, we don't need to check the limit + if atg.trackersByUserID[userID] == nil { + return false + } + + if v := atg.trackersByUserID[userID].coolDownDeadline; v != nil && v.Load() > now.UnixNano() { + return true + } + + // if the attribution already exists and we are not in the cooldown period, we don't need to check the limit + _, exists := atg.trackersByUserID[userID].attributionTimestamps[attribution] + if exists { + return false + } + + // if the user has reached the limit, we set a cooldown period of 20 minutes + maxReached =
len(atg.trackersByUserID[userID].attributionTimestamps) >= atg.limits.MaxCostAttributionPerUser(userID) + if maxReached { + atg.trackersByUserID[userID].coolDownDeadline.Store(now.Add(20 * time.Minute).UnixNano()) + return true + } + + return maxReached +} diff --git a/pkg/costattribution/caimpl/tracker_group_test.go b/pkg/costattribution/caimpl/tracker_group_test.go new file mode 100644 index 00000000000..fb3eb87d3d0 --- /dev/null +++ b/pkg/costattribution/caimpl/tracker_group_test.go @@ -0,0 +1,96 @@ +package caimpl + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/grafana/mimir/pkg/util/validation" +) + +func TestUpdateAttributionTimestampForUser(t *testing.T) { + t.Run("Should not update the timestamp for the user if attribution label is not set", func(t *testing.T) { + // Create mock limits + limiter, err := validation.NewOverrides(validation.Limits{CostAttributionLabel: "", MaxCostAttributionPerUser: 5}, nil) + assert.NoError(t, err) + + trackerGroup := newAttributionTrackerGroup(limiter) + assert.NotNil(t, trackerGroup) + + ts := time.Unix(1, 0) + trackerGroup.updateAttributionCacheForUser("tenantA", "platform", "platformA", ts) + trackerGroup.updateAttributionCacheForUser("tenantB", "platform", "teamB", ts) + + assert.Equal(t, 0, len(trackerGroup.trackersByUserID)) + }) + + t.Run("Should not update the timestamp for the user if max cost attribution per user is 0", func(t *testing.T) { + // Create mock limits + limiter, err := validation.NewOverrides(validation.Limits{CostAttributionLabel: "platform", MaxCostAttributionPerUser: 0}, nil) + assert.NoError(t, err) + + trackerGroup := newAttributionTrackerGroup(limiter) + assert.NotNil(t, trackerGroup) + + ts := time.Unix(1, 0) + trackerGroup.updateAttributionCacheForUser("tenantA", "platform", "platformA", ts) + trackerGroup.updateAttributionCacheForUser("tenantB", "platform", "teamB", ts) + + assert.Equal(t, 0, len(trackerGroup.trackersByUserID)) + }) + + t.Run("Should update the timestamp for the user attribution", func(t *testing.T) { + // Create mock limits + limiter, err := validation.NewOverrides(validation.Limits{CostAttributionLabel: "platform", MaxCostAttributionPerUser: 5}, nil) + assert.NoError(t, err) + + trackerGroup := newAttributionTrackerGroup(limiter) + assert.NotNil(t, trackerGroup) + + ts := time.Unix(1, 0) + trackerGroup.updateAttributionCacheForUser("tenantA", "platform", "fooA", ts) + trackerGroup.updateAttributionCacheForUser("tenantB", "platform", "barA", ts) + + assert.Equal(t, 2, len(trackerGroup.trackersByUserID)) + assert.NotNil(t, trackerGroup.trackersByUserID["tenantA"]) + assert.NotNil(t, trackerGroup.trackersByUserID["tenantA"].attributionTimestamps["fooA"]) + assert.Equal(t, int64(1), trackerGroup.trackersByUserID["tenantA"].attributionTimestamps["fooA"].Load()) + + trackerGroup.updateAttributionCacheForUser("tenantB", "platform", "barA", ts.Add(time.Second)) + assert.Equal(t, int64(2), trackerGroup.trackersByUserID["tenantB"].attributionTimestamps["barA"].Load()) + }) +} + +func TestGetUserAttributionLabel(t *testing.T) { + t.Run("Should return the cost attribution label for the user", func(t *testing.T) { + // Create mock limits + limiter, err := validation.NewOverrides(validation.Limits{CostAttributionLabel: "platform", MaxCostAttributionPerUser: 5}, nil) + assert.NoError(t, err) + + trackerGroup := newAttributionTrackerGroup(limiter) + assert.NotNil(t, trackerGroup) +
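// the first update for a tenant seeds the cache: it creates the tracker for "tenantA", with the label name taken from the config ("platform"), before storing the timestamp +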
trackerGroup.updateAttributionCacheForUser("tenantA", "platform", "fooA", time.Unix(0, 0)) + + assert.Equal(t, "platform", trackerGroup.getUserAttributionLabelFromCache("tenantA")) + }) + + t.Run("Should return the default cost attribution label for the user if it is in cache", func(t *testing.T) { + // Create mock limits + limiter, err := validation.NewOverrides(validation.Limits{CostAttributionLabel: "platform", MaxCostAttributionPerUser: 5}, nil) + assert.NoError(t, err) + + trackerGroup := newAttributionTrackerGroup(limiter) + assert.NotNil(t, trackerGroup) + + assert.Equal(t, "platform", trackerGroup.getUserAttributionLabelFromCache("tenantA")) + + // update the timestamp for the user, so cache is updated + trackerGroup.updateAttributionCacheForUser("tenantA", "platform", "fooA", time.Unix(0, 0)) + + // still read the cost attribution label from cache until cache is updated by timed service + assert.Equal(t, "platform", trackerGroup.getUserAttributionLabelFromCache("tenantA")) + }) +} diff --git a/pkg/costattribution/caimpl/tracker_test.go b/pkg/costattribution/caimpl/tracker_test.go new file mode 100644 index 00000000000..7427e6a1540 --- /dev/null +++ b/pkg/costattribution/caimpl/tracker_test.go @@ -0,0 +1,55 @@ +package caimpl + +import ( + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_NewTracker(t *testing.T) { + reg := prometheus.NewRegistry() + + // Initialize a new Tracker + trackedLabel := "platform" + tracker, err := newTracker(trackedLabel, 5) + require.NoError(t, err) + err = reg.Register(tracker) + require.NoError(t, err) + + // Simulate some values in the metrics + userID := "user1" + attribution := "foo" + tracker.activeSeriesPerUserAttribution.WithLabelValues(userID, attribution).Set(1.0) + tracker.receivedSamplesAttribution.WithLabelValues(userID, attribution).Add(5) + tracker.discardedSampleAttribution.WithLabelValues(userID, attribution).Add(2) + + expectedMetrics := ` + # HELP cortex_discarded_samples_attribution_total The total number of samples that were discarded per attribution. + # TYPE cortex_discarded_samples_attribution_total counter + cortex_discarded_samples_attribution_total{platform="foo",user="user1"} 2 + # HELP cortex_ingester_active_series_attribution The total number of active series per user and attribution. + # TYPE cortex_ingester_active_series_attribution gauge + cortex_ingester_active_series_attribution{platform="foo",user="user1"} 1 + # HELP cortex_received_samples_attribution_total The total number of samples that were received per attribution. 
+ # TYPE cortex_received_samples_attribution_total counter + cortex_received_samples_attribution_total{platform="foo",user="user1"} 5 + ` + + metricNames := []string{ + "cortex_discarded_samples_attribution_total", + "cortex_received_samples_attribution_total", + "cortex_ingester_active_series_attribution", + } + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) + + // Clean the tracker for the user attribution + tracker.cleanupTrackerAttribution(userID, attribution) + + // Verify that metrics have been cleaned + expectedMetrics = `` + assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(expectedMetrics), metricNames...)) +} diff --git a/pkg/costattribution/manager.go b/pkg/costattribution/manager.go new file mode 100644 index 00000000000..60c2b92517c --- /dev/null +++ b/pkg/costattribution/manager.go @@ -0,0 +1,24 @@ +package costattribution + +import ( + "time" + + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" +) + +type Manager interface { + services.Service + + EnabledForUser(userID string) bool + GetUserAttributionLabel(userID string) string + GetUserAttributionLimit(userID string) int + UpdateAttributionTimestamp(user string, lbs labels.Labels, now time.Time) string + SetActiveSeries(userID, attribution string, value float64) + IncrementDiscardedSamples(userID, attribution string, value float64) + IncrementReceivedSamples(userID, attribution string, value float64) + + Collect(out chan<- prometheus.Metric) + Describe(chan<- *prometheus.Desc) +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d127bfe00d1..7fdeb4a3622 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -48,12 +48,12 @@ import ( "golang.org/x/sync/errgroup" "github.com/grafana/mimir/pkg/cardinality" + "github.com/grafana/mimir/pkg/costattribution" ingester_client "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/storage/ingest" "github.com/grafana/mimir/pkg/util" - "github.com/grafana/mimir/pkg/util/costattribution" "github.com/grafana/mimir/pkg/util/globalerror" mimir_limiter "github.com/grafana/mimir/pkg/util/limiter" util_math "github.com/grafana/mimir/pkg/util/math" @@ -106,7 +106,7 @@ type Distributor struct { distributorsLifecycler *ring.BasicLifecycler distributorsRing *ring.Ring healthyInstancesCount *atomic.Uint32 - costAttributionSvc *costattribution.CostAttributionCleanupService + costAttributionMng costattribution.Manager // For handling HA replicas. 
HATracker *haTracker @@ -307,7 +307,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) { } // New constructs a new Distributor -func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *costattribution.CostAttributionCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { +func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMng costattribution.Manager, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { clientMetrics := ingester_client.NewMetrics(reg) if cfg.IngesterClientFactory == nil { cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) { @@ -342,7 +342,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove healthyInstancesCount: atomic.NewUint32(0), limits: limits, HATracker: haTracker, - costAttributionSvc: costAttributionClenaupService, + costAttributionMng: costAttributionMng, ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -1669,29 +1669,28 @@ func tokenForMetadata(userID string, metricName string) uint32 { func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, now time.Time) { var receivedSamples, receivedExemplars, receivedMetadata int - costAttributionSize := 0 - caEnabled := d.costAttributionSvc != nil && d.costAttributionSvc.EnabledForUser(userID) + costAttributionLimit := 0 + caEnabled := d.costAttributionMng != nil && d.costAttributionMng.EnabledForUser(userID) if caEnabled { - costAttributionSize = d.costAttributionSvc.GetUserAttributionLimit(userID) + costAttributionLimit = d.costAttributionMng.GetUserAttributionLimit(userID) } - costAttribution := make(map[string]int, costAttributionSize) + costAttribution := make(map[string]int, costAttributionLimit) for _, ts := range req.Timeseries { receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) receivedExemplars += len(ts.TimeSeries.Exemplars) if caEnabled { - attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), now) + attribution := d.costAttributionMng.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), now) costAttribution[attribution]++ } } receivedMetadata = len(req.Metadata) if caEnabled { for lv, count := range costAttribution { - d.costAttributionSvc.IncrementReceivedSamples(userID, lv, float64(count)) + d.costAttributionMng.IncrementReceivedSamples(userID, lv, float64(count)) } - } else { - d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples)) } + d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples)) d.receivedExemplars.WithLabelValues(userID).Add(float64(receivedExemplars)) d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata)) } diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go index 7cc2d39554b..56abe4f4527 100644 ---
a/pkg/ingester/activeseries/active_series.go +++ b/pkg/ingester/activeseries/active_series.go @@ -11,13 +11,13 @@ import ( "sync" "time" - "github.com/grafana/mimir/pkg/util/costattribution" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/util/zeropool" "go.uber.org/atomic" + "github.com/grafana/mimir/pkg/costattribution" asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model" ) @@ -50,7 +50,7 @@ type ActiveSeries struct { matchers *asmodel.Matchers lastMatchersUpdate time.Time - costAttributionSvc *costattribution.CostAttributionCleanupService + costAttributionMng costattribution.Manager // The duration after which series become inactive. // Also used to determine if enough time has passed since configuration reload for valid results. @@ -68,7 +68,7 @@ type seriesStripe struct { // Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated // without holding the lock -- hence the atomic). oldestEntryTs atomic.Int64 - costAttributionSvc *costattribution.CostAttributionCleanupService + costAttributionMng costattribution.Manager mu sync.RWMutex refs map[storage.SeriesRef]seriesEntry active uint32 // Number of active entries in this stripe. Only decreased during purge or clear. @@ -97,16 +97,16 @@ func NewActiveSeries( asm *asmodel.Matchers, timeout time.Duration, userID string, - costAttributionSvc *costattribution.CostAttributionCleanupService, + costAttributionMng costattribution.Manager, ) *ActiveSeries { c := &ActiveSeries{ matchers: asm, timeout: timeout, userID: userID, - costAttributionSvc: costAttributionSvc, + costAttributionMng: costAttributionMng, } // Stripes are pre-allocated so that we only read on them and no lock is required. 
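+ // Series are sharded across the numStripes stripes, each guarded by its own mutex, so concurrent writers contend per stripe rather than on one global lock; reinitialize hands every stripe its own reference to the cost attribution manager.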
for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionSvc) + c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionMng) } return c @@ -123,7 +123,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) { defer c.matchersMutex.Unlock() for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionSvc) + c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionMng) } c.matchers = asm c.lastMatchersUpdate = now @@ -230,7 +230,7 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot } func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 { - total := make(map[string]uint32, c.costAttributionSvc.GetUserAttributionLimit(c.userID)) + total := make(map[string]uint32, c.costAttributionMng.GetUserAttributionLimit(c.userID)) for s := 0; s < numStripes; s++ { c.stripes[s].mu.RLock() for k, v := range c.stripes[s].costAttributionValues { @@ -426,8 +426,8 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef // here if we have a cost attribution label, we can split the series count based on the value of the label // we also set the reference to the value of the label in the entry, so that when removing, we can decrease the counter accordingly - if s.costAttributionSvc != nil && s.costAttributionSvc.GetUserAttributionLabel(s.userID) != "" { - attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series, time.Unix(0, nowNanos)) + if s.costAttributionMng != nil && s.costAttributionMng.GetUserAttributionLabel(s.userID) != "" { + attributionValue := s.costAttributionMng.UpdateAttributionTimestamp(s.userID, series, time.Unix(0, nowNanos)) s.costAttributionValues[attributionValue]++ e.attributionValue = attributionValue } @@ -459,7 +459,7 @@ func (s *seriesStripe) reinitialize( asm *asmodel.Matchers, deleted *deletedSeries, userID string, - costAttributionSvc *costattribution.CostAttributionCleanupService, + costAttributionMng costattribution.Manager, ) { s.mu.Lock() defer s.mu.Unlock() @@ -475,7 +475,7 @@ func (s *seriesStripe) reinitialize( s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching) s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms) s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets) - s.costAttributionSvc = costAttributionSvc + s.costAttributionMng = costAttributionMng } func (s *seriesStripe) purge(keepUntil time.Time) { diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 43bd42d4f0a..56db53b7c5b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -51,6 +51,7 @@ import ( "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" + "github.com/grafana/mimir/pkg/costattribution" "github.com/grafana/mimir/pkg/ingester/activeseries" asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model" "github.com/grafana/mimir/pkg/ingester/client" @@ -63,7 +64,6 @@ import ( "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" - "github.com/grafana/mimir/pkg/util/costattribution" "github.com/grafana/mimir/pkg/util/globalerror" "github.com/grafana/mimir/pkg/util/limiter" util_log "github.com/grafana/mimir/pkg/util/log" @@ -311,7 +311,7 @@ type Ingester struct { activeGroups *util.ActiveGroupsCleanupService -
costAttributionSvc *costattribution.CostAttributionCleanupService + costAttributionMng costattribution.Manager tsdbMetrics *tsdbMetrics @@ -380,7 +380,7 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus } // New returns an Ingester that uses Mimir block storage. -func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionCleanupService *costattribution.CostAttributionCleanupService, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { +func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMng costattribution.Manager, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { i, err := newIngester(cfg, limits, registerer, logger) if err != nil { return nil, err @@ -388,7 +388,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, i.ingestionRate = util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval) i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes) i.activeGroups = activeGroupsCleanupService - i.costAttributionSvc = costAttributionCleanupService + i.costAttributionMng = costAttributionMng // We create a circuit breaker, which will be activated on a successful completion of starting. i.circuitBreaker = newIngesterCircuitBreaker(i.cfg.PushCircuitBreaker, i.cfg.ReadCircuitBreaker, logger, registerer) @@ -788,15 +788,14 @@ func (i *Ingester) updateActiveSeries(now time.Time) { allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := userDB.activeSeries.ActiveWithMatchers() i.metrics.activeSeriesLoading.DeleteLabelValues(userID) if allActive > 0 { - caEnabled := i.costAttributionSvc != nil && i.costAttributionSvc.EnabledForUser(userID) + caEnabled := i.costAttributionMng != nil && i.costAttributionMng.EnabledForUser(userID) if caEnabled { labelAttributions := userDB.activeSeries.ActiveByAttributionValue() for label, count := range labelAttributions { - i.costAttributionSvc.SetActiveSeries(userID, label, float64(count)) + i.costAttributionMng.SetActiveSeries(userID, label, float64(count)) } - } else { - i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(allActive)) } + i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(allActive)) } else { i.metrics.activeSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID}) } @@ -1283,9 +1282,9 @@ func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats db.ingestedAPISamples.Add(int64(stats.succeededSamplesCount)) } } - if i.costAttributionSvc != nil && i.costAttributionSvc.EnabledForUser(userID) { + if i.costAttributionMng != nil && i.costAttributionMng.EnabledForUser(userID) { for label, count := range stats.failedSamplesAttribution { - i.costAttributionSvc.IncrementDiscardedSamples(userID, label, float64(count)) + i.costAttributionMng.IncrementDiscardedSamples(userID, label, float64(count)) } } } @@ -1303,7 +1302,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre if costLabel != "" { // get the label value and update the timestamp, // if the cardinality limit is reached or we are currently
in the cooldown period, the function returns __unaccounted__ - costAttrib := i.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(labels), startAppend) + costAttrib := i.costAttributionMng.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(labels), startAppend) stats.failedSamplesAttribution[costAttrib]++ } @@ -1414,7 +1413,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre var costAttrib string // when cost attribution label is set if costLabel != "" { - costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), startAppend) + costAttrib = i.costAttributionMng.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), startAppend) } // The labels must be sorted (in our case, it's guaranteed a write request @@ -2671,7 +2670,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD asmodel.NewMatchers(matchersConfig), i.cfg.ActiveSeriesMetrics.IdleTimeout, userID, - i.costAttributionSvc, + i.costAttributionMng, ), seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 3efd678b1a8..75f282f28c3 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -51,6 +51,7 @@ import ( "github.com/grafana/mimir/pkg/blockbuilder" "github.com/grafana/mimir/pkg/compactor" "github.com/grafana/mimir/pkg/continuoustest" + "github.com/grafana/mimir/pkg/costattribution" "github.com/grafana/mimir/pkg/distributor" "github.com/grafana/mimir/pkg/flusher" "github.com/grafana/mimir/pkg/frontend" @@ -74,7 +75,6 @@ import ( "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/activitytracker" - "github.com/grafana/mimir/pkg/util/costattribution" util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/noauth" "github.com/grafana/mimir/pkg/util/process" @@ -175,7 +175,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.IntVar(&c.MaxSeparateMetricsGroupsPerUser, "max-separate-metrics-groups-per-user", 1000, "Maximum number of groups allowed per user by which specified distributor and ingester metrics can be further separated.") f.BoolVar(&c.EnableGoRuntimeMetrics, "enable-go-runtime-metrics", false, "Set to true to enable all Go runtime metrics, such as go_sched_* and go_memstats_*.") f.BoolVar(&c.TimeseriesUnmarshalCachingOptimizationEnabled, "timeseries-unmarshal-caching-optimization-enabled", true, "Enables optimized marshaling of timeseries.") - + f.StringVar(&c.CustomRegistryPath, "custom-registry-path", "", "Defines a custom path for the registry.
When specified, Mimir will expose cost attribution metrics through this custom path instead of using the default Prometheus registry.") c.API.RegisterFlags(f) c.registerServerFlagsWithChangedDefaultValues(f) c.Distributor.RegisterFlags(f, logger) @@ -714,7 +714,7 @@ type Mimir struct { TenantLimits validation.TenantLimits Overrides *validation.Overrides ActiveGroupsCleanup *util.ActiveGroupsCleanupService - CostAttributionCleanup *costattribution.CostAttributionCleanupService + CostAttributionManager costattribution.Manager Distributor *distributor.Distributor Ingester *ingester.Ingester diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 8da436becf1..33e941ad491 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/matchers/compat" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/rules" @@ -42,6 +43,7 @@ import ( "github.com/grafana/mimir/pkg/blockbuilder" "github.com/grafana/mimir/pkg/compactor" "github.com/grafana/mimir/pkg/continuoustest" + "github.com/grafana/mimir/pkg/costattribution/caimpl" "github.com/grafana/mimir/pkg/distributor" "github.com/grafana/mimir/pkg/flusher" "github.com/grafana/mimir/pkg/frontend" @@ -60,13 +62,11 @@ import ( "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/activitytracker" - "github.com/grafana/mimir/pkg/util/costattribution" util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/validation" "github.com/grafana/mimir/pkg/util/validation/exporter" "github.com/grafana/mimir/pkg/util/version" "github.com/grafana/mimir/pkg/vault" - "github.com/prometheus/client_golang/prometheus/promhttp" ) // The various modules that make up Mimir. @@ -464,7 +464,7 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) { t.Cfg.Distributor.IngestStorageConfig = t.Cfg.IngestStorage t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, - t.ActiveGroupsCleanup, t.CostAttributionCleanup, t.IngesterRing, t.IngesterPartitionInstanceRing, + t.ActiveGroupsCleanup, t.CostAttributionManager, t.IngesterRing, t.IngesterPartitionInstanceRing, canJoinDistributorsRing, t.Registerer, util_log.Logger) if err != nil { return @@ -648,17 +648,21 @@ func (t *Mimir) initActiveGroupsCleanupService() (services.Service, error) { } func (t *Mimir) initCostAttributionService() (services.Service, error) { + reg := t.Registerer if t.Cfg.CustomRegistryPath != "" { + // if custom registry path is provided, create a custom registry and use it for cost attribution service customRegistry := prometheus.NewRegistry() // Register the custom registry with the provided URL. // This allows users to expose custom metrics on a separate endpoint. // This is useful when users want to expose metrics that are not part of the default Mimir metrics. 
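+ // Note that http.Handle registers the handler on http.DefaultServeMux, so this path is only served if Mimir's HTTP server exposes the default mux; passing the registry in promhttp.HandlerOpts also registers the handler's own promhttp_metric_handler_errors_total metric on it.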
http.Handle(t.Cfg.CustomRegistryPath, promhttp.HandlerFor(customRegistry, promhttp.HandlerOpts{Registry: customRegistry})) - t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, customRegistry) - return t.CostAttributionCleanup, nil + reg = customRegistry } - t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, t.Registerer) - return t.CostAttributionCleanup, nil + // if custom registry path is not provided, use the default registry + t.CostAttributionManager = caimpl.NewManager(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides) + err := reg.Register(t.CostAttributionManager) + + return t.CostAttributionManager, err } func (t *Mimir) tsdbIngesterConfig() { @@ -672,7 +676,7 @@ func (t *Mimir) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.IngestStorageConfig = t.Cfg.IngestStorage t.tsdbIngesterConfig() - t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, t.IngesterRing, t.IngesterPartitionRingWatcher, t.ActiveGroupsCleanup, t.CostAttributionCleanup, t.Registerer, util_log.Logger) + t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, t.IngesterRing, t.IngesterPartitionRingWatcher, t.ActiveGroupsCleanup, t.CostAttributionManager, t.Registerer, util_log.Logger) if err != nil { return } diff --git a/pkg/util/costattribution/cost_attribution.go b/pkg/util/costattribution/cost_attribution.go deleted file mode 100644 index ab47346a35d..00000000000 --- a/pkg/util/costattribution/cost_attribution.go +++ /dev/null @@ -1,294 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package costattribution - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/services" - "github.com/grafana/mimir/pkg/util/validation" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/prometheus/prometheus/model/labels" - "go.uber.org/atomic" -) - -type Tracker struct { - trackedLabel string - activeSeriesPerUserAttribution *prometheus.GaugeVec - receivedSamplesAttribution *prometheus.CounterVec - discardedSampleAttribution *prometheus.CounterVec - attributionTimestamps map[string]*atomic.Int64 - coolDownDeadline *atomic.Int64 -} - -func (m *Tracker) RemoveAttributionMetricsForUser(userID, attribution string) { - m.activeSeriesPerUserAttribution.DeleteLabelValues(userID, attribution) - m.receivedSamplesAttribution.DeleteLabelValues(userID, attribution) - m.discardedSampleAttribution.DeleteLabelValues(userID, attribution) -} - -func NewCostAttributionTracker(reg prometheus.Registerer, trackedLabel string) *Tracker { - m := &Tracker{ - trackedLabel: trackedLabel, - attributionTimestamps: map[string]*atomic.Int64{}, - coolDownDeadline: atomic.NewInt64(0), - discardedSampleAttribution: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_discarded_samples_attribution_total", - Help: "The total number of samples that were discarded per attribution.", - }, []string{"user", trackedLabel}), - receivedSamplesAttribution: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_received_samples_attribution_total", - Help: "The total number of samples that were received per attribution.", - }, []string{"user", trackedLabel}), - activeSeriesPerUserAttribution: 
diff --git a/pkg/util/costattribution/cost_attribution.go b/pkg/util/costattribution/cost_attribution.go
deleted file mode 100644
index ab47346a35d..00000000000
--- a/pkg/util/costattribution/cost_attribution.go
+++ /dev/null
@@ -1,294 +0,0 @@
-// SPDX-License-Identifier: AGPL-3.0-only
-
-package costattribution
-
-import (
-	"context"
-	"fmt"
-	"sync"
-	"time"
-
-	"github.com/go-kit/log"
-	"github.com/go-kit/log/level"
-	"github.com/grafana/dskit/services"
-	"github.com/grafana/mimir/pkg/util/validation"
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
-	"github.com/prometheus/prometheus/model/labels"
-	"go.uber.org/atomic"
-)
-
-type Tracker struct {
-	trackedLabel                   string
-	activeSeriesPerUserAttribution *prometheus.GaugeVec
-	receivedSamplesAttribution     *prometheus.CounterVec
-	discardedSampleAttribution     *prometheus.CounterVec
-	attributionTimestamps          map[string]*atomic.Int64
-	coolDownDeadline               *atomic.Int64
-}
-
-func (m *Tracker) RemoveAttributionMetricsForUser(userID, attribution string) {
-	m.activeSeriesPerUserAttribution.DeleteLabelValues(userID, attribution)
-	m.receivedSamplesAttribution.DeleteLabelValues(userID, attribution)
-	m.discardedSampleAttribution.DeleteLabelValues(userID, attribution)
-}
-
-func NewCostAttributionTracker(reg prometheus.Registerer, trackedLabel string) *Tracker {
-	m := &Tracker{
-		trackedLabel:          trackedLabel,
-		attributionTimestamps: map[string]*atomic.Int64{},
-		coolDownDeadline:      atomic.NewInt64(0),
-		discardedSampleAttribution: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-			Name: "cortex_discarded_samples_attribution_total",
-			Help: "The total number of samples that were discarded per attribution.",
-		}, []string{"user", trackedLabel}),
-		receivedSamplesAttribution: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
-			Name: "cortex_received_samples_attribution_total",
-			Help: "The total number of samples that were received per attribution.",
-		}, []string{"user", trackedLabel}),
-		activeSeriesPerUserAttribution: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
-			Name: "cortex_ingester_active_series_attribution",
-			Help: "The total number of active series per user and attribution.",
-		}, []string{"user", trackedLabel}),
-	}
-	return m
-}
-
-type CostAttribution struct {
-	mu       sync.RWMutex
-	trackers map[string]*Tracker
-	limits   *validation.Overrides
-	reg      prometheus.Registerer
-}
-
-func NewCostAttribution(limits *validation.Overrides, reg prometheus.Registerer) *CostAttribution {
-	return &CostAttribution{
-		trackers: make(map[string]*Tracker),
-		limits:   limits,
-		reg:      reg,
-		mu:       sync.RWMutex{},
-	}
-}
-
-// UpdateAttributionTimestampForUser function is only guaranteed to update to the
-// timestamp provided even if it is smaller than the existing value
-func (ca *CostAttribution) UpdateAttributionTimestampForUser(userID, attribution string, now time.Time) {
-	// If the limit is set to 0, we don't need to track the attribution
-	if ca.limits.MaxCostAttributionPerUser(userID) <= 0 {
-		return
-	}
-
-	ts := now.UnixNano()
-	ca.mu.Lock()
-	// create new tracker if not exists
-	if _, exists := ca.trackers[userID]; !exists {
-		// the attribution label and values should be managed by cache
-		ca.trackers[userID] = NewCostAttributionTracker(ca.reg, ca.limits.CostAttributionLabel(userID))
-	}
-	ca.mu.Unlock()
-	ca.mu.RLock()
-	if groupTs := ca.trackers[userID].attributionTimestamps[attribution]; groupTs != nil {
-		groupTs.Store(ts)
-		return
-	}
-	ca.mu.RUnlock()
-	ca.mu.Lock()
-	defer ca.mu.Unlock()
-	ca.trackers[userID].attributionTimestamps[attribution] = atomic.NewInt64(ts)
-}
-
-func (ca *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadline int64) []string {
-	ca.mu.RLock()
-	var inactiveAttributions []string
-	if ca.trackers[userID] == nil || ca.trackers[userID].attributionTimestamps == nil {
-		return nil
-	}
-
-	attributionTimestamps := ca.trackers[userID].attributionTimestamps
-	for attr, ts := range attributionTimestamps {
-		if ts.Load() <= deadline {
-			inactiveAttributions = append(inactiveAttributions, attr)
-		}
-	}
-	ca.mu.RUnlock()
-
-	if len(inactiveAttributions) == 0 {
-		return nil
-	}
-
-	// Cleanup inactive groups
-	ca.mu.Lock()
-	defer ca.mu.Unlock()
-
-	for i := 0; i < len(inactiveAttributions); {
-		inactiveAttribution := inactiveAttributions[i]
-		groupTs := ca.trackers[userID].attributionTimestamps[inactiveAttribution]
-		if groupTs != nil && groupTs.Load() <= deadline {
-			delete(ca.trackers[userID].attributionTimestamps, inactiveAttribution)
-			i++
-		} else {
-			inactiveAttributions[i] = inactiveAttributions[len(inactiveAttributions)-1]
-			inactiveAttributions = inactiveAttributions[:len(inactiveAttributions)-1]
-		}
-	}
-
-	return inactiveAttributions
-}
-
-func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Duration) {
-	ca.mu.RLock()
-	userIDs := make([]string, 0, len(ca.trackers))
-	for userID := range ca.trackers {
-		userIDs = append(userIDs, userID)
-	}
-	ca.mu.RUnlock()
-
-	currentTime := time.Now()
-	for _, userID := range userIDs {
-		inactiveAttributions := ca.purgeInactiveAttributionsForUser(userID, currentTime.Add(-inactiveTimeout).UnixNano())
-		for _, attribution := range inactiveAttributions {
-			ca.trackers[userID].RemoveAttributionMetricsForUser(userID, attribution)
-		}
-	}
-}
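The two-phase purge above (collect candidates under the read lock, then delete under the write lock) relies on re-checking each deadline once the write lock is held, since another goroutine may have refreshed a timestamp in between. A condensed, self-contained sketch of the same pattern, with illustrative names and with the read lock released on every path (which the deleted code does not always do):

// Sketch of the two-phase purge, with illustrative names.
package main

import (
	"sync"
	"time"

	"go.uber.org/atomic"
)

type tracker struct {
	mu         sync.RWMutex
	timestamps map[string]*atomic.Int64
}

func (t *tracker) purge(deadline int64) []string {
	// Phase 1: collect candidates under the read lock only.
	t.mu.RLock()
	var candidates []string
	for attr, ts := range t.timestamps {
		if ts.Load() <= deadline {
			candidates = append(candidates, attr)
		}
	}
	t.mu.RUnlock()

	if len(candidates) == 0 {
		return nil
	}

	// Phase 2: re-check each candidate under the write lock, because another
	// goroutine may have refreshed its timestamp between the two phases.
	t.mu.Lock()
	defer t.mu.Unlock()
	var purged []string
	for _, attr := range candidates {
		if ts := t.timestamps[attr]; ts != nil && ts.Load() <= deadline {
			delete(t.timestamps, attr)
			purged = append(purged, attr)
		}
	}
	return purged
}

func main() {
	tr := &tracker{timestamps: map[string]*atomic.Int64{
		"team-a": atomic.NewInt64(time.Now().Add(-time.Hour).UnixNano()),
		"team-b": atomic.NewInt64(time.Now().UnixNano()),
	}}
	tr.purge(time.Now().Add(-time.Minute).UnixNano()) // removes only "team-a"
}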
-
-func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string) bool {
-	// if we are still at the cooldown period, we will consider the limit reached
-	ca.mu.RLock()
-	defer ca.mu.RUnlock()
-	// if the user is not exist, we don't need to check the limit
-	if ca.trackers[userID] == nil {
-		return false
-	}
-
-	now := time.Now()
-	if v := ca.trackers[userID].coolDownDeadline; v != nil && v.Load() > now.UnixNano() {
-		return true
-	}
-
-	// if the user attribution is already exist and we are not in the cooldown period, we don't need to check the limit
-	_, exists := ca.trackers[userID].attributionTimestamps[attribution]
-	if exists {
-		return false
-	}
-
-	// if the user has reached the limit, we will set the cooldown period which is 20 minutes
-	maxReached := len(ca.trackers[userID].attributionTimestamps) >= ca.limits.MaxCostAttributionPerUser(userID)
-	if maxReached {
-		ca.mu.Lock()
-		ca.trackers[userID].coolDownDeadline.Store(now.Add(20 * time.Minute).UnixNano())
-		ca.mu.Unlock()
-		return true
-	}
-
-	return maxReached
-}
-
-type CostAttributionCleanupService struct {
-	services.Service
-	logger          log.Logger
-	costAttribution *CostAttribution
-	inactiveTimeout time.Duration
-	invalidValue    string
-}
-
-type CostAttributionMetricsCleaner interface {
-	RemoveAttributionMetricsForUser(userID, attribution string)
-}
-
-func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg prometheus.Registerer) *CostAttributionCleanupService {
-	s := &CostAttributionCleanupService{
-		costAttribution: NewCostAttribution(limits, reg),
-		inactiveTimeout: inactiveTimeout,
-		logger:          logger,
-		invalidValue:    "__unaccounted__",
-	}
-
-	s.Service = services.NewTimerService(cleanupInterval, nil, s.iteration, nil).WithName("cost attribution cleanup")
-	return s
-}
-
-// IncrementReceivedSamples increments the received samples counter for a given user and attribution
-func (s *CostAttributionCleanupService) IncrementReceivedSamples(userID, attribution string, value float64) {
-	attribution = s.GetUserAttribution(userID, attribution)
-	s.costAttribution.mu.RLock()
-	defer s.costAttribution.mu.RUnlock()
-	if tracker, exists := s.costAttribution.trackers[userID]; exists {
-		tracker.receivedSamplesAttribution.WithLabelValues(userID, attribution).Add(value)
-	}
-}
-
-// IncrementDiscardedSamples increments the discarded samples counter for a given user and attribution
-func (s *CostAttributionCleanupService) IncrementDiscardedSamples(userID, attribution string, value float64) {
-	attribution = s.GetUserAttribution(userID, attribution)
-	s.costAttribution.mu.RLock()
-	defer s.costAttribution.mu.RUnlock()
-	if tracker, exists := s.costAttribution.trackers[userID]; exists {
-		tracker.discardedSampleAttribution.WithLabelValues(userID, attribution).Add(value)
-	}
-}
-
-// SetActiveSeries sets the active series gauge for a given user and attribution
-func (s *CostAttributionCleanupService) SetActiveSeries(userID, attribution string, value float64) {
-	attribution = s.GetUserAttribution(userID, attribution)
-	s.costAttribution.mu.RLock()
-	defer s.costAttribution.mu.RUnlock()
-	if tracker, exists := s.costAttribution.trackers[userID]; exists {
-		tracker.activeSeriesPerUserAttribution.WithLabelValues(userID, attribution).Set(value)
-	}
-}
-
-func (s *CostAttributionCleanupService) GetUserAttribution(userID, attribution string) string {
-	// not tracking cost attribution for this user, this shouldn't happen
-	if s.costAttribution.limits.MaxCostAttributionPerUser(userID) <= 0 {
-		return attribution
-	}
-	if s.costAttribution.attributionLimitExceeded(userID, attribution) {
-		return s.invalidValue
-	}
-	return attribution
-}
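The cooldown logic in attributionLimitExceeded above avoids rescanning the attribution map on every request once a tenant is over its limit. A reduced, single-goroutine sketch of the same idea, with hypothetical names; only the 20-minute window is taken from the code above:

// Sketch of limit-plus-cooldown admission, with hypothetical names.
package main

import (
	"fmt"
	"time"

	"go.uber.org/atomic"
)

type limiter struct {
	seen             map[string]struct{}
	max              int
	coolDownDeadline *atomic.Int64
}

// exceeded reports whether value should be rejected. Once the limit is hit, a
// fixed cooldown window suppresses further checks, mirroring the 20-minute
// deadline stored by the deleted code above.
func (l *limiter) exceeded(value string, now time.Time) bool {
	if l.coolDownDeadline.Load() > now.UnixNano() {
		return true // still cooling down: reject without rescanning
	}
	if _, ok := l.seen[value]; ok {
		return false // values already tracked never re-count against the limit
	}
	if len(l.seen) >= l.max {
		l.coolDownDeadline.Store(now.Add(20 * time.Minute).UnixNano())
		return true
	}
	l.seen[value] = struct{}{}
	return false
}

func main() {
	l := &limiter{seen: map[string]struct{}{}, max: 1, coolDownDeadline: atomic.NewInt64(0)}
	fmt.Println(l.exceeded("team-a", time.Now())) // false: first value is admitted
	fmt.Println(l.exceeded("team-b", time.Now())) // true: limit hit, cooldown starts
	fmt.Println(l.exceeded("team-a", time.Now())) // true: cooldown rejects even known values
}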
-
-func (s *CostAttributionCleanupService) GetUserAttributionLabel(userID string) string {
-	s.costAttribution.mu.RLock()
-	defer s.costAttribution.mu.RUnlock()
-	if s.costAttribution != nil {
-		if val, exists := s.costAttribution.trackers[userID]; exists {
-			return val.trackedLabel
-		}
-	}
-	return ""
-}
-
-func (s *CostAttributionCleanupService) EnabledForUser(userID string) bool {
-	return s.costAttribution.limits.CostAttributionLabel(userID) != ""
-}
-
-func (s *CostAttributionCleanupService) GetUserAttributionLimit(userID string) int {
-	return s.costAttribution.limits.MaxCostAttributionPerUser(userID)
-}
-
-func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user string, lbs labels.Labels, now time.Time) string {
-	if s.costAttribution.trackers[user] == nil || s.costAttribution.trackers[user].trackedLabel == "" {
-		return ""
-	}
-	attribution := lbs.Get(s.costAttribution.trackers[user].trackedLabel)
-	// empty label is not normal, if user set attribution label, the metrics send has to include the label
-	if attribution == "" {
-		level.Error(s.logger).Log("msg", "set attribution label to \"\" since missing cost attribution label in metrics")
-		return attribution
-	}
-
-	if s.costAttribution.attributionLimitExceeded(user, attribution) {
-		attribution = s.invalidValue
-		level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since user has reached the limit of cost attribution labels", s.invalidValue))
-	}
-
-	s.costAttribution.UpdateAttributionTimestampForUser(user, attribution, now)
-	return attribution
-}
-
-func (s *CostAttributionCleanupService) iteration(_ context.Context) error {
-	s.costAttribution.purgeInactiveAttributions(s.inactiveTimeout)
-	return nil
-}
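UpdateAttributionTimestamp above resolves an attribution value in three steps: read the tenant's tracked label from the incoming series, treat a missing value as an error, and substitute the "__unaccounted__" sentinel once the tenant exceeds its limit. A minimal sketch of that resolution flow, with the limit check stubbed out as a callback:

// Sketch of the attribution-resolution flow; the limit check is a stub.
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

const invalidValue = "__unaccounted__"

func resolveAttribution(lbs labels.Labels, trackedLabel string, limitExceeded func(string) bool) string {
	attribution := lbs.Get(trackedLabel)
	if attribution == "" {
		// Unexpected: a tenant that configures an attribution label is
		// expected to send series carrying it.
		return ""
	}
	if limitExceeded(attribution) {
		// Over the limit: fold the series into the shared sentinel value.
		return invalidValue
	}
	return attribution
}

func main() {
	lbs := labels.FromStrings("__name__", "http_requests_total", "team", "platform")
	fmt.Println(resolveAttribution(lbs, "team", func(string) bool { return false })) // "platform"
	fmt.Println(resolveAttribution(lbs, "team", func(string) bool { return true }))  // "__unaccounted__"
}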
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index 1d14d03baca..b2e44a8074c 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -143,10 +143,6 @@ type Limits struct {
 	// User defined label to give the option of subdividing specific metrics by another label
 	SeparateMetricsGroupLabel string `yaml:"separate_metrics_group_label" json:"separate_metrics_group_label" category:"experimental"`
 
-	// User defined label to give the cost distribution by values of the label
-	CostAttributionLabel      string `yaml:"cost_attribution_label" json:"cost_attribution_label" category:"experimental"`
-	MaxCostAttributionPerUser int    `yaml:"max_cost_attribution_per_user" json:"max_cost_attribution_per_user" category:"experimental"`
-
 	// Querier enforced limits.
 	MaxChunksPerQuery                    int     `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"`
 	MaxEstimatedChunksPerQueryMultiplier float64 `yaml:"max_estimated_fetched_chunks_per_query_multiplier" json:"max_estimated_fetched_chunks_per_query_multiplier" category:"experimental"`
@@ -183,6 +179,10 @@ type Limits struct {
 	LabelValuesMaxCardinalityLabelNamesPerRequest int `yaml:"label_values_max_cardinality_label_names_per_request" json:"label_values_max_cardinality_label_names_per_request"`
 	ActiveSeriesResultsMaxSizeBytes               int `yaml:"active_series_results_max_size_bytes" json:"active_series_results_max_size_bytes" category:"experimental"`
 
+	// Cost attribution and limits.
+	CostAttributionLabel      string `yaml:"cost_attribution_label" json:"cost_attribution_label" category:"experimental"`
+	MaxCostAttributionPerUser int    `yaml:"max_cost_attribution_per_user" json:"max_cost_attribution_per_user" category:"experimental"`
+
 	// Ruler defaults and limits.
 	RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"`
 	RulerTenantShardSize int            `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"`
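Because the relocated fields keep their YAML tags, per-tenant configuration is unaffected by the move. As a hedged illustration, assuming the standard Mimir runtime-overrides layout, a tenant could opt in like this (tenant name and values are made up):

# Illustrative runtime overrides; tenant name and values are made up.
overrides:
  tenant-a:
    cost_attribution_label: team
    max_cost_attribution_per_user: 100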