diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index de122b57816..05f8160d15d 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -53,6 +53,7 @@ import ( "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/storage/ingest" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/costattribution" "github.com/grafana/mimir/pkg/util/globalerror" mimir_limiter "github.com/grafana/mimir/pkg/util/limiter" util_math "github.com/grafana/mimir/pkg/util/math" @@ -105,7 +106,7 @@ type Distributor struct { distributorsLifecycler *ring.BasicLifecycler distributorsRing *ring.Ring healthyInstancesCount *atomic.Uint32 - costAttributionSvc *util.CostAttributionCleanupService + costAttributionSvc *costattribution.CostAttributionCleanupService // For handling HA replicas. HATracker *haTracker @@ -306,7 +307,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) { } // New constructs a new Distributor -func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *util.CostAttributionCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { +func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionClenaupService *costattribution.CostAttributionCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { clientMetrics := ingester_client.NewMetrics(reg) if cfg.IngesterClientFactory == nil { cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) { @@ -1683,7 +1684,7 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) receivedExemplars += len(ts.TimeSeries.Exemplars) if costAttributionLabel != "" { - attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now, costAttributionSize) + attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now) costAttribution[attribution]++ } } diff --git a/pkg/ingester/activeseries/active_labels_test.go b/pkg/ingester/activeseries/active_labels_test.go index 7580ebbc31c..dc919618a84 100644 --- a/pkg/ingester/activeseries/active_labels_test.go +++ b/pkg/ingester/activeseries/active_labels_test.go @@ -41,11 +41,7 @@ func TestIsLabelValueActive(t *testing.T) { labels.FromStrings("a", "5"), } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} -<<<<<<< HEAD - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "") -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) memPostings := index.NewMemPostings() for i, l := range series { diff --git a/pkg/ingester/activeseries/active_native_histogram_postings_test.go b/pkg/ingester/activeseries/active_native_histogram_postings_test.go 
index f9d339e2dd5..e6cc823e502 100644 --- a/pkg/ingester/activeseries/active_native_histogram_postings_test.go +++ b/pkg/ingester/activeseries/active_native_histogram_postings_test.go @@ -26,16 +26,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) -<<<<<<< HEAD -<<<<<<< HEAD - - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "") -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) ->>>>>>> 3c422a8f57 (new service for tracking cost attribution) -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -71,15 +62,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) -<<<<<<< HEAD -<<<<<<< HEAD - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "") -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) ->>>>>>> 3c422a8f57 (new service for tracking cost attribution) -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -123,15 +106,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) -<<<<<<< HEAD -<<<<<<< HEAD - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "") -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) ->>>>>>> 3c422a8f57 (new service for tracking cost attribution) -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -171,15 +146,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) -<<<<<<< HEAD -<<<<<<< HEAD - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "") -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) ->>>>>>> 3c422a8f57 (new service for tracking cost attribution) -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. 
for i := range allStorageRefs { @@ -216,15 +183,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) -<<<<<<< HEAD -<<<<<<< HEAD - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "") -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil) ->>>>>>> 3c422a8f57 (new service for tracking cost attribution) -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { diff --git a/pkg/ingester/activeseries/active_postings_test.go b/pkg/ingester/activeseries/active_postings_test.go index a08a02aa18a..5c6d2711212 100644 --- a/pkg/ingester/activeseries/active_postings_test.go +++ b/pkg/ingester/activeseries/active_postings_test.go @@ -26,11 +26,7 @@ func TestPostings_Expand(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) -<<<<<<< HEAD - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "") -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -62,11 +58,7 @@ func TestPostings_Seek(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) -<<<<<<< HEAD - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "") -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. for i := range allStorageRefs { @@ -98,11 +90,7 @@ func TestPostings_SeekToEnd(t *testing.T) { } allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5} storagePostings := index.NewListPostings(allStorageRefs) -<<<<<<< HEAD - activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "") -======= - activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0) // Update each series at a different time according to its index. 
for i := range allStorageRefs { diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go index 925cb191a95..77fbd3770f3 100644 --- a/pkg/ingester/activeseries/active_series.go +++ b/pkg/ingester/activeseries/active_series.go @@ -11,12 +11,14 @@ import ( "sync" "time" - "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/costattribution" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/util/zeropool" "go.uber.org/atomic" + + asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model" ) const ( @@ -45,22 +47,22 @@ type ActiveSeries struct { // matchersMutex protects matchers and lastMatchersUpdate. matchersMutex sync.RWMutex - matchers *Matchers + matchers *asmodel.Matchers lastMatchersUpdate time.Time - costAttributionLabel string - costAttributionSvc *util.CostAttributionCleanupService - maxCostAttributionPerUser int + costAttributionLabel string + costAttributionSvc *costattribution.CostAttributionCleanupService // The duration after which series become inactive. // Also used to determine if enough time has passed since configuration reload for valid results. - timeout time.Duration - userID string + timeout time.Duration + userID string + maxCostAttributionPerUser int } // seriesStripe holds a subset of the series timestamps for a single tenant. type seriesStripe struct { - matchers *Matchers + matchers *asmodel.Matchers deleted *deletedSeries @@ -68,8 +70,7 @@ type seriesStripe struct { // Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated // without holding the lock -- hence the atomic). oldestEntryTs atomic.Int64 - costAttributionSvc *util.CostAttributionCleanupService - maxCostAttributionPerUser int + costAttributionSvc *costattribution.CostAttributionCleanupService mu sync.RWMutex refs map[storage.SeriesRef]seriesEntry active uint32 // Number of active entries in this stripe. Only decreased during purge or clear. @@ -87,20 +88,20 @@ type seriesStripe struct { // seriesEntry holds a timestamp for single series. type seriesEntry struct { - nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe. - matches preAllocDynamicSlice // Index of the matcher matching - numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram. + nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe. + matches asmodel.PreAllocDynamicSlice // Index of the matcher matching + numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram. // keep the value corresponding to the label configured in seriesStripe deleted bool // This series was marked as deleted, so before purging we need to remove the reference to it from the deletedSeries. attributionValue string } func NewActiveSeries( - asm *Matchers, + asm *asmodel.Matchers, timeout time.Duration, userID string, costAttributionLabel string, - costAttributionSvc *util.CostAttributionCleanupService, + costAttributionSvc *costattribution.CostAttributionCleanupService, maxCostAttributionPerUser int, ) *ActiveSeries { c := &ActiveSeries{ @@ -112,7 +113,7 @@ func NewActiveSeries( // Stripes are pre-allocated so that we only read on them and no lock is required.
for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc, maxCostAttributionPerUser) + c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc) } return c @@ -124,18 +125,18 @@ func (c *ActiveSeries) CurrentMatcherNames() []string { return c.matchers.MatcherNames() } -func (c *ActiveSeries) ReloadMatchers(asm *Matchers, now time.Time) { +func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) { c.matchersMutex.Lock() defer c.matchersMutex.Unlock() for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionLabel, c.costAttributionSvc, c.maxCostAttributionPerUser) + c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionLabel, c.costAttributionSvc) } c.matchers = asm c.lastMatchersUpdate = now } -func (c *ActiveSeries) CurrentConfig() CustomTrackersConfig { +func (c *ActiveSeries) CurrentConfig() asmodel.CustomTrackersConfig { c.matchersMutex.RLock() defer c.matchersMutex.RUnlock() return c.matchers.Config() @@ -372,21 +373,21 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef entry, ok := s.refs[ref] if ok { if entry.numNativeHistogramBuckets != numNativeHistogramBuckets { - matches := s.matchers.matches(series) - matchesLen := matches.len() + matches := s.matchers.Matches(series) + matchesLen := matches.Len() if numNativeHistogramBuckets >= 0 && entry.numNativeHistogramBuckets >= 0 { // change number of buckets but still a histogram diff := numNativeHistogramBuckets - entry.numNativeHistogramBuckets s.activeNativeHistogramBuckets = uint32(int(s.activeNativeHistogramBuckets) + diff) for i := 0; i < matchesLen; i++ { - s.activeMatchingNativeHistogramBuckets[matches.get(i)] = uint32(int(s.activeMatchingNativeHistogramBuckets[matches.get(i)]) + diff) + s.activeMatchingNativeHistogramBuckets[matches.Get(i)] = uint32(int(s.activeMatchingNativeHistogramBuckets[matches.Get(i)]) + diff) } } else if numNativeHistogramBuckets >= 0 { // change from float to histogram s.activeNativeHistograms++ s.activeNativeHistogramBuckets += uint32(numNativeHistogramBuckets) for i := 0; i < matchesLen; i++ { - match := matches.get(i) + match := matches.Get(i) s.activeMatchingNativeHistograms[match]++ s.activeMatchingNativeHistogramBuckets[match] += uint32(numNativeHistogramBuckets) } @@ -395,7 +396,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef s.activeNativeHistograms-- s.activeNativeHistogramBuckets -= uint32(entry.numNativeHistogramBuckets) for i := 0; i < matchesLen; i++ { - match := matches.get(i) + match := matches.Get(i) s.activeMatchingNativeHistograms[match]-- s.activeMatchingNativeHistogramBuckets[match] -= uint32(entry.numNativeHistogramBuckets) } @@ -406,8 +407,8 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef return entry.nanos, false } - matches := s.matchers.matches(series) - matchesLen := matches.len() + matches := s.matchers.Matches(series) + matchesLen := matches.Len() s.active++ @@ -416,7 +417,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef s.activeNativeHistogramBuckets += uint32(numNativeHistogramBuckets) } for i := 0; i < matchesLen; i++ { - match := matches.get(i) + match := matches.Get(i) s.activeMatching[match]++ if numNativeHistogramBuckets >= 0 { s.activeMatchingNativeHistograms[match]++ @@ -433,7 +434,7 @@ func (s *seriesStripe) 
findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef // here if we have a cost attribution label, we can split the series count based on the value of the label // we also set the reference to the value of the label in the entry, so when removing, we can decrease the counter accordingly if s.costAttributionLabel != "" { - attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Unix(0, nowNanos), s.maxCostAttributionPerUser) + attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Unix(0, nowNanos)) s.costAttributionValues[attributionValue]++ e.attributionValue = attributionValue } @@ -462,12 +463,11 @@ func (s *seriesStripe) clear() { // Reinitialize assigns new matchers and corresponding size activeMatching slices. func (s *seriesStripe) reinitialize( - asm *Matchers, + asm *asmodel.Matchers, deleted *deletedSeries, userID string, costAttributionLabel string, - costAttributionSvc *util.CostAttributionCleanupService, - maxCostAttributionPerUser int, + costAttributionSvc *costattribution.CostAttributionCleanupService, ) { s.mu.Lock() defer s.mu.Unlock() @@ -478,7 +478,6 @@ func (s *seriesStripe) reinitialize( s.costAttributionValues = map[string]uint32{} s.activeNativeHistograms = 0 s.activeNativeHistogramBuckets = 0 - s.maxCostAttributionPerUser = maxCostAttributionPerUser s.matchers = asm s.userID = userID s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching) @@ -528,9 +527,9 @@ func (s *seriesStripe) purge(keepUntil time.Time) { if entry.attributionValue != "" { s.costAttributionValues[entry.attributionValue]++ } - ml := entry.matches.len() + ml := entry.matches.Len() for i := 0; i < ml; i++ { - match := entry.matches.get(i) + match := entry.matches.Get(i) s.activeMatching[match]++ if entry.numNativeHistogramBuckets >= 0 { s.activeMatchingNativeHistograms[match]++ @@ -573,9 +572,9 @@ func (s *seriesStripe) remove(ref storage.SeriesRef) { s.activeNativeHistograms-- s.activeNativeHistogramBuckets -= uint32(entry.numNativeHistogramBuckets) } - ml := entry.matches.len() + ml := entry.matches.Len() for i := 0; i < ml; i++ { - match := entry.matches.get(i) + match := entry.matches.Get(i) s.activeMatching[match]-- if entry.numNativeHistogramBuckets >= 0 { s.activeMatchingNativeHistograms[match]-- diff --git a/pkg/ingester/activeseries/active_series_test.go b/pkg/ingester/activeseries/active_series_test.go index 8d21d3d3c9d..0aa2b8a09e0 100644 --- a/pkg/ingester/activeseries/active_series_test.go +++ b/pkg/ingester/activeseries/active_series_test.go @@ -38,11 +38,7 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) { ref4, ls4 := storage.SeriesRef(4), labels.FromStrings("a", "4") ref5 := storage.SeriesRef(5) // will be used for ls1 again.
-<<<<<<< HEAD - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "") -======= - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0) valid := c.Purge(time.Now()) assert.True(t, valid) allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers() @@ -207,11 +203,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) { for ttl := 1; ttl <= len(series); ttl++ { t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) -<<<<<<< HEAD - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "") -======= - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0) // Update each series with a different timestamp according to each index for i := 0; i < len(series); i++ { @@ -237,12 +229,8 @@ func TestActiveSeries_ContainsRef(t *testing.T) { } func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) { - asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) -<<<<<<< HEAD - c := NewActiveSeries(asm, DefaultTimeout, "foo", "") -======= + asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) testUpdateSeries(t, c) } @@ -458,12 +446,8 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) { } func TestActiveSeries_UpdateSeries_Clear(t *testing.T) { - asm := NewMatchers(mustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) -<<<<<<< HEAD - c := NewActiveSeries(asm, DefaultTimeout, "foo", "") -======= + asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`})) c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) testUpdateSeries(t, c) c.Clear() @@ -504,11 +488,7 @@ func TestActiveSeries_ShouldCorrectlyHandleHashCollisions(t *testing.T) { ls1, ls2 := labelsWithHashCollision() ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2) -<<<<<<< HEAD - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "") -======= - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0) c.UpdateSeries(ls1, ref1, time.Now(), -1) c.UpdateSeries(ls2, ref2, time.Now(), -1) @@ -536,11 +516,7 @@ func TestActiveSeries_Purge_NoMatchers(t *testing.T) { for ttl := 1; ttl <= len(series); ttl++ { t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) { mockedTime := time.Unix(int64(ttl), 0) -<<<<<<< HEAD - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "") -======= - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0) for i := 0; i < len(series); i++ { c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1) @@ -619,11 +595,7 @@ func TestActiveSeries_PurgeOpt(t *testing.T) { ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2) currentTime := time.Now() -<<<<<<< HEAD - c := NewActiveSeries(&Matchers{}, 59*time.Second, "foo", "") -======= - c := 
NewActiveSeries(&Matchers{}, 59*time.Second, "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + c := NewActiveSeries(&asmodel.Matchers{}, 59*time.Second, "foo", "", nil, 0) c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute), -1) c.UpdateSeries(ls2, ref2, currentTime, -1) @@ -817,11 +789,7 @@ func benchmarkActiveSeriesUpdateSeriesConcurrency(b *testing.B, numSeries, numGo var ( // Run the active series tracker with an active timeout = 0 so that the Purge() will always // purge the series. -<<<<<<< HEAD - c = NewActiveSeries(&Matchers{}, 0, "foo", "") -======= - c = NewActiveSeries(&Matchers{}, 0, "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + c = NewActiveSeries(&asmodel.Matchers{}, 0, "foo", "", nil, 0) updateGroup = &sync.WaitGroup{} purgeGroup = &sync.WaitGroup{} start = make(chan struct{}) @@ -984,11 +952,7 @@ func benchmarkPurge(b *testing.B, twice bool) { const numExpiresSeries = numSeries / 25 currentTime := time.Now() -<<<<<<< HEAD - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "") -======= - c := NewActiveSeries(&Matchers{}, DefaultTimeout, "foo", "", nil, 0) ->>>>>>> 7e628c3508 (address comments) + c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0) series := [numSeries]labels.Labels{} refs := [numSeries]storage.SeriesRef{} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index caa71fef859..e697c6f1060 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -63,6 +63,7 @@ import ( "github.com/grafana/mimir/pkg/storage/tsdb/block" "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" + "github.com/grafana/mimir/pkg/util/costattribution" "github.com/grafana/mimir/pkg/util/globalerror" "github.com/grafana/mimir/pkg/util/limiter" util_log "github.com/grafana/mimir/pkg/util/log" @@ -310,7 +311,7 @@ type Ingester struct { activeGroups *util.ActiveGroupsCleanupService - costAttributionSvc *util.CostAttributionCleanupService + costAttributionSvc *costattribution.CostAttributionCleanupService tsdbMetrics *tsdbMetrics @@ -374,13 +375,12 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus forceCompactTrigger: make(chan requestWithUsersAndCallback), shipTrigger: make(chan requestWithUsersAndCallback), seriesHashCache: hashcache.NewSeriesHashCache(cfg.BlocksStorageConfig.TSDB.SeriesHashCacheMaxBytes), - - errorSamplers: newIngesterErrSamplers(cfg.ErrorSampleRate), + errorSamplers: newIngesterErrSamplers(cfg.ErrorSampleRate), }, nil } // New returns an Ingester that uses Mimir block storage. 
-func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionCleanupService *util.CostAttributionCleanupService, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { +func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionCleanupService *costattribution.CostAttributionCleanupService, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { i, err := newIngester(cfg, limits, registerer, logger) if err != nil { return nil, err @@ -1307,7 +1307,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre } // get the label value and update the timestamp, // if the cardinality limit is reached or we are currently in the cooldown period, the function returns __unaccounted__ - costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend, i.limits.MaxCostAttributionPerUser(userID)) + costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend) stats.failedSamplesAttribution[costAttrib]++ } @@ -1423,7 +1423,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre costAttrib = label.Value } } - costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend, i.limits.MaxCostAttributionPerUser(userID)) + costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend) } // The labels must be sorted (in our case, it's guaranteed a write request diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 02d4f3afa7b..1443cb22705 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -74,6 +74,7 @@ import ( "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/activitytracker" + "github.com/grafana/mimir/pkg/util/costattribution" util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/noauth" "github.com/grafana/mimir/pkg/util/process" @@ -712,7 +713,7 @@ type Mimir struct { TenantLimits validation.TenantLimits Overrides *validation.Overrides ActiveGroupsCleanup *util.ActiveGroupsCleanupService - CostAttributionCleanup *util.CostAttributionCleanupService + CostAttributionCleanup *costattribution.CostAttributionCleanupService Distributor *distributor.Distributor Ingester *ingester.Ingester diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index 00821498c1a..35f0a8040f0 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -60,6 +60,7 @@ import ( "github.com/grafana/mimir/pkg/usagestats" "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/activitytracker" + "github.com/grafana/mimir/pkg/util/costattribution" util_log "github.com/grafana/mimir/pkg/util/log" "github.com/grafana/mimir/pkg/util/validation" "github.com/grafana/mimir/pkg/util/validation/exporter" @@ -650,7 +651,7 @@ func (t *Mimir) initActiveGroupsCleanupService() (services.Service, error) { } func (t *Mimir) initCostAttributionService() (services.Service, error) { - t.CostAttributionCleanup = util.NewCostAttributionCleanupService(3*time.Minute, t.Cfg.CostAttributionEvictionInterval, util_log.Logger) + t.CostAttributionCleanup = costattribution.NewCostAttributionCleanupService(3*time.Minute,
t.Cfg.CostAttributionEvictionInterval, util_log.Logger, t.Overrides) return t.CostAttributionCleanup, nil } diff --git a/pkg/util/cost_attribution.go b/pkg/util/costattribution/cost_attribution.go similarity index 87% rename from pkg/util/cost_attribution.go rename to pkg/util/costattribution/cost_attribution.go index d8ab19c74de..6af2f39fefd 100644 --- a/pkg/util/cost_attribution.go +++ b/pkg/util/costattribution/cost_attribution.go @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-only -package util +package costattribution import ( "context" @@ -11,6 +11,7 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/services" + "github.com/grafana/mimir/pkg/util/validation" "go.uber.org/atomic" ) @@ -18,12 +19,14 @@ type CostAttribution struct { mu sync.RWMutex timestampsPerUser map[string]map[string]*atomic.Int64 // map[user][group] -> timestamp coolDownDeadline map[string]*atomic.Int64 + limits *validation.Overrides } -func NewCostAttribution() *CostAttribution { +func NewCostAttribution(limits *validation.Overrides) *CostAttribution { return &CostAttribution{ timestampsPerUser: map[string]map[string]*atomic.Int64{}, coolDownDeadline: map[string]*atomic.Int64{}, + limits: limits, } } @@ -109,7 +112,7 @@ func (ca *CostAttribution) purgeInactiveAttributions(inactiveTimeout time.Durati } } -func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string, now time.Time, limit int) bool { +func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string, now time.Time) bool { // if we are still at the cooldown period, we will consider the limit reached ca.mu.RLock() defer ca.mu.RUnlock() @@ -125,7 +128,7 @@ func (ca *CostAttribution) attributionLimitExceeded(userID, attribution string, } // if the user has reached the limit, we will set the cooldown period which is 20 minutes - maxReached := len(ca.timestampsPerUser[userID]) >= limit + maxReached := len(ca.timestampsPerUser[userID]) >= ca.limits.MaxCostAttributionPerUser(userID) if maxReached { ca.coolDownDeadline[userID].Store(time.Now().Add(20 * time.Minute).UnixNano()) return true @@ -147,9 +150,9 @@ type CostAttributionMetricsCleaner interface { RemoveAttributionMetricsForUser(userID, attribution string) } -func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, cleanupFns ...func(string, string)) *CostAttributionCleanupService { +func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, cleanupFns ...func(string, string)) *CostAttributionCleanupService { s := &CostAttributionCleanupService{ - costAttribution: NewCostAttribution(), + costAttribution: NewCostAttribution(limits), cleanupFuncs: cleanupFns, inactiveTimeout: inactiveTimeout, logger: logger, @@ -160,14 +163,14 @@ func NewCostAttributionCleanupService(cleanupInterval, inactiveTimeout time.Dura return s } -func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribution string, now time.Time, limit int) string { +func (s *CostAttributionCleanupService) UpdateAttributionTimestamp(user, attribution string, now time.Time) string { // empty label is not normal, if user set attribution label, the metrics send has to include the label if attribution == "" { attribution = s.invalidValue level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since missing cost attribution label in metrics", s.invalidValue)) - } else if 
s.costAttribution.attributionLimitExceeded(user, attribution, now, limit) { + } else if s.costAttribution.attributionLimitExceeded(user, attribution, now) { attribution = s.invalidValue - level.Error(s.logger).Log("msg", "set attribution label to \"%s\" since user has reached the limit of cost attribution labels", s.invalidValue) + level.Error(s.logger).Log("msg", fmt.Sprintf("set attribution label to \"%s\" since user has reached the limit of cost attribution labels", s.invalidValue)) } s.costAttribution.UpdateAttributionTimestampForUser(user, attribution, now) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 30483c4d750..1d14d03baca 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -143,13 +143,10 @@ type Limits struct { // User defined label to give the option of subdividing specific metrics by another label SeparateMetricsGroupLabel string `yaml:"separate_metrics_group_label" json:"separate_metrics_group_label" category:"experimental"` -<<<<<<< HEAD -======= // User defined label to give the cost distribution by values of the label CostAttributionLabel string `yaml:"cost_attribution_label" json:"cost_attribution_label" category:"experimental"` MaxCostAttributionPerUser int `yaml:"max_cost_attribution_per_user" json:"max_cost_attribution_per_user" category:"experimental"` ->>>>>>> 3c422a8f57 (new service for tracking cost attribution) // Querier enforced limits. MaxChunksPerQuery int `yaml:"max_fetched_chunks_per_query" json:"max_fetched_chunks_per_query"` MaxEstimatedChunksPerQueryMultiplier float64 `yaml:"max_estimated_fetched_chunks_per_query_multiplier" json:"max_estimated_fetched_chunks_per_query_multiplier" category:"experimental"` @@ -289,16 +286,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.OutOfOrderBlocksExternalLabelEnabled, "ingester.out-of-order-blocks-external-label-enabled", false, "Whether the shipper should label out-of-order blocks with an external label before uploading them. Setting this label will compact out-of-order blocks separately from non-out-of-order blocks") f.StringVar(&l.SeparateMetricsGroupLabel, "validation.separate-metrics-group-label", "", "Label used to define the group label for metrics separation. For each write request, the group is obtained from the first non-empty group label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'group' label with group label's value. Currently applies to the following metrics: cortex_discarded_samples_total") -<<<<<<< HEAD -<<<<<<< HEAD - -======= - f.StringVar(&l.CostAttributionLabel, "validation.cost-attribution-label", "", "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total") ->>>>>>> 3c422a8f57 (new service for tracking cost attribution) -======= f.StringVar(&l.CostAttributionLabel, "validation.cost-attribution-label", "", "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. 
Specific distributor and ingester metrics will be further separated by adding an 'attrib' label with the cost attribution label's value. Applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total.") f.IntVar(&l.MaxCostAttributionPerUser, "validation.max-cost-attribution-per-user", 0, "The maximum number of cost attribution labels per user, across the cluster. 0 to disable cost attribution.") ->>>>>>> 7e628c3508 (address comments) f.IntVar(&l.MaxChunksPerQuery, MaxChunksPerQueryFlag, 2e6, "Maximum number of chunks that can be fetched in a single query from ingesters and store-gateways. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.") f.Float64Var(&l.MaxEstimatedChunksPerQueryMultiplier, MaxEstimatedChunksPerQueryMultiplierFlag, 0, "Maximum number of chunks estimated to be fetched in a single query from ingesters and store-gateways, as a multiple of -"+MaxChunksPerQueryFlag+". This limit is enforced in the querier. Must be greater than or equal to 1, or 0 to disable.") f.IntVar(&l.MaxFetchedSeriesPerQuery, MaxSeriesPerQueryFlag, 0, "The maximum number of unique series for which a query can fetch samples from ingesters and store-gateways. This limit is enforced in the querier, ruler and store-gateway. 0 to disable") @@ -786,8 +775,6 @@ func (o *Overrides) SeparateMetricsGroupLabel(userID string) string { return o.getOverridesForUser(userID).SeparateMetricsGroupLabel } -<<<<<<< HEAD -======= func (o *Overrides) CostAttributionLabel(userID string) string { return o.getOverridesForUser(userID).CostAttributionLabel } @@ -796,7 +783,6 @@ func (o *Overrides) MaxCostAttributionPerUser(userID string) int { return o.getOverridesForUser(userID).MaxCostAttributionPerUser } ->>>>>>> 7e628c3508 (address comments) // IngestionTenantShardSize returns the ingesters shard size for a given user. func (o *Overrides) IngestionTenantShardSize(userID string) int { return o.getOverridesForUser(userID).IngestionTenantShardSize
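For orientation, here is a minimal sketch (not part of the diff) of how the relocated costattribution package is expected to be wired after this change: the cleanup service is constructed once with the tenant overrides, and callers no longer pass a per-call cardinality limit to UpdateAttributionTimestamp. The constructor and method signatures are taken from the diff above; the helper function names, the example tenant/label values, and the 20-minute inactivity timeout are illustrative assumptions.

```go
package example

import (
	"time"

	"github.com/go-kit/log"

	"github.com/grafana/mimir/pkg/util/costattribution"
	"github.com/grafana/mimir/pkg/util/validation"
)

// wireCostAttribution is a hypothetical helper. It assumes the caller already has
// *validation.Overrides (which now carries MaxCostAttributionPerUser) and a logger,
// as in Mimir's module wiring in modules.go.
func wireCostAttribution(overrides *validation.Overrides, logger log.Logger) *costattribution.CostAttributionCleanupService {
	// The overrides are handed to the service once; per-call limit arguments are gone.
	// The 3m cleanup interval matches modules.go; the 20m inactivity timeout stands in
	// for Cfg.CostAttributionEvictionInterval and is only an example value.
	return costattribution.NewCostAttributionCleanupService(3*time.Minute, 20*time.Minute, logger, overrides)
}

func exampleUsage(svc *costattribution.CostAttributionCleanupService) {
	// Distributor, ingester and the active series tracker now all use the same
	// three-argument form; the check against MaxCostAttributionPerUser happens
	// inside the service via the stored overrides.
	attribution := svc.UpdateAttributionTimestamp("tenant-1", "team-a", time.Now())
	_ = attribution // falls back to the service's invalid value when the label is missing or the limit is reached
}
```

The design choice this illustrates is that the per-tenant limit lookup moves out of every call site and into the service itself, so the distributor, ingester and active series stripes only need the tenant ID, the label value and a timestamp.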