From dd7e2a4fd478f80e5b83cc11aa12bcc8540faeae Mon Sep 17 00:00:00 2001 From: Ying WANG Date: Wed, 2 Oct 2024 17:08:56 +0200 Subject: [PATCH] seperate registry for cost attribution metrics --- cmd/mimir/config-descriptor.json | 10 + pkg/distributor/distributor.go | 3 +- .../activeseries/active_labels_test.go | 2 +- .../active_native_histogram_postings_test.go | 10 +- .../activeseries/active_postings_test.go | 6 +- pkg/ingester/activeseries/active_series.go | 27 +-- .../activeseries/active_series_test.go | 28 +-- pkg/ingester/ingester.go | 17 +- pkg/mimir/mimir.go | 3 +- pkg/mimir/modules.go | 19 +- pkg/util/costattribution/cost_attribution.go | 219 +++++++++++++----- 11 files changed, 218 insertions(+), 126 deletions(-) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index a8222277d87..79bd4bb05af 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -18161,6 +18161,16 @@ "fieldValue": null, "fieldDefaultValue": null }, + { + "kind": "field", + "name": "custom_registry_path", + "required": false, + "desc": "", + "fieldValue": null, + "fieldDefaultValue": "", + "fieldType": "string", + "fieldCategory": "advanced" + }, { "kind": "field", "name": "timeseries_unmarshal_caching_optimization_enabled", diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 05f8160d15d..c14b5a35306 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -358,7 +358,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Name: "cortex_distributor_received_samples_total", Help: "The total number of received samples, excluding rejected and deduped samples.", }, []string{"user", "attrib"}), - receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_distributor_received_exemplars_total", Help: "The total number of received exemplars, excluding rejected and deduped exemplars.", @@ -1684,7 +1683,7 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) receivedExemplars += len(ts.TimeSeries.Exemplars) if costAttributionLabel != "" { - attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now) + attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), now) costAttribution[attribution]++ } } diff --git a/pkg/ingester/activeseries/active_labels_test.go b/pkg/ingester/activeseries/active_labels_test.go index dc919618a84..266fb0e7079 100644 --- a/pkg/ingester/activeseries/active_labels_test.go +++ b/pkg/ingester/activeseries/active_labels_test.go @@ -41,7 +41,7 @@ func TestIsLabelValueActive(t *testing.T) { labels.FromStrings("a", "5"), } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil) memPostings := index.NewMemPostings() for i, l := range series { diff --git a/pkg/ingester/activeseries/active_native_histogram_postings_test.go b/pkg/ingester/activeseries/active_native_histogram_postings_test.go index e6cc823e502..56ebd0b1d52 100644 --- a/pkg/ingester/activeseries/active_native_histogram_postings_test.go +++ b/pkg/ingester/activeseries/active_native_histogram_postings_test.go @@ -26,7 +26,7 @@ func 
TestNativeHistogramPostings_Expand(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -62,7 +62,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -106,7 +106,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -146,7 +146,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -183,7 +183,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { diff --git a/pkg/ingester/activeseries/active_postings_test.go b/pkg/ingester/activeseries/active_postings_test.go index 5c6d2711212..2a6cf812aa4 100644 --- a/pkg/ingester/activeseries/active_postings_test.go +++ b/pkg/ingester/activeseries/active_postings_test.go @@ -26,7 +26,7 @@ func TestPostings_Expand(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -58,7 +58,7 @@ func TestPostings_Seek(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil) // Update each series at a different time according to its index. 
for i := range allStorageRefs { @@ -90,7 +90,7 @@ func TestPostings_SeekToEnd(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil) // Update each series at a different time according to its index. for i := range allStorageRefs { diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go index 77fbd3770f3..7cc2d39554b 100644 --- a/pkg/ingester/activeseries/active_series.go +++ b/pkg/ingester/activeseries/active_series.go @@ -50,14 +50,12 @@ type ActiveSeries struct { matchers *asmodel.Matchers lastMatchersUpdate time.Time - costAttributionLabel string - costAttributionSvc *costattribution.CostAttributionCleanupService + costAttributionSvc *costattribution.CostAttributionCleanupService // The duration after which series become inactive. // Also used to determine if enough time has passed since configuration reload for valid results. - timeout time.Duration - userID string - maxCostAttributionPerUser int + timeout time.Duration + userID string } // seriesStripe holds a subset of the series timestamps for a single tenant. @@ -80,7 +78,6 @@ type seriesStripe struct { activeNativeHistogramBuckets uint32 // Number of buckets in active native histogram entries in this stripe. Only decreased during purge or clear. activeMatchingNativeHistogramBuckets []uint32 // Number of buckets in active native histogram entries in this stripe matching each matcher of the configured Matchers. userID string - costAttributionLabel string // here the attribution values map, it maps the attribute value to its index, so we can increment the counter directly, // so in each entry, we keep the index of the value only, instead of keeping the string value costAttributionValues map[string]uint32 @@ -100,20 +97,16 @@ func NewActiveSeries( asm *asmodel.Matchers, timeout time.Duration, userID string, - costAttributionLabel string, costAttributionSvc *costattribution.CostAttributionCleanupService, - maxCostAttributionPerUser int, ) *ActiveSeries { c := &ActiveSeries{ matchers: asm, timeout: timeout, userID: userID, - costAttributionLabel: costAttributionLabel, - costAttributionSvc: costAttributionSvc, - maxCostAttributionPerUser: maxCostAttributionPerUser, + costAttributionSvc: costAttributionSvc, } // Stripes are pre-allocated so that we only read on them and no lock is required. 
for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc) + c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionSvc) } return c @@ -130,7 +123,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) { defer c.matchersMutex.Unlock() for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionLabel, c.costAttributionSvc) + c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionSvc) } c.matchers = asm c.lastMatchersUpdate = now @@ -237,7 +230,7 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot } func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 { - total := make(map[string]uint32, c.maxCostAttributionPerUser) + total := make(map[string]uint32, c.costAttributionSvc.GetUserAttributionLimit(c.userID)) for s := 0; s < numStripes; s++ { c.stripes[s].mu.RLock() for k, v := range c.stripes[s].costAttributionValues { @@ -433,8 +426,8 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef // here if we have a cost attribution label, we can split the serie count based on the value of the label // we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly - if s.costAttributionLabel != "" { - attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Unix(0, nowNanos)) + if s.costAttributionSvc != nil && s.costAttributionSvc.GetUserAttributionLabel(s.userID) != "" { + attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series, time.Unix(0, nowNanos)) s.costAttributionValues[attributionValue]++ e.attributionValue = attributionValue } @@ -466,7 +459,6 @@ func (s *seriesStripe) reinitialize( asm *asmodel.Matchers, deleted *deletedSeries, userID string, - costAttributionLabel string, costAttributionSvc *costattribution.CostAttributionCleanupService, ) { s.mu.Lock() @@ -483,7 +475,6 @@ func (s *seriesStripe) reinitialize( s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching) s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms) s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets) - s.costAttributionLabel = costAttributionLabel s.costAttributionSvc = costAttributionSvc } diff --git a/pkg/ingester/activeseries/active_series_test.go b/pkg/ingester/activeseries/active_series_test.go index 0aa2b8a09e0..d7a9f4cfb93 100644 --- a/pkg/ingester/activeseries/active_series_test.go +++ b/pkg/ingester/activeseries/active_series_test.go @@ -38,7 +38,7 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { ref4, ls4 := storage.SeriesRef(4), labels.FromStrings("a", "4") ref5 := storage.SeriesRef(5) // will be used for ls1 again. 
- c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", nil) valid := c.Purge(time.Now()) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers() @@ -203,7 +203,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) { for ttl := 1; ttl <= len(series); ttl++ { t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", nil) // Update each series with a different timestamp according to each index for i := 0; i < len(series); i++ { @@ -230,7 +230,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) { func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) { asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(asm, DefaultTimeout, "foo", nil) testUpdateSeries(t, c) } @@ -447,7 +447,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { func TestActiveSeries_UpdateSeries_Clear(t *testing.T) { asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(asm, DefaultTimeout, "foo", nil) testUpdateSeries(t, c) c.Clear() @@ -488,7 +488,7 @@ func TestActiveSeries_ShouldCorrectlyHandleHashCollisions(t *testing.T) { ls1, ls2 := labelsWithHashCollision() ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2) - c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", nil) c.UpdateSeries(ls1, ref1, time.Now(), -1) c.UpdateSeries(ls2, ref2, time.Now(), -1) @@ -516,7 +516,7 @@ func TestActiveSeries_Purge_NoMatchers(t *testing.T) { for ttl := 1; ttl <= len(series); ttl++ { t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", nil) for i := 0; i < len(series); i++ { c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1) @@ -562,7 +562,7 @@ func TestActiveSeries_Purge_WithMatchers(t *testing.T) { t.Run(fmt.Sprintf("ttl=%d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) - c := NewActiveSeries(asm, 5*time.Minute, "foo", "", nil, 0) + c := NewActiveSeries(asm, 5*time.Minute, "foo", nil) exp := len(series) - ttl expMatchingSeries := 0 @@ -595,7 +595,7 @@ func TestActiveSeries_PurgeOpt(t *testing.T) { ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2) currentTime := time.Now() - c := NewActiveSeries(&asmodel.Matchers{}, 59*time.Second, "foo", "", nil, 0) + c := NewActiveSeries(&asmodel.Matchers{}, 59*time.Second, "foo", nil) c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute), -1) c.UpdateSeries(ls2, ref2, currentTime, -1) @@ -631,7 +631,7 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) { asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~.*}`})) currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(asm, DefaultTimeout, "foo", nil) valid := 
c.Purge(currentTime) assert.True(t, valid) @@ -697,7 +697,7 @@ func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) { })) currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(asm, DefaultTimeout, "foo", nil) valid := c.Purge(currentTime) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() @@ -736,7 +736,7 @@ func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) { currentTime := time.Now() - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(asm, DefaultTimeout, "foo", nil) valid := c.Purge(currentTime) assert.True(t, valid) allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers() @@ -789,7 +789,7 @@ func benchmarkActiveSeriesUpdateSeriesConcurrency(b *testing.B, numSeries, numGo var ( // Run the active series tracker with an active timeout = 0 so that the Purge() will always // purge the series. - c = NewActiveSeries(&asmodel.Matchers{}, 0, "foo", "", nil, 0) + c = NewActiveSeries(&asmodel.Matchers{}, 0, "foo", nil) updateGroup = &sync.WaitGroup{} purgeGroup = &sync.WaitGroup{} start = make(chan struct{}) @@ -927,7 +927,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(asm, DefaultTimeout, "foo", nil) for round := 0; round <= tt.nRounds; round++ { for ix := 0; ix < tt.nSeries; ix++ { c.UpdateSeries(series[ix], refs[ix], time.Unix(0, now), -1) @@ -952,7 +952,7 @@ func benchmarkPurge(b *testing.B, twice bool) { const numExpiresSeries = numSeries / 25 currentTime := time.Now() - c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", nil) series := [numSeries]labels.Labels{} refs := [numSeries]storage.SeriesRef{} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e697c6f1060..008cba2ca84 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1299,15 +1299,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre costLabel := i.limits.CostAttributionLabel(userID) handleAppendError := func(err error, timestamp int64, labels []mimirpb.LabelAdapter) bool { if costLabel != "" { - costAttrib := "" - for _, label := range labels { - if label.Name == costLabel { - costAttrib = label.Value - } - } // get the label value and update the timestamp, // if the cordianlity is reached or we are currently in cooldown period, function would returned __unaccounted__ - costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend) + costAttrib := i.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(labels), startAppend) stats.failedSamplesAttribution[costAttrib]++ } @@ -1418,12 +1412,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre var costAttrib string // when cost attribution label is set if costLabel != "" { - for _, label := range ts.Labels { - if label.Name == costLabel { - costAttrib = label.Value - } - } - costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend) + costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), startAppend) } // The labels must be sorted (in our case, it's guaranteed a write request @@ -2680,9 +2669,7 @@ func (i *Ingester) 
createTSDB(userID string, walReplayConcurrency int) (*userTSD asmodel.NewMatchers(matchersConfig), i.cfg.ActiveSeriesMetrics.IdleTimeout, userID, - i.limits.CostAttributionLabel(userID), i.costAttributionSvc, - i.limits.MaxCostAttributionPerUser(userID), ), seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 1443cb22705..3efd678b1a8 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -145,7 +145,8 @@ type Config struct { ContinuousTest continuoustest.Config `yaml:"-"` OverridesExporter exporter.Config `yaml:"overrides_exporter"` - Common CommonConfig `yaml:"common"` + Common CommonConfig `yaml:"common"` + CustomRegistryPath string `yaml:"custom_registry_path" category:"advanced"` TimeseriesUnmarshalCachingOptimizationEnabled bool `yaml:"timeseries_unmarshal_caching_optimization_enabled" category:"experimental"` } diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 35f0a8040f0..8da436becf1 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -66,6 +66,7 @@ import ( "github.com/grafana/mimir/pkg/util/validation/exporter" "github.com/grafana/mimir/pkg/util/version" "github.com/grafana/mimir/pkg/vault" + "github.com/prometheus/client_golang/prometheus/promhttp" ) // The various modules that make up Mimir. @@ -473,10 +474,6 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) { t.ActiveGroupsCleanup.Register(t.Distributor) } - if t.CostAttributionCleanup != nil { - t.CostAttributionCleanup.Register(t.Distributor) - } - return t.Distributor, nil } @@ -651,7 +648,16 @@ func (t *Mimir) initActiveGroupsCleanupService() (services.Service, error) { } func (t *Mimir) initCostAttributionService() (services.Service, error) { - t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides) + if t.Cfg.CustomRegistryPath != "" { + customRegistry := prometheus.NewRegistry() + // Register the custom registry with the provided URL. + // This allows users to expose custom metrics on a separate endpoint. + // This is useful when users want to expose metrics that are not part of the default Mimir metrics. 
+ http.Handle(t.Cfg.CustomRegistryPath, promhttp.HandlerFor(customRegistry, promhttp.HandlerOpts{Registry: customRegistry})) + t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, customRegistry) + return t.CostAttributionCleanup, nil + } + t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides, t.Registerer) return t.CostAttributionCleanup, nil } @@ -675,9 +681,6 @@ func (t *Mimir) initIngesterService() (serv services.Service, err error) { t.ActiveGroupsCleanup.Register(t.Ingester) } - if t.CostAttributionCleanup != nil { - t.CostAttributionCleanup.Register(t.Ingester) - } return t.Ingester, nil } diff --git a/pkg/util/costattribution/cost_attribution.go b/pkg/util/costattribution/cost_attribution.go index 6af2f39fefd..d972fdc7af2 100644 --- a/pkg/util/costattribution/cost_attribution.go +++ b/pkg/util/costattribution/cost_attribution.go @@ -12,77 +12,119 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/dskit/services" "github.com/grafana/mimir/pkg/util/validation" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" "go.uber.org/atomic" ) +type Tracker struct { + trackedLabel string + activeSeriesPerUserAttribution *prometheus.GaugeVec + receivedSamplesAttribution *prometheus.CounterVec + discardedSampleAttribution *prometheus.CounterVec + attributionTimestamps map[string]*atomic.Int64 + coolDownDeadline *atomic.Int64 +} + +func (m *Tracker) RemoveAttributionMetricsForUser(userID, attribution string) { + m.activeSeriesPerUserAttribution.DeleteLabelValues(userID, attribution) + m.receivedSamplesAttribution.DeleteLabelValues(userID, attribution) + m.discardedSampleAttribution.DeleteLabelValues(userID, attribution) +} + +func NewCostAttributionTracker(reg prometheus.Registerer, trackedLabel string) *Tracker { + m := &Tracker{ + trackedLabel: trackedLabel, + attributionTimestamps: map[string]*atomic.Int64{}, + coolDownDeadline: atomic.NewInt64(0), + discardedSampleAttribution: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_discarded_samples_attribution_total", + Help: "The total number of samples that were discarded per attribution.", + }, []string{"user", trackedLabel}), + receivedSamplesAttribution: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_received_samples_attribution_total", + Help: "The total number of samples that were received per attribution.", + }, []string{"user", trackedLabel}), + activeSeriesPerUserAttribution: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_series_attribution", + Help: "The total number of active series per user and attribution.", + }, []string{"user", trackedLabel}), + } + return m +} + type CostAttribution struct { - mu sync.RWMutex - timestampsPerUser map[string]map[string]*atomic.Int64 // map[user][group] -> timestamp - coolDownDeadline map[string]*atomic.Int64 - limits *validation.Overrides + mu sync.RWMutex + trackers map[string]*Tracker + limits *validation.Overrides + reg prometheus.Registerer } -func NewCostAttribution(limits *validation.Overrides) *CostAttribution { +func NewCostAttribution(limits *validation.Overrides, reg prometheus.Registerer) *CostAttribution { return &CostAttribution{ - timestampsPerUser: 
map[string]map[string]*atomic.Int64{}, - coolDownDeadline: map[string]*atomic.Int64{}, - limits: limits, + trackers: make(map[string]*Tracker), + limits: limits, + reg: reg, + mu: sync.RWMutex{}, } } // UpdateAttributionTimestampForUser function is only guaranteed to update to the // timestamp provided even if it is smaller than the existing value -func (ag *CostAttribution) UpdateAttributionTimestampForUser(userID, attribution string, now time.Time) { - ts := now.UnixNano() - ag.mu.RLock() - if groupTs := ag.timestampsPerUser[userID][attribution]; groupTs != nil { - ag.mu.RUnlock() - groupTs.Store(ts) +func (ca *CostAttribution) UpdateAttributionTimestampForUser(userID, attribution string, now time.Time) { + // If the limit is set to 0, we don't need to track the attribution + if ca.limits.MaxCostAttributionPerUser(userID) <= 0 { return } - ag.mu.RUnlock() - ag.mu.Lock() - defer ag.mu.Unlock() - - if ag.timestampsPerUser[userID] == nil { - ag.timestampsPerUser[userID] = map[string]*atomic.Int64{attribution: atomic.NewInt64(ts)} - return + ts := now.UnixNano() + ca.mu.Lock() + // create new tracker if not exists + if _, exists := ca.trackers[userID]; !exists { + // the attribution label and values should be managed by cache + ca.trackers[userID] = NewCostAttributionTracker(ca.reg, ca.limits.CostAttributionLabel(userID)) } - - if groupTs := ag.timestampsPerUser[userID][attribution]; groupTs != nil { + ca.mu.Unlock() + ca.mu.RLock() + if groupTs := ca.trackers[userID].attributionTimestamps[attribution]; groupTs != nil { groupTs.Store(ts) return } - - ag.timestampsPerUser[userID][attribution] = atomic.NewInt64(ts) + ca.mu.RUnlock() + ca.mu.Lock() + defer ca.mu.Unlock() + ca.trackers[userID].attributionTimestamps[attribution] = atomic.NewInt64(ts) } -func (ag *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadline int64) []string { - ag.mu.RLock() +func (ca *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadline int64) []string { + ca.mu.RLock() var inactiveAttributions []string - attributionTimestamps := ag.timestampsPerUser[userID] + if ca.trackers[userID] == nil || ca.trackers[userID].attributionTimestamps == nil { + return nil + } + attributionTimestamps := ca.trackers[userID].attributionTimestamps for attr, ts := range attributionTimestamps { if ts.Load() <= deadline { inactiveAttributions = append(inactiveAttributions, attr) } } - ag.mu.RUnlock() + ca.mu.RUnlock() if len(inactiveAttributions) == 0 { return nil } // Cleanup inactive groups - ag.mu.Lock() - defer ag.mu.Unlock() + ca.mu.Lock() + defer ca.mu.Unlock() for i := 0; i < len(inactiveAttributions); { inactiveAttribution := inactiveAttributions[i] - groupTs := ag.timestampsPerUser[userID][inactiveAttribution] + groupTs := ca.trackers[userID].attributionTimestamps[inactiveAttribution] if groupTs != nil && groupTs.Load() <= deadline { - delete(ag.timestampsPerUser[userID], inactiveAttribution) + delete(ca.trackers[userID].attributionTimestamps, inactiveAttribution) i++ } else { inactiveAttributions[i] = inactiveAttributions[len(inactiveAttributions)-1] @@ -93,10 +135,10 @@ func (ag *CostAttribution) purgeInactiveAttributionsForUser(userID string, deadl return inactiveAttributions } -func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Duration, cleanupFuncs ...func(string, string)) { +func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Duration) { ca.mu.RLock() - userIDs := make([]string, 0, len(ca.timestampsPerUser)) - for userID := range 
ca.timestampsPerUser { + userIDs := make([]string, 0, len(ca.trackers)) + for userID := range ca.trackers { userIDs = append(userIDs, userID) } ca.mu.RUnlock() @@ -105,32 +147,37 @@ func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Durati for _, userID := range userIDs { inactiveAttributions := ca.purgeInactiveAttributionsForUser(userID, currentTime.Add(-inactiveTimeout).UnixNano()) for _, attribution := range inactiveAttributions { - for _, cleanupFn := range cleanupFuncs { - cleanupFn(userID, attribution) - } + ca.trackers[userID].RemoveAttributionMetricsForUser(userID, attribution) } } } -func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string, now time.Time) bool { +func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string) bool { // if we are still at the cooldown period, we will consider the limit reached ca.mu.RLock() defer ca.mu.RUnlock() + // if the user is not exist, we don't need to check the limit + if ca.trackers[userID] == nil { + return false + } - if v, exists := ca.coolDownDeadline[userID]; exists && v.Load() > now.UnixNano() { + now := time.Now() + if v := ca.trackers[userID].coolDownDeadline; v != nil && v.Load() > now.UnixNano() { return true } // if the user attribution is already exist and we are not in the cooldown period, we don't need to check the limit - _, exists := ca.timestampsPerUser[userID][attribution] + _, exists := ca.trackers[userID].attributionTimestamps[attribution] if exists { return false } // if the user has reached the limit, we will set the cooldown period which is 20 minutes - maxReached := len(ca.timestampsPerUser[userID]) >= ca.limits.MaxCostAttributionPerUser(userID) + maxReached := len(ca.trackers[userID].attributionTimestamps) >= ca.limits.MaxCostAttributionPerUser(userID) if maxReached { - ca.coolDownDeadline[userID].Store(time.Now().Add(20 * time.Minute).UnixNano()) + ca.mu.Lock() + ca.trackers[userID].coolDownDeadline.Store(now.Add(20 * time.Minute).UnixNano()) + ca.mu.Unlock() return true } @@ -141,7 +188,6 @@ type CostAttributionCleanupService struct { services.Service logger log.Logger costAttribution *CostAttribution - cleanupFuncs []func(userID, attribution string) inactiveTimeout time.Duration invalidValue string } @@ -150,10 +196,9 @@ type CostAttributionMetricsCleaner interface { RemoveAttributionMetricsForUser(userID, attribution string) } -func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, cleanupFns ...func(string, string)) *CostAttributionCleanupService { +func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg prometheus.Registerer) *CostAttributionCleanupService { s := &CostAttributionCleanupService{ - costAttribution: NewCostAttribution(limits), - cleanupFuncs: cleanupFns, + costAttribution: NewCostAttribution(limits, reg), inactiveTimeout: inactiveTimeout, logger: logger, invalidValue: "__unaccounted__", @@ -163,12 +208,74 @@ func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Dura return s } -func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribution string, now time.Time) string { +// IncrementReceivedSamples increments the received samples counter for a given user and attribution +func (s *CostAttributionCleanupService) IncrementReceivedSamples(userID, attribution string, value float64) { + attribution = s.GetUserAttribution(userID, 
attribution) + s.costAttribution.mu.RLock() + defer s.costAttribution.mu.RUnlock() + if tracker, exists := s.costAttribution.trackers[userID]; exists { + tracker.receivedSamplesAttribution.WithLabelValues(userID, attribution).Add(value) + } +} + +// IncrementDiscardedSamples increments the discarded samples counter for a given user and attribution +func (s *CostAttributionCleanupService) IncrementDiscardedSamples(userID, attribution string, value float64) { + attribution = s.GetUserAttribution(userID, attribution) + s.costAttribution.mu.RLock() + defer s.costAttribution.mu.RUnlock() + if tracker, exists := s.costAttribution.trackers[userID]; exists { + tracker.discardedSampleAttribution.WithLabelValues(userID, attribution).Add(value) + } +} + +// SetActiveSeries sets the active series gauge for a given user and attribution +func (s *CostAttributionCleanupService) SetActiveSeries(userID, attribution string, value float64) { + attribution = s.GetUserAttribution(userID, attribution) + s.costAttribution.mu.RLock() + defer s.costAttribution.mu.RUnlock() + if tracker, exists := s.costAttribution.trackers[userID]; exists { + tracker.activeSeriesPerUserAttribution.WithLabelValues(userID, attribution).Set(value) + } +} + +func (s *CostAttributionCleanupService) GetUserAttribution(userID, attribution string) string { + // not tracking cost attribution for this user, this shouldn't happen + if s.costAttribution.limits.MaxCostAttributionPerUser(userID) <= 0 { + return attribution + } + if s.costAttribution.attributionLimitExceeded(userID, attribution) { + return s.invalidValue + } + return attribution +} + +func (s *CostAttributionCleanupService) GetUserAttributionLabel(userID string) string { + s.costAttribution.mu.RLock() + defer s.costAttribution.mu.RUnlock() + if s.costAttribution != nil { + if val, exists := s.costAttribution.trackers[userID]; exists { + return val.trackedLabel + } + } + return "" +} + +func (s *CostAttributionCleanupService) GetUserAttributionLimit(userID string) int { + return s.costAttribution.limits.MaxCostAttributionPerUser(userID) +} + +func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user string, lbs labels.Labels, now time.Time) string { + if s.costAttribution.trackers[user] == nil || s.costAttribution.trackers[user].trackedLabel == "" { + return "" + } + attribution := lbs.Get(s.costAttribution.trackers[user].trackedLabel) // empty label is not normal, if user set attribution label, the metrics send has to include the label if attribution == "" { - attribution = s.invalidValue - level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since missing cost attribution label in metrics", s.invalidValue)) - } else if s.costAttribution.attributionLimitExceeded(user, attribution, now) { + level.Error(s.logger).Log("msg", "set attribution label to \"\" since missing cost attribution label in metrics") + return attribution + } + + if s.costAttribution.attributionLimitExceeded(user, attribution) { attribution = s.invalidValue level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since user has reached the limit of cost attribution labels", s.invalidValue)) } @@ -178,12 +285,6 @@ func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribu } func (s *CostAttributionCleanupService) iteration(_ context.Context) error { - s.costAttribution.purgeInactiveAttributions(s.inactiveTimeout, s.cleanupFuncs...) 
+ s.costAttribution.purgeInactiveAttributions(s.inactiveTimeout) return nil } - -// Register registers the cleanup function from metricsCleaner to be called during each cleanup iteration. -// This function is NOT thread safe -func (s *CostAttributionCleanupService) Register(metricsCleaner CostAttributionMetricsCleaner) { - s.cleanupFuncs = append(s.cleanupFuncs, metricsCleaner.RemoveAttributionMetricsForUser) -}
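
The sketches below are illustrative companions to the patch, not part of it.

A minimal, standalone sketch of the pattern initCostAttributionService wires up: when custom_registry_path is set, cost attribution metrics are registered against a dedicated prometheus.Registry and served from their own handler instead of the default Mimir registerer. The path /cost-attribution-metrics, the port, and the example label "team" are assumptions for illustration only.

package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// Dedicated registry: metrics registered here are not exposed on the
	// default /metrics endpoint backed by the global registerer.
	customReg := prometheus.NewRegistry()

	// Example metric registered against the custom registry only. The second
	// label stands in for the tenant's configured cost attribution label.
	activeSeries := promauto.With(customReg).NewGaugeVec(prometheus.GaugeOpts{
		Name: "cortex_ingester_active_series_attribution",
		Help: "The total number of active series per user and attribution.",
	}, []string{"user", "team"})
	activeSeries.WithLabelValues("tenant-a", "billing").Set(42)

	// Expose the custom registry on its own path, mirroring the
	// http.Handle(t.Cfg.CustomRegistryPath, promhttp.HandlerFor(...)) call in the patch.
	http.Handle("/cost-attribution-metrics", promhttp.HandlerFor(customReg, promhttp.HandlerOpts{Registry: customReg}))
	log.Fatal(http.ListenAndServe(":8080", nil))
}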
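The central structure the patch introduces is a per-tenant Tracker whose metric vectors carry the tenant's configured attribution label as a dimension, and whose stale attribution values are cleaned up by deleting label sets. A trimmed-down sketch of that idea follows; it keeps the metric names from the patch but drops the overrides plumbing, the discarded-samples counter and all locking (the patch guards the trackers map with a mutex in CostAttribution), so it is not safe for concurrent use as written.

package costattribution

import (
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"go.uber.org/atomic"
)

// Tracker holds the attribution metrics for a single tenant. The second label
// dimension is the tenant's configured cost attribution label (for example
// "team"), so different tenants can track different labels.
type Tracker struct {
	trackedLabel               string
	activeSeriesPerAttribution *prometheus.GaugeVec
	receivedSamplesAttribution *prometheus.CounterVec
	attributionTimestamps      map[string]*atomic.Int64 // last time each attribution value was seen
}

func NewTracker(reg prometheus.Registerer, trackedLabel string) *Tracker {
	return &Tracker{
		trackedLabel:          trackedLabel,
		attributionTimestamps: map[string]*atomic.Int64{},
		receivedSamplesAttribution: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "cortex_received_samples_attribution_total",
			Help: "The total number of samples that were received per attribution.",
		}, []string{"user", trackedLabel}),
		activeSeriesPerAttribution: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
			Name: "cortex_ingester_active_series_attribution",
			Help: "The total number of active series per user and attribution.",
		}, []string{"user", trackedLabel}),
	}
}

// Observe counts one received sample and refreshes the attribution's timestamp.
func (t *Tracker) Observe(userID, attribution string, now time.Time) {
	ts, ok := t.attributionTimestamps[attribution]
	if !ok {
		ts = atomic.NewInt64(0)
		t.attributionTimestamps[attribution] = ts
	}
	ts.Store(now.UnixNano())
	t.receivedSamplesAttribution.WithLabelValues(userID, attribution).Inc()
}

// Purge drops attributions not seen since the deadline and deletes their
// label sets from the registry so the metric families do not grow unbounded.
func (t *Tracker) Purge(userID string, deadline int64) {
	for attribution, ts := range t.attributionTimestamps {
		if ts.Load() <= deadline {
			delete(t.attributionTimestamps, attribution)
			t.activeSeriesPerAttribution.DeleteLabelValues(userID, attribution)
			t.receivedSamplesAttribution.DeleteLabelValues(userID, attribution)
		}
	}
}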
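Finally, the overflow behaviour: callers now pass the full label set to UpdateAttributionTimestamp, which reads the tenant's tracked label and collapses the value into "__unaccounted__" once the per-tenant limit (limits.MaxCostAttributionPerUser) has been reached or while the cooldown window is still open. Below is a simplified, lock-free restatement of that decision flow, with the empty-label and logging paths left out; the real code keeps the seen values and the 20-minute cooldown deadline on the tracker and takes the write lock before updating them.

package costattribution

import (
	"time"

	"github.com/prometheus/prometheus/model/labels"
)

const overflowValue = "__unaccounted__"

// resolveAttribution returns the attribution value to account a series under.
func resolveAttribution(lbs labels.Labels, trackedLabel string, seen map[string]struct{}, limit int, coolDownUntil, now time.Time) string {
	if trackedLabel == "" {
		return "" // cost attribution is not enabled for this tenant
	}
	value := lbs.Get(trackedLabel)
	if now.Before(coolDownUntil) {
		return overflowValue // still cooling down after hitting the limit
	}
	if _, ok := seen[value]; ok {
		return value // value is already tracked; no new series is created
	}
	if len(seen) >= limit {
		return overflowValue // limit reached; the caller starts the cooldown
	}
	seen[value] = struct{}{}
	return value
}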