Skip to content

Commit

Permalink
seperate registry for cost attribution metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Oct 2, 2024
1 parent 788216a commit dd7e2a4
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 126 deletions.
10 changes: 10 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -18161,6 +18161,16 @@
"fieldValue": null,
"fieldDefaultValue": null
},
{
"kind": "field",
"name": "custom_registry_path",
"required": false,
"desc": "",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldType": "string",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "timeseries_unmarshal_caching_optimization_enabled",
Expand Down
3 changes: 1 addition & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "cortex_distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user", "attrib"}),

receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_received_exemplars_total",
Help: "The total number of received exemplars, excluding rejected and deduped exemplars.",
Expand Down Expand Up @@ -1684,7 +1683,7 @@ func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID st
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
if costAttributionLabel != "" {
attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel), now)
attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), now)
costAttribution[attribution]++
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/activeseries/active_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestIsLabelValueActive(t *testing.T) {
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil)

memPostings := index.NewMemPostings()
for i, l := range series {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -62,7 +62,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/activeseries/active_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -58,7 +58,7 @@ func TestPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", "", nil, 0)
activeSeries := NewActiveSeries(&asmodel.Matchers{}, time.Duration(ttl), "foo", nil)

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
27 changes: 9 additions & 18 deletions pkg/ingester/activeseries/active_series.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@ type ActiveSeries struct {
matchers *asmodel.Matchers
lastMatchersUpdate time.Time

costAttributionLabel string
costAttributionSvc *costattribution.CostAttributionCleanupService
costAttributionSvc *costattribution.CostAttributionCleanupService

// The duration after which series become inactive.
// Also used to determine if enough time has passed since configuration reload for valid results.
timeout time.Duration
userID string
maxCostAttributionPerUser int
timeout time.Duration
userID string
}

// seriesStripe holds a subset of the series timestamps for a single tenant.
Expand All @@ -80,7 +78,6 @@ type seriesStripe struct {
activeNativeHistogramBuckets uint32 // Number of buckets in active native histogram entries in this stripe. Only decreased during purge or clear.
activeMatchingNativeHistogramBuckets []uint32 // Number of buckets in active native histogram entries in this stripe matching each matcher of the configured Matchers.
userID string
costAttributionLabel string
// here the attribution values map, it maps the attribute value to its index, so we can increment the counter directly,
// so in each entry, we keep the index of the value only, instead of keeping the string value
costAttributionValues map[string]uint32
Expand All @@ -100,20 +97,16 @@ func NewActiveSeries(
asm *asmodel.Matchers,
timeout time.Duration,
userID string,
costAttributionLabel string,
costAttributionSvc *costattribution.CostAttributionCleanupService,
maxCostAttributionPerUser int,
) *ActiveSeries {
c := &ActiveSeries{
matchers: asm, timeout: timeout, userID: userID,
costAttributionLabel: costAttributionLabel,
costAttributionSvc: costAttributionSvc,
maxCostAttributionPerUser: maxCostAttributionPerUser,
costAttributionSvc: costAttributionSvc,
}

// Stripes are pre-allocated so that we only read on them and no lock is required.
for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionLabel, costAttributionSvc)
c.stripes[i].reinitialize(asm, &c.deleted, userID, costAttributionSvc)
}

return c
Expand All @@ -130,7 +123,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) {
defer c.matchersMutex.Unlock()

for i := 0; i < numStripes; i++ {
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionLabel, c.costAttributionSvc)
c.stripes[i].reinitialize(asm, &c.deleted, c.userID, c.costAttributionSvc)
}
c.matchers = asm
c.lastMatchersUpdate = now
Expand Down Expand Up @@ -237,7 +230,7 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot
}

