From f1b4e74f0521224a9d252c3669f46f40ac76c52a Mon Sep 17 00:00:00 2001
From: Ying WANG
Date: Wed, 2 Oct 2024 17:08:56 +0200
Subject: [PATCH] separate registry for cost attribution metrics

---
 cmd/mimir/config-descriptor.json             |  10 +
 pkg/distributor/distributor.go               |   1 -
 pkg/mimir/mimir.go                           |   3 +-
 pkg/mimir/modules.go                         |  19 +-
 pkg/util/costattribution/cost_attribution.go | 181 +++++++++++++------
 5 files changed, 153 insertions(+), 61 deletions(-)

diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json
index a8222277d87..79bd4bb05af 100644
--- a/cmd/mimir/config-descriptor.json
+++ b/cmd/mimir/config-descriptor.json
@@ -18161,6 +18161,16 @@
       "fieldValue": null,
       "fieldDefaultValue": null
     },
+    {
+      "kind": "field",
+      "name": "custom_registry_path",
+      "required": false,
+      "desc": "",
+      "fieldValue": null,
+      "fieldDefaultValue": "",
+      "fieldType": "string",
+      "fieldCategory": "advanced"
+    },
     {
       "kind": "field",
       "name": "timeseries_unmarshal_caching_optimization_enabled",
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 05f8160d15d..9d9f2d9045f 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -358,7 +358,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 			Name: "cortex_distributor_received_samples_total",
 			Help: "The total number of received samples, excluding rejected and deduped samples.",
 		}, []string{"user", "attrib"}),
-
 		receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "cortex_distributor_received_exemplars_total",
 			Help: "The total number of received exemplars, excluding rejected and deduped exemplars.",
diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go
index 1443cb22705..3efd678b1a8 100644
--- a/pkg/mimir/mimir.go
+++ b/pkg/mimir/mimir.go
@@ -145,7 +145,8 @@ type Config struct {
 	ContinuousTest    continuoustest.Config `yaml:"-"`
 	OverridesExporter exporter.Config       `yaml:"overrides_exporter"`
 
-	Common CommonConfig `yaml:"common"`
+	Common             CommonConfig `yaml:"common"`
+	CustomRegistryPath string       `yaml:"custom_registry_path" category:"advanced"`
 
 	TimeseriesUnmarshalCachingOptimizationEnabled bool `yaml:"timeseries_unmarshal_caching_optimization_enabled" category:"experimental"`
 }
diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go
index 35f0a8040f0..8da436becf1 100644
--- a/pkg/mimir/modules.go
+++ b/pkg/mimir/modules.go
@@ -66,6 +66,7 @@ import (
 	"github.com/grafana/mimir/pkg/util/validation/exporter"
 	"github.com/grafana/mimir/pkg/util/version"
 	"github.com/grafana/mimir/pkg/vault"
+	"github.com/prometheus/client_golang/prometheus/promhttp"
 )
 
 // The various modules that make up Mimir.
@@ -473,10 +474,6 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) {
 		t.ActiveGroupsCleanup.Register(t.Distributor)
 	}
 
-	if t.CostAttributionCleanup != nil {
-		t.CostAttributionCleanup.Register(t.Distributor)
-	}
-
 	return t.Distributor, nil
 }
 
@@ -651,7 +648,16 @@ func (t *Mimir) initActiveGroupsCleanupService() (services.Service, error) {
 }
 
 func (t *Mimir) initCostAttributionService() (services.Service, error) {
-	t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides)
+	if t.Cfg.CustomRegistryPath != "" {
+		customRegistry := prometheus.NewRegistry()
+		// Register the custom registry at the configured path.
+		// This allows users to expose custom metrics on a separate endpoint.
+		// This is useful when users want to expose metrics that are not part of the default Mimir metrics.
+		http.Handle(t.Cfg.CustomRegistryPath, promhttp.HandlerFor(customRegistry, promhttp.HandlerOpts{Registry: customRegistry}))
+		t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, customRegistry)
+		return t.CostAttributionCleanup, nil
+	}
+	t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, t.Registerer)
 	return t.CostAttributionCleanup, nil
 }
 
@@ -675,9 +681,6 @@ func (t *Mimir) initIngesterService() (serv services.Service, err error) {
 		t.ActiveGroupsCleanup.Register(t.Ingester)
 	}
 
-	if t.CostAttributionCleanup != nil {
-		t.CostAttributionCleanup.Register(t.Ingester)
-	}
 	return t.Ingester, nil
 }
 
diff --git a/pkg/util/costattribution/cost_attribution.go b/pkg/util/costattribution/cost_attribution.go
index 6af2f39fefd..1956be2625b 100644
--- a/pkg/util/costattribution/cost_attribution.go
+++ b/pkg/util/costattribution/cost_attribution.go
@@ -12,57 +12,96 @@ import (
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/services"
 	"github.com/grafana/mimir/pkg/util/validation"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
 	"go.uber.org/atomic"
 )
 
+type Tracker struct {
+	trackedLabel                   string
+	activeSeriesPerUserAttribution *prometheus.GaugeVec
+	receivedSamplesAttribution     *prometheus.CounterVec
+	discardedSampleAttribution     *prometheus.CounterVec
+	attributionTimestamps          map[string]*atomic.Int64
+	coolDownDeadline               *atomic.Int64
+}
+
+func (m *Tracker) RemoveAttributionMetricsForUser(userID, attribution string) {
+	m.activeSeriesPerUserAttribution.DeleteLabelValues(userID, attribution)
+	m.receivedSamplesAttribution.DeleteLabelValues(userID, attribution)
+	m.discardedSampleAttribution.DeleteLabelValues(userID, attribution)
+}
+
+func NewCostAttributionTracker(reg prometheus.Registerer, trackedLabel string) *Tracker {
+	m := &Tracker{
+		trackedLabel:          trackedLabel,
+		attributionTimestamps: map[string]*atomic.Int64{},
+		coolDownDeadline:      atomic.NewInt64(0),
+		discardedSampleAttribution: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "cortex_discarded_samples_attribution_total",
+			Help: "The total number of samples that were discarded per attribution.",
+		}, []string{"user", trackedLabel}),
+		receivedSamplesAttribution: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "cortex_received_samples_attribution_total",
+			Help: "The total number of samples that were received per attribution.",
+		}, []string{"user", trackedLabel}),
+		activeSeriesPerUserAttribution: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
+			Name: "cortex_ingester_active_series_attribution",
+			Help: "The number of active series per user and attribution.",
+		}, []string{"user", trackedLabel}),
+	}
+	return m
+}
+
 type CostAttribution struct {
-	mu                sync.RWMutex
-	timestampsPerUser map[string]map[string]*atomic.Int64 // map[user][group] -> timestamp
-	coolDownDeadline  map[string]*atomic.Int64
-	limits            *validation.Overrides
+	mu       sync.RWMutex
+	trackers map[string]*Tracker
+	limits   *validation.Overrides
+	reg      prometheus.Registerer
 }
 
-func NewCostAttribution(limits *validation.Overrides) *CostAttribution {
+func NewCostAttribution(limits *validation.Overrides, reg prometheus.Registerer) *CostAttribution {
 	return &CostAttribution{
-		timestampsPerUser: map[string]map[string]*atomic.Int64{},
-		coolDownDeadline:  map[string]*atomic.Int64{},
-		limits:            limits,
+		trackers: map[string]*Tracker{},
+		limits:   limits,
+		reg:      reg,
 	}
 }
 
 // UpdateAttributionTimestampForUser function is only guaranteed to update to the
 // timestamp provided even if it is smaller than the existing value
 func (ag *CostAttribution) UpdateAttributionTimestampForUser(userID, attribution string, now time.Time) {
-	ts := now.UnixNano()
-	ag.mu.RLock()
-	if groupTs := ag.timestampsPerUser[userID][attribution]; groupTs != nil {
-		ag.mu.RUnlock()
-		groupTs.Store(ts)
+	// If the limit is set to 0, we don't need to track the attribution
+	if ag.limits.MaxCostAttributionPerUser(userID) <= 0 {
 		return
 	}
-	ag.mu.RUnlock()
+	ts := now.UnixNano()
 	ag.mu.Lock()
-	defer ag.mu.Unlock()
-
-	if ag.timestampsPerUser[userID] == nil {
-		ag.timestampsPerUser[userID] = map[string]*atomic.Int64{attribution: atomic.NewInt64(ts)}
-		return
+	// create a new tracker if one does not exist
+	if _, exists := ag.trackers[userID]; !exists {
+		ag.trackers[userID] = NewCostAttributionTracker(ag.reg, ag.limits.CostAttributionLabel(userID))
 	}
-
-	if groupTs := ag.timestampsPerUser[userID][attribution]; groupTs != nil {
+	ag.mu.Unlock()
+	ag.mu.RLock()
+	if groupTs := ag.trackers[userID].attributionTimestamps[attribution]; groupTs != nil {
 		groupTs.Store(ts)
 		return
 	}
-
-	ag.timestampsPerUser[userID][attribution] = atomic.NewInt64(ts)
+	ag.mu.RUnlock()
+	ag.mu.Lock()
+	defer ag.mu.Unlock()
+	ag.trackers[userID].attributionTimestamps[attribution] = atomic.NewInt64(ts)
 }
 
 func (ag *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadline int64) []string {
 	ag.mu.RLock()
 	var inactiveAttributions []string
-	attributionTimestamps := ag.timestampsPerUser[userID]
+	if ag.trackers[userID] == nil || ag.trackers[userID].attributionTimestamps == nil {
+		return nil
+	}
+	attributionTimestamps := ag.trackers[userID].attributionTimestamps
 	for attr, ts := range attributionTimestamps {
 		if ts.Load() <= deadline {
 			inactiveAttributions = append(inactiveAttributions, attr)
@@ -80,9 +119,9 @@ func (ag *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadl
 
 	for i := 0; i < len(inactiveAttributions); {
 		inactiveAttribution := inactiveAttributions[i]
-		groupTs := ag.timestampsPerUser[userID][inactiveAttribution]
+		groupTs := ag.trackers[userID].attributionTimestamps[inactiveAttribution]
 		if groupTs != nil && groupTs.Load() <= deadline {
-			delete(ag.timestampsPerUser[userID], inactiveAttribution)
+			delete(ag.trackers[userID].attributionTimestamps, inactiveAttribution)
 			i++
 		} else {
 			inactiveAttributions[i] = inactiveAttributions[len(inactiveAttributions)-1]
@@ -93,10 +132,10 @@ func (ag *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadl
 	return inactiveAttributions
 }
 
-func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Duration, cleanupFuncs ...func(string, string)) {
+func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Duration) {
 	ca.mu.RLock()
-	userIDs := make([]string, 0, len(ca.timestampsPerUser))
-	for userID := range ca.timestampsPerUser {
+	userIDs := make([]string, 0, len(ca.trackers))
+	for userID := range ca.trackers {
 		userIDs = append(userIDs, userID)
 	}
 	ca.mu.RUnlock()
@@ -105,32 +144,37 @@ func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Durati
 	for _, userID := range userIDs {
 		inactiveAttributions := ca.purgeInactiveAttributionsForUser(userID, currentTime.Add(-inactiveTimeout).UnixNano())
 		for _, attribution := range inactiveAttributions {
-			for _, cleanupFn := range cleanupFuncs {
-				cleanupFn(userID, attribution)
-			}
+			ca.trackers[userID].RemoveAttributionMetricsForUser(userID, attribution)
 		}
 	}
 }
 
-func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string, now time.Time) bool {
+func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string) bool {
 	// if we are still at the cooldown period, we will consider the limit reached
 	ca.mu.RLock()
 	defer ca.mu.RUnlock()
+	// if the user does not exist, we don't need to check the limit
+	if ca.trackers[userID] == nil {
+		return false
+	}
 
-	if v, exists := ca.coolDownDeadline[userID]; exists && v.Load() > now.UnixNano() {
+	now := time.Now()
+	if v := ca.trackers[userID].coolDownDeadline; v != nil && v.Load() > now.UnixNano() {
 		return true
 	}
 
 	// if the user attribution is already exist and we are not in the cooldown period, we don't need to check the limit
-	_, exists := ca.timestampsPerUser[userID][attribution]
+	_, exists := ca.trackers[userID].attributionTimestamps[attribution]
 	if exists {
 		return false
 	}
 
 	// if the user has reached the limit, we will set the cooldown period which is 20 minutes
-	maxReached := len(ca.timestampsPerUser[userID]) >= ca.limits.MaxCostAttributionPerUser(userID)
+	maxReached := len(ca.trackers[userID].attributionTimestamps) >= ca.limits.MaxCostAttributionPerUser(userID)
 	if maxReached {
-		ca.coolDownDeadline[userID].Store(time.Now().Add(20 * time.Minute).UnixNano())
+		ca.mu.Lock()
+		ca.trackers[userID].coolDownDeadline.Store(now.Add(20 * time.Minute).UnixNano())
+		ca.mu.Unlock()
 		return true
 	}
@@ -141,7 +185,6 @@ type CostAttributionCleanupService struct {
 	services.Service
 	logger          log.Logger
 	costAttribution *CostAttribution
-	cleanupFuncs    []func(userID, attribution string)
 	inactiveTimeout time.Duration
 	invalidValue    string
 }
@@ -150,10 +193,9 @@ type CostAttributionMetricsCleaner interface {
 	RemoveAttributionMetricsForUser(userID, attribution string)
 }
 
-func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, cleanupFns ...func(string, string)) *CostAttributionCleanupService {
+func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg prometheus.Registerer) *CostAttributionCleanupService {
 	s := &CostAttributionCleanupService{
-		costAttribution: NewCostAttribution(limits),
-		cleanupFuncs:    cleanupFns,
+		costAttribution: NewCostAttribution(limits, reg),
 		inactiveTimeout: inactiveTimeout,
 		logger:          logger,
 		invalidValue:    "__unaccounted__",
@@ -163,12 +205,55 @@ func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Dura
 	return s
 }
 
+// IncrementReceivedSamples increments the received samples counter for a given user and attribution
+func (s *CostAttributionCleanupService) IncrementReceivedSamples(userID, attribution string, value float64) {
+	attribution = s.GetUserAttribution(userID, attribution)
+	s.costAttribution.mu.RLock()
+	defer s.costAttribution.mu.RUnlock()
+	if tracker, exists := s.costAttribution.trackers[userID]; exists {
+		tracker.receivedSamplesAttribution.WithLabelValues(userID, attribution).Add(value)
+	}
+}
+
+// IncrementDiscardedSamples increments the discarded samples counter for a given user and attribution
+func (s *CostAttributionCleanupService) IncrementDiscardedSamples(userID, attribution string, value float64) {
+	attribution = s.GetUserAttribution(userID, attribution)
+	s.costAttribution.mu.RLock()
+	defer s.costAttribution.mu.RUnlock()
+	if tracker, exists := s.costAttribution.trackers[userID]; exists {
+		tracker.discardedSampleAttribution.WithLabelValues(userID, attribution).Add(value)
+	}
+}
+
+// SetActiveSeries sets the active series gauge for a given user and attribution
+func (s *CostAttributionCleanupService) SetActiveSeries(userID, attribution string, value float64) {
+	attribution = s.GetUserAttribution(userID, attribution)
+	s.costAttribution.mu.RLock()
+	defer s.costAttribution.mu.RUnlock()
+	if tracker, exists := s.costAttribution.trackers[userID]; exists {
+		tracker.activeSeriesPerUserAttribution.WithLabelValues(userID, attribution).Set(value)
+	}
+}
+
+func (s *CostAttributionCleanupService) GetUserAttribution(userID, attribution string) string {
+	// cost attribution is not tracked for this user; this shouldn't happen
+	if s.costAttribution.limits.MaxCostAttributionPerUser(userID) <= 0 {
+		return attribution
+	}
+	if s.costAttribution.attributionLimitExceeded(userID, attribution) {
+		return s.invalidValue
+	}
+	return attribution
+}
+
 func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribution string, now time.Time) string {
 	// empty label is not normal, if user set attribution label, the metrics send has to include the label
 	if attribution == "" {
-		attribution = s.invalidValue
-		level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since missing cost attribution label in metrics", s.invalidValue))
-	} else if s.costAttribution.attributionLimitExceeded(user, attribution, now) {
+		level.Error(s.logger).Log("msg", "set attribution label to \"\" since the cost attribution label is missing in the metrics")
+		return attribution
+	}
+
+	if s.costAttribution.attributionLimitExceeded(user, attribution) {
 		attribution = s.invalidValue
 		level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since user has reached the limit of cost attribution labels", s.invalidValue))
 	}
@@ -178,12 +263,6 @@ func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribu
 }
 
 func (s *CostAttributionCleanupService) iteration(_ context.Context) error {
-	s.costAttribution.purgeInactiveAttributions(s.inactiveTimeout, s.cleanupFuncs...)
+	s.costAttribution.purgeInactiveAttributions(s.inactiveTimeout)
 	return nil
 }
-
-// Register registers the cleanup function from metricsCleaner to be called during each cleanup iteration.
-// This function is NOT thread safe
-func (s *CostAttributionCleanupService) Register(metricsCleaner CostAttributionMetricsCleaner) {
-	s.cleanupFuncs = append(s.cleanupFuncs, metricsCleaner.RemoveAttributionMetricsForUser)
-}
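
Note for reviewers: the standalone sketch below illustrates the separate-registry pattern this patch wires into initCostAttributionService (prometheus.NewRegistry plus promhttp.HandlerFor, served on Cfg.CustomRegistryPath). The path "/cost-attribution-metrics", the "team" label, and the sample values are illustrative assumptions for the sketch, not defaults introduced by the patch.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// A dedicated registry keeps cost attribution series off the default /metrics endpoint.
	customRegistry := prometheus.NewRegistry()

	// Register a counter against the custom registry only, mirroring NewCostAttributionTracker.
	// "team" stands in for the per-tenant cost attribution label; it is an assumption of this sketch.
	received := promauto.With(customRegistry).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_received_samples_attribution_total",
		Help: "The total number of samples that were received per attribution.",
	}, []string{"user", "team"})
	received.WithLabelValues("user-1", "team-a").Add(10)

	// Expose the custom registry on its own path, as modules.go does with t.Cfg.CustomRegistryPath.
	http.Handle("/cost-attribution-metrics", promhttp.HandlerFor(customRegistry, promhttp.HandlerOpts{Registry: customRegistry}))
	log.Fatal(http.ListenAndServe(":8080", nil))
}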