func (c *ActiveSeries) ActiveByAttributionValue() map[string]uint32 {
total := make(map[string]uint32, c.maxCostAttributionPerUser)
total := make(map[string]uint32, c.costAttributionSvc.GetUserAttributionLimit(c.userID))
for s := 0; s < numStripes; s++ {
c.stripes[s].mu.RLock()
for k, v := range c.stripes[s].costAttributionValues {
Expand Down Expand Up @@ -433,8 +426,8 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef

// here if we have a cost attribution label, we can split the serie count based on the value of the label
// we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly
if s.costAttributionLabel != "" {
attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series.Get(s.costAttributionLabel), time.Unix(0, nowNanos))
if s.costAttributionSvc != nil && s.costAttributionSvc.GetUserAttributionLabel(s.userID) != "" {
attributionValue := s.costAttributionSvc.UpdateAttributionTimestamp(s.userID, series, time.Unix(0, nowNanos))
s.costAttributionValues[attributionValue]++
e.attributionValue = attributionValue
}
Expand Down Expand Up @@ -466,7 +459,6 @@ func (s *seriesStripe) reinitialize(
asm *asmodel.Matchers,
deleted *deletedSeries,
userID string,
costAttributionLabel string,
costAttributionSvc *costattribution.CostAttributionCleanupService,
) {
s.mu.Lock()
Expand All @@ -483,7 +475,6 @@ func (s *seriesStripe) reinitialize(
s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching)
s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms)
s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets)
s.costAttributionLabel = costAttributionLabel
s.costAttributionSvc = costAttributionSvc
}

Expand Down
28 changes: 14 additions & 14 deletions pkg/ingester/activeseries/active_series_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestActiveSeries_UpdateSeries_NoMatchers(t *testing.T) {
ref4, ls4 := storage.SeriesRef(4), labels.FromStrings("a", "4")
ref5 := storage.SeriesRef(5) // will be used for ls1 again.

c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", nil)
valid := c.Purge(time.Now())
assert.True(t, valid)
allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := c.ActiveWithMatchers()
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) {
for ttl := 1; ttl <= len(series); ttl++ {
t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) {
mockedTime := time.Unix(int64(ttl), 0)
c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", nil)

// Update each series with a different timestamp according to each index
for i := 0; i < len(series); i++ {
Expand All @@ -230,7 +230,7 @@ func TestActiveSeries_ContainsRef(t *testing.T) {

func TestActiveSeries_UpdateSeries_WithMatchers(t *testing.T) {
asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(asm, DefaultTimeout, "foo", nil)
testUpdateSeries(t, c)
}

Expand Down Expand Up @@ -447,7 +447,7 @@ func testUpdateSeries(t *testing.T, c *ActiveSeries) {

func TestActiveSeries_UpdateSeries_Clear(t *testing.T) {
asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~"2|3|4"}`}))
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(asm, DefaultTimeout, "foo", nil)
testUpdateSeries(t, c)

c.Clear()
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestActiveSeries_ShouldCorrectlyHandleHashCollisions(t *testing.T) {
ls1, ls2 := labelsWithHashCollision()
ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2)

c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", nil)
c.UpdateSeries(ls1, ref1, time.Now(), -1)
c.UpdateSeries(ls2, ref2, time.Now(), -1)

Expand Down Expand Up @@ -516,7 +516,7 @@ func TestActiveSeries_Purge_NoMatchers(t *testing.T) {
for ttl := 1; ttl <= len(series); ttl++ {
t.Run(fmt.Sprintf("ttl: %d", ttl), func(t *testing.T) {
mockedTime := time.Unix(int64(ttl), 0)
c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", nil)

for i := 0; i < len(series); i++ {
c.UpdateSeries(series[i], refs[i], time.Unix(int64(i), 0), -1)
Expand Down Expand Up @@ -562,7 +562,7 @@ func TestActiveSeries_Purge_WithMatchers(t *testing.T) {
t.Run(fmt.Sprintf("ttl=%d", ttl), func(t *testing.T) {
mockedTime := time.Unix(int64(ttl), 0)

c := NewActiveSeries(asm, 5*time.Minute, "foo", "", nil, 0)
c := NewActiveSeries(asm, 5*time.Minute, "foo", nil)

exp := len(series) - ttl
expMatchingSeries := 0
Expand Down Expand Up @@ -595,7 +595,7 @@ func TestActiveSeries_PurgeOpt(t *testing.T) {
ref1, ref2 := storage.SeriesRef(1), storage.SeriesRef(2)

currentTime := time.Now()
c := NewActiveSeries(&asmodel.Matchers{}, 59*time.Second, "foo", "", nil, 0)
c := NewActiveSeries(&asmodel.Matchers{}, 59*time.Second, "foo", nil)

c.UpdateSeries(ls1, ref1, currentTime.Add(-2*time.Minute), -1)
c.UpdateSeries(ls2, ref2, currentTime, -1)
Expand Down Expand Up @@ -631,7 +631,7 @@ func TestActiveSeries_ReloadSeriesMatchers(t *testing.T) {
asm := asmodel.NewMatchers(MustNewCustomTrackersConfigFromMap(t, map[string]string{"foo": `{a=~.*}`}))

currentTime := time.Now()
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(asm, DefaultTimeout, "foo", nil)

valid := c.Purge(currentTime)
assert.True(t, valid)
Expand Down Expand Up @@ -697,7 +697,7 @@ func TestActiveSeries_ReloadSeriesMatchers_LessMatchers(t *testing.T) {
}))

currentTime := time.Now()
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(asm, DefaultTimeout, "foo", nil)
valid := c.Purge(currentTime)
assert.True(t, valid)
allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers()
Expand Down Expand Up @@ -736,7 +736,7 @@ func TestActiveSeries_ReloadSeriesMatchers_SameSizeNewLabels(t *testing.T) {

currentTime := time.Now()

c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(asm, DefaultTimeout, "foo", nil)
valid := c.Purge(currentTime)
assert.True(t, valid)
allActive, activeMatching, _, _, _, _ := c.ActiveWithMatchers()
Expand Down Expand Up @@ -789,7 +789,7 @@ func benchmarkActiveSeriesUpdateSeriesConcurrency(b *testing.B, numSeries, numGo
var (
// Run the active series tracker with an active timeout = 0 so that the Purge() will always
// purge the series.
c = NewActiveSeries(&asmodel.Matchers{}, 0, "foo", "", nil, 0)
c = NewActiveSeries(&asmodel.Matchers{}, 0, "foo", nil)
updateGroup = &sync.WaitGroup{}
purgeGroup = &sync.WaitGroup{}
start = make(chan struct{})
Expand Down Expand Up @@ -927,7 +927,7 @@ func BenchmarkActiveSeries_UpdateSeries(b *testing.B) {

b.ResetTimer()
for i := 0; i < b.N; i++ {
c := NewActiveSeries(asm, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(asm, DefaultTimeout, "foo", nil)
for round := 0; round <= tt.nRounds; round++ {
for ix := 0; ix < tt.nSeries; ix++ {
c.UpdateSeries(series[ix], refs[ix], time.Unix(0, now), -1)
Expand All @@ -952,7 +952,7 @@ func benchmarkPurge(b *testing.B, twice bool) {
const numExpiresSeries = numSeries / 25

currentTime := time.Now()
c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", "", nil, 0)
c := NewActiveSeries(&asmodel.Matchers{}, DefaultTimeout, "foo", nil)

series := [numSeries]labels.Labels{}
refs := [numSeries]storage.SeriesRef{}
Expand Down
17 changes: 2 additions & 15 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1299,15 +1299,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
costLabel := i.limits.CostAttributionLabel(userID)
handleAppendError := func(err error, timestamp int64, labels []mimirpb.LabelAdapter) bool {
if costLabel != "" {
costAttrib := ""
for _, label := range labels {
if label.Name == costLabel {
costAttrib = label.Value
}
}
// get the label value and update the timestamp,
// if the cordianlity is reached or we are currently in cooldown period, function would returned __unaccounted__
costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend)
costAttrib := i.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(labels), startAppend)
stats.failedSamplesAttribution[costAttrib]++
}

Expand Down Expand Up @@ -1418,12 +1412,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre
var costAttrib string
// when cost attribution label is set
if costLabel != "" {
for _, label := range ts.Labels {
if label.Name == costLabel {
costAttrib = label.Value
}
}
costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, costAttrib, startAppend)
costAttrib = i.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), startAppend)
}

// The labels must be sorted (in our case, it's guaranteed a write request
Expand Down Expand Up @@ -2680,9 +2669,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
asmodel.NewMatchers(matchersConfig),
i.cfg.ActiveSeriesMetrics.IdleTimeout,
userID,
i.limits.CostAttributionLabel(userID),
i.costAttributionSvc,
i.limits.MaxCostAttributionPerUser(userID),
),
seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod),
Expand Down
3 changes: 2 additions & 1 deletion pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ type Config struct {
ContinuousTest continuoustest.Config `yaml:"-"`
OverridesExporter exporter.Config `yaml:"overrides_exporter"`

Common CommonConfig `yaml:"common"`
Common CommonConfig `yaml:"common"`
CustomRegistryPath string `yaml:"custom_registry_path" category:"advanced"`

TimeseriesUnmarshalCachingOptimizationEnabled bool `yaml:"timeseries_unmarshal_caching_optimization_enabled" category:"experimental"`
}
Expand Down
Loading

0 comments on commit dd7e2a4

Please sign in to comment.