diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index a7ff4d8a459..572644b80b7 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -4292,6 +4292,28 @@ "fieldType": "int", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "cost_attribution_labels", + "required": false, + "desc": "List of labels used to define the cost attribution. This label will be included in the specified distributor and ingester metrics for each write request, allowing them to be distinguished by the label. The label applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total. Set to an empty string to disable cost attribution.", + "fieldValue": null, + "fieldDefaultValue": "", + "fieldFlag": "validation.cost-attribution-labels", + "fieldType": "string", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "max_cost_attribution_per_user", + "required": false, + "desc": "Maximum number of cost attribution labels allowed per user.", + "fieldValue": null, + "fieldDefaultValue": 0, + "fieldFlag": "validation.max-cost-attribution-per-user", + "fieldType": "int", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "ruler_evaluation_delay_duration", @@ -18128,6 +18150,17 @@ "fieldValue": null, "fieldDefaultValue": null }, + { + "kind": "field", + "name": "custom_registry_path", + "required": false, + "desc": "Defines a custom path for the registry. When specified, Mimir will expose cost attribution metrics through this custom path, if not specified, cost attribution metrics won't be exposed.", + "fieldValue": null, + "fieldDefaultValue": "", + "fieldFlag": "custom-registry-path", + "fieldType": "string", + "fieldCategory": "advanced" + }, { "kind": "field", "name": "timeseries_unmarshal_caching_optimization_enabled", @@ -18138,6 +18171,28 @@ "fieldFlag": "timeseries-unmarshal-caching-optimization-enabled", "fieldType": "boolean", "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "cost_attribution_eviction_interval", + "required": false, + "desc": "Time interval at which inactive cost attributions will be evicted from the cache.", + "fieldValue": null, + "fieldDefaultValue": 1800000000000, + "fieldFlag": "cost-attribution-eviction-interval", + "fieldType": "duration", + "fieldCategory": "experimental" + }, + { + "kind": "field", + "name": "cost_attribution_cool_down_duration", + "required": false, + "desc": "Duration during which any cost attribution for a user will be marked as __overflow__ after exceeding the specified limit, prior to resetting the cache.", + "fieldValue": null, + "fieldDefaultValue": 1200000000000, + "fieldFlag": "cost-attribution-cool-down-duration", + "fieldType": "duration", + "fieldCategory": "experimental" } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 5dc7e93cca9..9739f14a411 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1133,6 +1133,12 @@ Usage of ./cmd/mimir/mimir: Expands ${var} or $var in config according to the values of the environment variables. -config.file value Configuration file to load. + -cost-attribution-cool-down-duration duration + [experimental] Duration during which any cost attribution for a user will be marked as __overflow__ after exceeding the specified limit, prior to resetting the cache. 
(default 20m0s) + -cost-attribution-eviction-interval duration + [experimental] Time interval at which inactive cost attributions will be evicted from the cache. (default 30m0s) + -custom-registry-path string + Defines a custom path for the registry. When specified, Mimir will expose cost attribution metrics through this custom path, if not specified, cost attribution metrics won't be exposed. -debug.block-profile-rate int Fraction of goroutine blocking events that are reported in the blocking profile. 1 to include every blocking event in the profile, 0 to disable. -debug.mutex-profile-fraction int @@ -3059,10 +3065,14 @@ Usage of ./cmd/mimir/mimir: Enable anonymous usage reporting. (default true) -usage-stats.installation-mode string Installation mode. Supported values: custom, helm, jsonnet. (default "custom") + -validation.cost-attribution-labels comma-separated-list-of-strings + [experimental] List of labels used to define the cost attribution. This label will be included in the specified distributor and ingester metrics for each write request, allowing them to be distinguished by the label. The label applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total. Set to an empty string to disable cost attribution. -validation.create-grace-period duration Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m) -validation.enforce-metadata-metric-name Enforce every metadata has a metric name. (default true) + -validation.max-cost-attribution-per-user int + [experimental] Maximum number of cost attribution labels allowed per user. -validation.max-label-names-per-series int Maximum number of label names per series. (default 30) -validation.max-length-label-name int diff --git a/pkg/costattribution/manager.go b/pkg/costattribution/manager.go new file mode 100644 index 00000000000..cda08a6eaa3 --- /dev/null +++ b/pkg/costattribution/manager.go @@ -0,0 +1,159 @@ +package costattribution + +import ( + "context" + "sort" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/mimir/pkg/util/validation" +) + +const ( + missingValue = "__missing__" + overflowValue = "__overflow__" +) + +type Manager struct { + services.Service + logger log.Logger + inactiveTimeout time.Duration + limits *validation.Overrides + cooldownTimeout time.Duration + + // mu protects the trackersByUserID map + tlock sync.RWMutex + trackersByUserID map[string]*Tracker +} + +// NewManager creates a new cost attribution manager. which is responsible for managing the cost attribution of series. +// It will clean up inactive series and update the cost attribution of series every 3 minutes. 
+func NewManager(cleanupInterval, inactiveTimeout time.Duration, cooldownTimeout time.Duration, logger log.Logger, limits *validation.Overrides) *Manager { + s := &Manager{ + trackersByUserID: make(map[string]*Tracker), + limits: limits, + tlock: sync.RWMutex{}, + cooldownTimeout: cooldownTimeout, + inactiveTimeout: inactiveTimeout, + logger: logger, + } + + s.Service = services.NewTimerService(cleanupInterval, nil, s.iteration, nil).WithName("cost attribution manager") + return s +} + +func (m *Manager) iteration(_ context.Context) error { + m.purgeInactiveAttributions(m.inactiveTimeout) + return nil +} + +// EnabledForUser returns true if the cost attribution is enabled for the user +func (m *Manager) EnabledForUser(userID string) bool { + return len(m.limits.CostAttributionLabel(userID)) > 0 +} + +func (m *Manager) TrackerForUser(userID string) *Tracker { + // if cost attribution is not enabled, return nil + if !m.EnabledForUser(userID) { + return nil + } + m.tlock.Lock() + defer m.tlock.Unlock() + + // if not exists, create a new tracker + if _, exists := m.trackersByUserID[userID]; !exists { + m.trackersByUserID[userID], _ = newTracker(m.limits.CostAttributionLabel(userID), m.limits.MaxCostAttributionPerUser(userID)) + } + return m.trackersByUserID[userID] +} + +func (m *Manager) Collect(out chan<- prometheus.Metric) { + m.tlock.RLock() + defer m.tlock.RUnlock() + for _, tracker := range m.trackersByUserID { + tracker.Collect(out) + } +} + +// Describe implements prometheus.Collector. +func (m *Manager) Describe(chan<- *prometheus.Desc) { + // this is an unchecked collector +} + +// deleteUserTracer is delete user tracker since the user is disabled for cost attribution +func (m *Manager) deleteUserTracer(userID string) { + m.tlock.Lock() + defer m.tlock.Unlock() + if _, exists := m.trackersByUserID[userID]; !exists { + return + } + // clean up tracker metrics and delete the tracker + m.trackersByUserID[userID].cleanupTracker(userID) + delete(m.trackersByUserID, userID) +} + +func (m *Manager) purgeInactiveAttributions(inactiveTimeout time.Duration) { + + // Get all userIDs from the map + m.tlock.RLock() + userIDs := make([]string, 0, len(m.trackersByUserID)) + for userID := range m.trackersByUserID { + userIDs = append(userIDs, userID) + } + m.tlock.RUnlock() + + // Iterate over all userIDs and purge inactive attributions of each user + currentTime := time.Now() + for _, userID := range userIDs { + // if cost attribution is not enabled for the user, delete the user tracker and continue + if len(m.limits.CostAttributionLabel(userID)) == 0 || m.limits.MaxCostAttributionPerUser(userID) <= 0 { + m.deleteUserTracer(userID) + continue + } + // get all inactive attributions for the user and clean up the tracker + inactiveObs := m.purgeInactiveObservationsForUser(userID, currentTime.Add(-inactiveTimeout).UnixNano()) + + for _, ob := range inactiveObs { + m.trackersByUserID[userID].cleanupTrackerAttribution(ob.lvalues) + } + } +} + +// compare two sorted string slices +func compareStringSlice(a, b []string) bool { + if len(a) != len(b) { + return false + } + for i, v := range a { + if v != b[i] { + return false + } + } + return true +} + +func (m *Manager) purgeInactiveObservationsForUser(userID string, deadline int64) []*observation { + cat := m.TrackerForUser(userID) + if cat == nil { + return nil + } + + newTrackedLabels := sort.StringSlice(m.limits.CostAttributionLabel(userID)) + // if they are different, we need to update the tracker, we don't mind, just reinitalized the tracker + if 
!compareStringSlice(cat.trackedLabels, newTrackedLabels) { + m.tlock.Lock() + m.trackersByUserID[userID], _ = newTracker(m.limits.CostAttributionLabel(userID), m.limits.MaxCostAttributionPerUser(userID)) + // update the tracker with the new tracker + cat = m.trackersByUserID[userID] + m.tlock.Unlock() + } else if maxCardinality := m.limits.MaxCostAttributionPerUser(userID); cat.maxCardinality != maxCardinality { + // if the maxCardinality is different, update the tracker + cat.updateMaxCardinality(maxCardinality) + } + + return cat.PurgeInactiveObservations(deadline) +} diff --git a/pkg/costattribution/tracker.go b/pkg/costattribution/tracker.go new file mode 100644 index 00000000000..d9f61cbda93 --- /dev/null +++ b/pkg/costattribution/tracker.go @@ -0,0 +1,200 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package costattribution + +import ( + "sort" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "go.uber.org/atomic" +) + +type observation struct { + lvalues []string + lastUpdate *atomic.Int64 +} + +func (t *Tracker) cleanupTrackerAttribution(vals []string) { + t.activeSeriesPerUserAttribution.DeleteLabelValues(vals...) + t.receivedSamplesAttribution.DeleteLabelValues(vals...) + t.discardedSampleAttribution.DeleteLabelValues(vals...) +} + +func (t *Tracker) cleanupTracker(userID string) { + filter := prometheus.Labels{"user": userID} + t.activeSeriesPerUserAttribution.DeletePartialMatch(filter) + t.receivedSamplesAttribution.DeletePartialMatch(filter) + t.discardedSampleAttribution.DeletePartialMatch(filter) +} + +type Tracker struct { + userID string + trackedLabels []string + maxCardinality int + activeSeriesPerUserAttribution *prometheus.GaugeVec + receivedSamplesAttribution *prometheus.CounterVec + discardedSampleAttribution *prometheus.CounterVec + + // oLock protects the observed map + oLock sync.RWMutex + observed map[uint64]*observation + + hashBuffer []byte +} + +func (t *Tracker) IncrementActiveSeries(lbs labels.Labels, now time.Time) { + vals := t.getKeyValues(lbs, now.Unix()) + t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Inc() +} + +func (t *Tracker) IncrementDiscardedSamples(lbs labels.Labels, value float64, reason string, now time.Time) { + vals := t.getKeyValues(lbs, now.Unix()) + t.discardedSampleAttribution.WithLabelValues(vals...).Add(value) +} + +func (t *Tracker) IncrementReceivedSamples(lbs labels.Labels, value float64, now time.Time) { + vals := t.getKeyValues(lbs, now.Unix()) + t.receivedSamplesAttribution.WithLabelValues(vals...).Add(value) +} + +func (t *Tracker) getKeyValues(lbls labels.Labels, ts int64) []string { + values := make([]string, len(t.trackedLabels)+1) + for i, l := range t.trackedLabels { + values[i] = lbls.Get(l) + if values[i] == "" { + values[i] = missingValue + } + } + values[len(values)-1] = t.userID + + var stream uint64 + stream, t.hashBuffer = lbls.HashForLabels(t.hashBuffer, t.trackedLabels...) + if t.overflow(stream, values, ts) { + // Omit last label. + for i := range values[:len(values)-1] { + values[i] = overflowValue + } + } + + return values +} + +func (t *Tracker) overflow(stream uint64, values []string, ts int64) bool { + // If the maximum cardinality is hit all streams become `__overflow__`. 
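// Illustration (hypothetical values, not part of this change): with -validation.cost-attribution-labels=team
// and -validation.max-cost-attribution-per-user=2, once more distinct team values than the limit have been
// observed for a tenant, further samples and series are attributed to team="__overflow__" instead of new
// label values, until the cache is reset after the -cost-attribution-cool-down-duration described earlier.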
+ if len(t.observed) > t.maxCardinality { + return true + } + + if o, known := t.observed[stream]; known && o.lastUpdate != nil && o.lastUpdate.Load() < ts { + o.lastUpdate.Store(ts) + } else { + t.observed[stream] = &observation{ + lvalues: values, + lastUpdate: atomic.NewInt64(ts), + } + } + + return false +} + +// we need the time stamp, since active series could have entered active stripe long time ago, and already evicted +// from the observed map but still in the active Stripe +func (t *Tracker) DecrementActiveSeries(lbs labels.Labels, value int64, ts time.Time) { + vals := t.getKeyValues(lbs, ts.Unix()) + t.activeSeriesPerUserAttribution.WithLabelValues(vals...).Dec() +} + +func newTracker(trackedLabels []string, limit int) (*Tracker, error) { + // keep tracked labels sorted for consistent metric labels + sort.Strings(trackedLabels) + m := &Tracker{ + trackedLabels: trackedLabels, + maxCardinality: limit, + oLock: sync.RWMutex{}, + observed: map[uint64]*observation{}, + //nolint:faillint // the metrics are registered in the mimir package + discardedSampleAttribution: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_discarded_samples_attribution_total", + Help: "The total number of samples that were discarded per attribution.", + }, append(trackedLabels, "user")), + //nolint:faillint + receivedSamplesAttribution: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_received_samples_attribution_total", + Help: "The total number of samples that were received per attribution.", + }, append(trackedLabels, "user")), + //nolint:faillint + activeSeriesPerUserAttribution: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "cortex_ingester_active_series_attribution", + Help: "The total number of active series per user and attribution.", + }, append(trackedLabels, "user")), + } + return m, nil +} + +func (t *Tracker) Collect(out chan<- prometheus.Metric) { + t.activeSeriesPerUserAttribution.Collect(out) + t.receivedSamplesAttribution.Collect(out) + t.discardedSampleAttribution.Collect(out) +} + +// Describe implements prometheus.Collector. 
+func (t *Tracker) Describe(chan<- *prometheus.Desc) { + // this is an unchecked collector +} + +func (t *Tracker) PurgeInactiveObservations(deadline int64) []*observation { + obs := t.observed + if obs == nil { + return nil + } + + var invalidKeys []uint64 + for labHash, ob := range obs { + if ob != nil && ob.lastUpdate != nil && ob.lastUpdate.Load() <= deadline { + invalidKeys = append(invalidKeys, labHash) + } + } + + if len(invalidKeys) == 0 { + return nil + } + + t.oLock.Lock() + defer t.oLock.Unlock() + + // Cleanup inactive observations and return all invalid observations to clean up metrics for them + res := make([]*observation, len(invalidKeys)) + for i := 0; i < len(invalidKeys); { + inactiveLab := invalidKeys[i] + ob := t.observed[inactiveLab] + if ob != nil && ob.lastUpdate != nil && ob.lastUpdate.Load() <= deadline { + delete(t.observed, inactiveLab) + res[i] = ob + i++ + } else { + invalidKeys[i] = invalidKeys[len(invalidKeys)-1] + invalidKeys = invalidKeys[:len(invalidKeys)-1] + } + } + + return res[:len(invalidKeys)] +} + +func (t *Tracker) updateMaxCardinality(limit int) { + // if we are reducing limit, we can just set it + if t.maxCardinality >= limit { + t.maxCardinality = limit + return + } + // if we are increasing limit, we need to check if we are already in overflow, + // if yes, reset the counter, otherwise the counters won't be correct + t.oLock.Lock() + defer t.oLock.Unlock() + if len(t.observed) > t.maxCardinality { + t.observed = map[uint64]*observation{} + } + t.maxCardinality = limit +} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 59d9161147b..901fc56d717 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -48,6 +48,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/grafana/mimir/pkg/cardinality" + "github.com/grafana/mimir/pkg/costattribution" ingester_client "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/querier/stats" @@ -105,7 +106,7 @@ type Distributor struct { distributorsLifecycler *ring.BasicLifecycler distributorsRing *ring.Ring healthyInstancesCount *atomic.Uint32 - + costAttributionMgr *costattribution.Manager // For handling HA replicas. 
HATracker *haTracker @@ -306,7 +307,7 @@ func (m *PushMetrics) deleteUserMetrics(user string) { } // New constructs a new Distributor -func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { +func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMgr *costattribution.Manager, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) { clientMetrics := ingester_client.NewMetrics(reg) if cfg.IngesterClientFactory == nil { cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) { @@ -341,6 +342,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove healthyInstancesCount: atomic.NewUint32(0), limits: limits, HATracker: haTracker, + costAttributionMgr: costAttributionMgr, ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval), queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ @@ -714,21 +716,21 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica // The returned error may retain the series labels. // It uses the passed nowt time to observe the delay of sample timestamps. func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeseries, userID, group string, skipLabelNameValidation bool, minExemplarTS, maxExemplarTS int64) error { - if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelNameValidation); err != nil { + now := model.TimeFromUnixNano(nowt.UnixNano()) + cat := getCATrackerForUser(userID, d.costAttributionMgr) + if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelNameValidation, cat, nowt); err != nil { return err } - now := model.TimeFromUnixNano(nowt.UnixNano()) - for _, s := range ts.Samples { - if err := validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, s); err != nil { + if err := validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, s, cat); err != nil { return err } } histogramsUpdated := false for i := range ts.Histograms { - updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[i]) + updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[i], cat) if err != nil { return err } @@ -840,7 +842,8 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc { } numSamples := 0 - group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), time.Now()) + now := time.Now() + group := d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), now) for _, ts := range req.Timeseries { numSamples += len(ts.Samples) + len(ts.Histograms) } @@ -854,6 +857,11 @@ func (d *Distributor) prePushHaDedupeMiddleware(next PushFunc) PushFunc { if errors.As(err, &tooManyClustersError{}) { 
d.discardedSamplesTooManyHaClusters.WithLabelValues(userID, group).Add(float64(numSamples)) + if d.costAttributionMgr != nil { + if cat := d.costAttributionMgr.TrackerForUser(userID); cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(numSamples), reasonTooManyHAClusters, now) + } + } } return err @@ -1107,6 +1115,9 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc { totalN := validatedSamples + validatedExemplars + validatedMetadata if !d.ingestionRateLimiter.AllowN(now, userID, totalN) { + if cat := getCATrackerForUser(userID, d.costAttributionMgr); cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(req.Timeseries[0].Labels), float64(validatedSamples), reasonRateLimited, now) + } d.discardedSamplesRateLimited.WithLabelValues(userID, group).Add(float64(validatedSamples)) d.discardedExemplarsRateLimited.WithLabelValues(userID).Add(float64(validatedExemplars)) d.discardedMetadataRateLimited.WithLabelValues(userID).Add(float64(validatedMetadata)) @@ -1664,10 +1675,15 @@ func tokenForMetadata(userID string, metricName string) uint32 { } func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) { + now := mtime.Now() var receivedSamples, receivedExemplars, receivedMetadata int + for _, ts := range req.Timeseries { receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms) receivedExemplars += len(ts.TimeSeries.Exemplars) + if cat := getCATrackerForUser(userID, d.costAttributionMgr); cat != nil { + cat.IncrementReceivedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(receivedSamples), now) + } } receivedMetadata = len(req.Metadata) diff --git a/pkg/distributor/validate.go b/pkg/distributor/validate.go index db15559c70d..3aaa9b44b82 100644 --- a/pkg/distributor/validate.go +++ b/pkg/distributor/validate.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/grafana/mimir/pkg/costattribution" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/util/extract" "github.com/grafana/mimir/pkg/util/globalerror" @@ -215,15 +216,22 @@ func newExemplarValidationMetrics(r prometheus.Registerer) *exemplarValidationMe // validateSample returns an err if the sample is invalid. // The returned error may retain the provided series labels. // It uses the passed 'now' time to measure the relative time of the sample. 
-func validateSample(m *sampleValidationMetrics, now model.Time, cfg sampleValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, s mimirpb.Sample) error { +func validateSample(m *sampleValidationMetrics, now model.Time, cfg sampleValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, s mimirpb.Sample, cat *costattribution.Tracker) error { if model.Time(s.TimestampMs) > now.Add(cfg.CreationGracePeriod(userID)) { m.tooFarInFuture.WithLabelValues(userID, group).Inc() + // if the validation failed, we need to increment the discarded samples metric + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonTooFarInFuture, now.Time()) + } unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls) return fmt.Errorf(sampleTimestampTooNewMsgFormat, s.TimestampMs, unsafeMetricName) } if cfg.PastGracePeriod(userID) > 0 && model.Time(s.TimestampMs) < now.Add(-cfg.PastGracePeriod(userID)).Add(-cfg.OutOfOrderTimeWindow(userID)) { m.tooFarInPast.WithLabelValues(userID, group).Inc() + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonTooFarInPast, now.Time()) + } unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls) return fmt.Errorf(sampleTimestampTooOldMsgFormat, s.TimestampMs, unsafeMetricName) } @@ -234,20 +242,29 @@ func validateSample(m *sampleValidationMetrics, now model.Time, cfg sampleValida // validateSampleHistogram returns an err if the sample is invalid. // The returned error may retain the provided series labels. // It uses the passed 'now' time to measure the relative time of the sample. -func validateSampleHistogram(m *sampleValidationMetrics, now model.Time, cfg sampleValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, s *mimirpb.Histogram) (bool, error) { +func validateSampleHistogram(m *sampleValidationMetrics, now model.Time, cfg sampleValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, s *mimirpb.Histogram, cat *costattribution.Tracker) (bool, error) { if model.Time(s.Timestamp) > now.Add(cfg.CreationGracePeriod(userID)) { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonTooFarInFuture, now.Time()) + } m.tooFarInFuture.WithLabelValues(userID, group).Inc() unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls) return false, fmt.Errorf(sampleTimestampTooNewMsgFormat, s.Timestamp, unsafeMetricName) } if cfg.PastGracePeriod(userID) > 0 && model.Time(s.Timestamp) < now.Add(-cfg.PastGracePeriod(userID)).Add(-cfg.OutOfOrderTimeWindow(userID)) { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonTooFarInPast, now.Time()) + } m.tooFarInPast.WithLabelValues(userID, group).Inc() unsafeMetricName, _ := extract.UnsafeMetricNameFromLabelAdapters(ls) return false, fmt.Errorf(sampleTimestampTooOldMsgFormat, s.Timestamp, unsafeMetricName) } if s.Schema < mimirpb.MinimumHistogramSchema || s.Schema > mimirpb.MaximumHistogramSchema { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonInvalidNativeHistogramSchema, now.Time()) + } m.invalidNativeHistogramSchema.WithLabelValues(userID, group).Inc() return false, fmt.Errorf(invalidSchemaNativeHistogramMsgFormat, s.Schema) } @@ -261,6 +278,9 @@ func validateSampleHistogram(m *sampleValidationMetrics, now model.Time, cfg sam } if bucketCount > bucketLimit { if !cfg.ReduceNativeHistogramOverMaxBuckets(userID) { + if cat != nil { + 
cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonMaxNativeHistogramBuckets, now.Time()) + } m.maxNativeHistogramBuckets.WithLabelValues(userID, group).Inc() return false, fmt.Errorf(maxNativeHistogramBucketsMsgFormat, s.Timestamp, mimirpb.FromLabelAdaptersToString(ls), bucketCount, bucketLimit) } @@ -268,6 +288,9 @@ func validateSampleHistogram(m *sampleValidationMetrics, now model.Time, cfg sam for { bc, err := s.ReduceResolution() if err != nil { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonMaxNativeHistogramBuckets, now.Time()) + } m.maxNativeHistogramBuckets.WithLabelValues(userID, group).Inc() return false, fmt.Errorf(notReducibleNativeHistogramMsgFormat, s.Timestamp, mimirpb.FromLabelAdaptersToString(ls), bucketCount, bucketLimit) } @@ -366,22 +389,40 @@ func removeNonASCIIChars(in string) (out string) { return out } +// getCATrackerForUser returns the cost attribution tracker for the user. +// If the cost attribution manager is nil or the user is not enabled for cost attribution, it returns nil. +func getCATrackerForUser(userID string, cam *costattribution.Manager) *costattribution.Tracker { + if cam == nil { + return nil + } + return cam.TrackerForUser(userID) +} + // validateLabels returns an err if the labels are invalid. // The returned error may retain the provided series labels. -func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, skipLabelNameValidation bool) error { +func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, skipLabelNameValidation bool, cat *costattribution.Tracker, ts time.Time) error { unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(ls) if err != nil { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonMissingMetricName, ts) + } m.missingMetricName.WithLabelValues(userID, group).Inc() return errors.New(noMetricNameMsgFormat) } if !model.IsValidMetricName(model.LabelValue(unsafeMetricName)) { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonInvalidMetricName, ts) + } m.invalidMetricName.WithLabelValues(userID, group).Inc() return fmt.Errorf(invalidMetricNameMsgFormat, removeNonASCIIChars(unsafeMetricName)) } numLabelNames := len(ls) if numLabelNames > cfg.MaxLabelNamesPerSeries(userID) { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonMaxLabelNamesPerSeries, ts) + } m.maxLabelNamesPerSeries.WithLabelValues(userID, group).Inc() metric, ellipsis := getMetricAndEllipsis(ls) return fmt.Errorf(tooManyLabelsMsgFormat, len(ls), cfg.MaxLabelNamesPerSeries(userID), metric, ellipsis) @@ -392,15 +433,27 @@ func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userI lastLabelName := "" for _, l := range ls { if !skipLabelNameValidation && !model.LabelName(l.Name).IsValid() { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonInvalidLabel, ts) + } m.invalidLabel.WithLabelValues(userID, group).Inc() return fmt.Errorf(invalidLabelMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls)) } else if len(l.Name) > maxLabelNameLength { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonLabelNameTooLong, ts) + } m.labelNameTooLong.WithLabelValues(userID, group).Inc() return 
fmt.Errorf(labelNameTooLongMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls)) } else if len(l.Value) > maxLabelValueLength { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonLabelValueTooLong, ts) + } m.labelValueTooLong.WithLabelValues(userID, group).Inc() return fmt.Errorf(labelValueTooLongMsgFormat, l.Name, l.Value, mimirpb.FromLabelAdaptersToString(ls)) } else if lastLabelName == l.Name { + if cat != nil { + cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ls), 1, reasonDuplicateLabelNames, ts) + } m.duplicateLabelNames.WithLabelValues(userID, group).Inc() return fmt.Errorf(duplicateLabelMsgFormat, l.Name, mimirpb.FromLabelAdaptersToString(ls)) } diff --git a/pkg/ingester/activeseries/active_series.go b/pkg/ingester/activeseries/active_series.go index 71044b5e348..d827097839f 100644 --- a/pkg/ingester/activeseries/active_series.go +++ b/pkg/ingester/activeseries/active_series.go @@ -13,10 +13,12 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/util/zeropool" "go.uber.org/atomic" + "github.com/grafana/mimir/pkg/costattribution" asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model" ) @@ -44,9 +46,10 @@ type ActiveSeries struct { stripes [numStripes]seriesStripe deleted deletedSeries - // matchersMutex protects matchers and lastMatchersUpdate. + // matchersMutex protects matchers and lastMatchersUpdate. it used by both matchers and cat matchersMutex sync.RWMutex matchers *asmodel.Matchers + cat *costattribution.Tracker lastMatchersUpdate time.Time // The duration after which series become inactive. @@ -63,8 +66,8 @@ type seriesStripe struct { // Unix nanoseconds. Only used by purge. Zero = unknown. // Updated in purge and when old timestamp is used when updating series (in this case, oldestEntryTs is updated // without holding the lock -- hence the atomic). - oldestEntryTs atomic.Int64 - + oldestEntryTs atomic.Int64 + cat *costattribution.Tracker mu sync.RWMutex refs map[storage.SeriesRef]seriesEntry active uint32 // Number of active entries in this stripe. Only decreased during purge or clear. @@ -73,6 +76,8 @@ type seriesStripe struct { activeMatchingNativeHistograms []uint32 // Number of active entries (only native histograms) in this stripe matching each matcher of the configured Matchers. activeNativeHistogramBuckets uint32 // Number of buckets in active native histogram entries in this stripe. Only decreased during purge or clear. activeMatchingNativeHistogramBuckets []uint32 // Number of buckets in active native histogram entries in this stripe matching each matcher of the configured Matchers. + userID string + buf labels.ScratchBuilder } // seriesEntry holds a timestamp for single series. @@ -80,16 +85,22 @@ type seriesEntry struct { nanos *atomic.Int64 // Unix timestamp in nanoseconds. Needs to be a pointer because we don't store pointers to entries in the stripe. matches asmodel.PreAllocDynamicSlice // Index of the matcher matching numNativeHistogramBuckets int // Number of buckets in native histogram series, -1 if not a native histogram. - + // keep the value corresponding the label configured in serieStripe deleted bool // This series was marked as deleted, so before purging we need to remove the refence to it from the deletedSeries. 
} -func NewActiveSeries(asm *asmodel.Matchers, timeout time.Duration) *ActiveSeries { - c := &ActiveSeries{matchers: asm, timeout: timeout} +func NewActiveSeries( + asm *asmodel.Matchers, + timeout time.Duration, + cat *costattribution.Tracker, +) *ActiveSeries { + c := &ActiveSeries{ + matchers: asm, timeout: timeout, cat: cat, + } // Stripes are pre-allocated so that we only read on them and no lock is required. for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted) + c.stripes[i].reinitialize(asm, &c.deleted, cat) } return c @@ -106,7 +117,7 @@ func (c *ActiveSeries) ReloadMatchers(asm *asmodel.Matchers, now time.Time) { defer c.matchersMutex.Unlock() for i := 0; i < numStripes; i++ { - c.stripes[i].reinitialize(asm, &c.deleted) + c.stripes[i].reinitialize(asm, &c.deleted, c.cat) } c.matchers = asm c.lastMatchersUpdate = now @@ -118,18 +129,24 @@ func (c *ActiveSeries) CurrentConfig() asmodel.CustomTrackersConfig { return c.matchers.Config() } +func (c *ActiveSeries) CurrentCostAttributionTracker() *costattribution.Tracker { + c.matchersMutex.RLock() + defer c.matchersMutex.RUnlock() + return c.cat +} + // UpdateSeries updates series timestamp to 'now'. Function is called to make a copy of labels if entry doesn't exist yet. // Pass -1 in numNativeHistogramBuckets if the series is not a native histogram series. -func (c *ActiveSeries) UpdateSeries(series labels.Labels, ref storage.SeriesRef, now time.Time, numNativeHistogramBuckets int) { +func (c *ActiveSeries) UpdateSeries(series labels.Labels, ref storage.SeriesRef, now time.Time, numNativeHistogramBuckets int, idx tsdb.IndexReader) { stripeID := ref % numStripes - created := c.stripes[stripeID].updateSeriesTimestamp(now, series, ref, numNativeHistogramBuckets) if created { if deleted, ok := c.deleted.find(series); ok { deletedStripeID := deleted.ref % numStripes - c.stripes[deletedStripeID].remove(deleted.ref) + c.stripes[deletedStripeID].remove(deleted.ref, idx) } } + } // PostDeletion should be called when series are deleted from the head. @@ -149,20 +166,21 @@ func (c *ActiveSeries) PostDeletion(deleted map[chunks.HeadSeriesRef]labels.Labe // Purge purges expired entries and returns true if enough time has passed since // last reload. This should be called periodically to avoid unbounded memory // growth. -func (c *ActiveSeries) Purge(now time.Time) bool { +func (c *ActiveSeries) Purge(now time.Time, idx tsdb.IndexReader) bool { c.matchersMutex.Lock() defer c.matchersMutex.Unlock() purgeTime := now.Add(-c.timeout) - c.purge(purgeTime) + c.purge(purgeTime, idx) return !c.lastMatchersUpdate.After(purgeTime) } // purge removes expired entries from the cache. 
-func (c *ActiveSeries) purge(keepUntil time.Time) { +func (c *ActiveSeries) purge(keepUntil time.Time, idx tsdb.IndexReader) { for s := 0; s < numStripes; s++ { - c.stripes[s].purge(keepUntil) + c.stripes[s].purge(keepUntil, idx) } + } func (c *ActiveSeries) ContainsRef(ref storage.SeriesRef) bool { @@ -212,9 +230,9 @@ func (c *ActiveSeries) ActiveWithMatchers() (total int, totalMatching []int, tot return } -func (c *ActiveSeries) Delete(ref chunks.HeadSeriesRef) { +func (c *ActiveSeries) Delete(ref chunks.HeadSeriesRef, idx tsdb.IndexReader) { stripeID := storage.SeriesRef(ref) % numStripes - c.stripes[stripeID].remove(storage.SeriesRef(ref)) + c.stripes[stripeID].remove(storage.SeriesRef(ref), idx) } func (c *ActiveSeries) Clear() { @@ -375,6 +393,7 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef matchesLen := matches.Len() s.active++ + if numNativeHistogramBuckets >= 0 { s.activeNativeHistograms++ s.activeNativeHistogramBuckets += uint32(numNativeHistogramBuckets) @@ -394,6 +413,12 @@ func (s *seriesStripe) findAndUpdateOrCreateEntryForSeries(ref storage.SeriesRef numNativeHistogramBuckets: numNativeHistogramBuckets, } + // here if we have a cost attribution label, we can split the serie count based on the value of the label + // we also set the reference to the value of the label in the entry, so when remove, we can decrease the counter accordingly + if s.cat != nil { + s.cat.IncrementActiveSeries(series, time.Unix(0, nowNanos)) + } + s.refs[ref] = e return e.nanos, true } @@ -403,6 +428,7 @@ func (s *seriesStripe) clear() { defer s.mu.Unlock() s.oldestEntryTs.Store(0) + // TODO: s.refs = map[storage.SeriesRef]seriesEntry{} s.active = 0 s.activeNativeHistograms = 0 @@ -415,10 +441,13 @@ func (s *seriesStripe) clear() { } // Reinitialize assigns new matchers and corresponding size activeMatching slices. -func (s *seriesStripe) reinitialize(asm *asmodel.Matchers, deleted *deletedSeries) { +func (s *seriesStripe) reinitialize( + asm *asmodel.Matchers, + deleted *deletedSeries, + cat *costattribution.Tracker, +) { s.mu.Lock() defer s.mu.Unlock() - s.deleted = deleted s.oldestEntryTs.Store(0) s.refs = map[storage.SeriesRef]seriesEntry{} @@ -429,9 +458,11 @@ func (s *seriesStripe) reinitialize(asm *asmodel.Matchers, deleted *deletedSerie s.activeMatching = resizeAndClear(len(asm.MatcherNames()), s.activeMatching) s.activeMatchingNativeHistograms = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistograms) s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(asm.MatcherNames()), s.activeMatchingNativeHistogramBuckets) + s.cat = cat + s.buf = labels.NewScratchBuilder(128) } -func (s *seriesStripe) purge(keepUntil time.Time) { +func (s *seriesStripe) purge(keepUntil time.Time, idx tsdb.IndexReader) { keepUntilNanos := keepUntil.UnixNano() if oldest := s.oldestEntryTs.Load(); oldest > 0 && keepUntilNanos <= oldest { // Nothing to do. 
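The stripes only keep storage.SeriesRef values, so decrementing the per-attribution active-series gauge on eviction requires resolving a ref back to its labels through the TSDB head's IndexReader, which is why purge, remove and Purge now take an idx parameter. Below is a minimal, self-contained sketch of that lookup under stated assumptions: the attributionTracker interface and decrementAttribution helper are illustrative only and do not exist in this change; they just make the ref-to-labels step explicit.

package costattributionsketch

import (
	"time"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunks"
)

// attributionTracker is the single Tracker method this step relies on.
type attributionTracker interface {
	DecrementActiveSeries(lbs labels.Labels, value int64, ts time.Time)
}

// decrementAttribution resolves ref to its labels via the head index reader
// and decrements the attribution gauge for the corresponding label values.
func decrementAttribution(idx tsdb.IndexReader, ref storage.SeriesRef, cat attributionTracker, now time.Time) error {
	buf := labels.NewScratchBuilder(16)
	var chks []chunks.Meta // chunk metas are not needed here, only the labels
	if err := idx.Series(ref, &buf, &chks); err != nil {
		return err
	}
	cat.DecrementActiveSeries(buf.Labels(), 1, now)
	return nil
}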
@@ -449,13 +480,29 @@ func (s *seriesStripe) purge(keepUntil time.Time) { s.activeMatchingNativeHistogramBuckets = resizeAndClear(len(s.activeMatchingNativeHistogramBuckets), s.activeMatchingNativeHistogramBuckets) oldest := int64(math.MaxInt64) + buf := labels.NewScratchBuilder(128) for ref, entry := range s.refs { ts := entry.nanos.Load() if ts < keepUntilNanos { if entry.deleted { s.deleted.purge(ref) } + + // idx, err := db.Head().Index() + // err = idx.Series(seriesRef, &buf, nil) + // if err != nil { + // return fmt.Errorf("error getting series: %w", err) + // } + // m := &mimirpb.Metric{Labels: mimirpb.FromLabelsToLabelAdapters(buf.Labels())} + + if s.cat != nil && idx != nil { + if err := idx.Series(ref, &buf, nil); err != nil { + //TODO: think about what to do here + } + s.cat.DecrementActiveSeries(buf.Labels(), 1, keepUntil) + } delete(s.refs, ref) + // TODO: here need to find what is deleted and decrement counters continue } @@ -464,6 +511,7 @@ func (s *seriesStripe) purge(keepUntil time.Time) { s.activeNativeHistograms++ s.activeNativeHistogramBuckets += uint32(entry.numNativeHistogramBuckets) } + ml := entry.matches.Len() for i := 0; i < ml; i++ { match := entry.matches.Get(i) @@ -489,7 +537,7 @@ func (s *seriesStripe) purge(keepUntil time.Time) { // This is mostly the same logic from purge() but we decrement counters for a single entry instead of incrementing for each entry. // Note: we might remove the oldest series here, but the worst thing can happen is that we let run a useless purge() cycle later, // so this method doesn't update the oldestEntryTs. -func (s *seriesStripe) remove(ref storage.SeriesRef) { +func (s *seriesStripe) remove(ref storage.SeriesRef, idx tsdb.IndexReader) { s.mu.Lock() defer s.mu.Unlock() @@ -502,6 +550,13 @@ func (s *seriesStripe) remove(ref storage.SeriesRef) { } s.active-- + if s.cat != nil && idx != nil { + if err := idx.Series(ref, &s.buf, nil); err != nil { + //TODO: think about what to do here + _ = err + } + s.cat.DecrementActiveSeries(s.buf.Labels(), 1, time.Now()) + } if entry.numNativeHistogramBuckets >= 0 { s.activeNativeHistograms-- s.activeNativeHistogramBuckets -= uint32(entry.numNativeHistogramBuckets) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 97a1f83011e..0a0b7ea8eda 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -51,6 +51,7 @@ import ( "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" + "github.com/grafana/mimir/pkg/costattribution" "github.com/grafana/mimir/pkg/ingester/activeseries" asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model" "github.com/grafana/mimir/pkg/ingester/client" @@ -310,6 +311,8 @@ type Ingester struct { activeGroups *util.ActiveGroupsCleanupService + costAttributionMgr *costattribution.Manager + tsdbMetrics *tsdbMetrics forceCompactTrigger chan requestWithUsersAndCallback @@ -364,21 +367,21 @@ func newIngester(cfg Config, limits *validation.Overrides, registerer prometheus limits: limits, logger: logger, - tsdbs: make(map[string]*userTSDB), - usersMetadata: make(map[string]*userMetricsMetadata), + tsdbs: make(map[string]*userTSDB), + usersMetadata: make(map[string]*userMetricsMetadata), + bucket: bucketClient, tsdbMetrics: newTSDBMetrics(registerer, logger), shipperMetrics: newShipperMetrics(registerer), forceCompactTrigger: make(chan requestWithUsersAndCallback), shipTrigger: make(chan requestWithUsersAndCallback), seriesHashCache: hashcache.NewSeriesHashCache(cfg.BlocksStorageConfig.TSDB.SeriesHashCacheMaxBytes), - - errorSamplers: 
newIngesterErrSamplers(cfg.ErrorSampleRate), + errorSamplers: newIngesterErrSamplers(cfg.ErrorSampleRate), }, nil } // New returns an Ingester that uses Mimir block storage. -func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { +func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, partitionRingWatcher *ring.PartitionRingWatcher, activeGroupsCleanupService *util.ActiveGroupsCleanupService, costAttributionMgr *costattribution.Manager, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) { i, err := newIngester(cfg, limits, registerer, logger) if err != nil { return nil, err @@ -386,7 +389,7 @@ func New(cfg Config, limits *validation.Overrides, ingestersRing ring.ReadRing, i.ingestionRate = util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval) i.metrics = newIngesterMetrics(registerer, cfg.ActiveSeriesMetrics.Enabled, i.getInstanceLimits, i.ingestionRate, &i.inflightPushRequests, &i.inflightPushRequestsBytes) i.activeGroups = activeGroupsCleanupService - + i.costAttributionMgr = costAttributionMgr // We create a circuit breaker, which will be activated on a successful completion of starting. i.circuitBreaker = newIngesterCircuitBreaker(i.cfg.PushCircuitBreaker, i.cfg.ReadCircuitBreaker, logger, registerer) @@ -775,10 +778,13 @@ func (i *Ingester) updateActiveSeries(now time.Time) { } newMatchersConfig := i.limits.ActiveSeriesCustomTrackersConfig(userID) - if newMatchersConfig.String() != userDB.activeSeries.CurrentConfig().String() { + newCostAttributionTracker := i.costAttributionMgr.TrackerForUser(userID) + if newMatchersConfig.String() != userDB.activeSeries.CurrentConfig().String() || newCostAttributionTracker != userDB.activeSeries.CurrentCostAttributionTracker() { i.replaceMatchers(asmodel.NewMatchers(newMatchersConfig), userDB, now) } - valid := userDB.activeSeries.Purge(now) + + idx, _ := userDB.Head().Index() + valid := userDB.activeSeries.Purge(now, idx) if !valid { // Active series config has been reloaded, exposing loading metric until MetricsIdleTimeout passes. i.metrics.activeSeriesLoading.WithLabelValues(userID).Set(1) @@ -1152,7 +1158,8 @@ func (i *Ingester) PushWithCleanup(ctx context.Context, req *mimirpb.WriteReques // Keep track of some stats which are tracked only if the samples will be // successfully committed - stats pushStats + + stats = pushStats{} firstPartialErr error // updateFirstPartial is a function that, in case of a softError, stores that error @@ -1277,8 +1284,13 @@ func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.PreallocTimeseries, app extendedAppender, startAppend time.Time, stats *pushStats, updateFirstPartial func(sampler *util_log.Sampler, errFn softErrorFunction), activeSeries *activeseries.ActiveSeries, outOfOrderWindow time.Duration, minAppendTimeAvailable bool, minAppendTime int64) error { - // Return true if handled as soft error, and we can ingest more series. 
+ // get the cost attribution value for the series + var caTracker *costattribution.Tracker + if i.costAttributionMgr != nil { + caTracker = i.costAttributionMgr.TrackerForUser(userID) + } + handleAppendError := func(err error, timestamp int64, labels []mimirpb.LabelAdapter) bool { stats.failedSamplesCount++ @@ -1288,6 +1300,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre // we actually ingested all samples which haven't failed. switch { case errors.Is(err, storage.ErrOutOfBounds): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleOutOfBounds, startAppend) + } stats.sampleOutOfBoundsCount++ updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError { return newSampleTimestampTooOldError(model.Time(timestamp), labels) @@ -1295,6 +1310,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, storage.ErrOutOfOrderSample): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleOutOfOrder, startAppend) + } stats.sampleOutOfOrderCount++ updateFirstPartial(i.errorSamplers.sampleOutOfOrder, func() softError { return newSampleOutOfOrderError(model.Time(timestamp), labels) @@ -1302,6 +1320,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, storage.ErrTooOldSample): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleTooOld, startAppend) + } stats.sampleTooOldCount++ updateFirstPartial(i.errorSamplers.sampleTimestampTooOldOOOEnabled, func() softError { return newSampleTimestampTooOldOOOEnabledError(model.Time(timestamp), labels, outOfOrderWindow) @@ -1309,6 +1330,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, globalerror.SampleTooFarInFuture): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonSampleTooFarInFuture, startAppend) + } stats.sampleTooFarInFutureCount++ updateFirstPartial(i.errorSamplers.sampleTimestampTooFarInFuture, func() softError { return newSampleTimestampTooFarInFutureError(model.Time(timestamp), labels) @@ -1316,6 +1340,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, storage.ErrDuplicateSampleForTimestamp): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonNewValueForTimestamp, startAppend) + } stats.newValueForTimestampCount++ updateFirstPartial(i.errorSamplers.sampleDuplicateTimestamp, func() softError { return newSampleDuplicateTimestampError(model.Time(timestamp), labels) @@ -1323,6 +1350,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, globalerror.MaxSeriesPerUser): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonPerUserSeriesLimit, startAppend) + } stats.perUserSeriesLimitCount++ updateFirstPartial(i.errorSamplers.maxSeriesPerUserLimitExceeded, func() softError { return newPerUserSeriesLimitReachedError(i.limiter.limits.MaxGlobalSeriesPerUser(userID)) @@ -1330,6 +1360,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre return true case errors.Is(err, 
globalerror.MaxSeriesPerMetric): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonPerMetricSeriesLimit, startAppend) + } stats.perMetricSeriesLimitCount++ updateFirstPartial(i.errorSamplers.maxSeriesPerMetricLimitExceeded, func() softError { return newPerMetricSeriesLimitReachedError(i.limiter.limits.MaxGlobalSeriesPerMetric(userID), labels) @@ -1338,30 +1371,45 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre // Map TSDB native histogram validation errors to soft errors. case errors.Is(err, histogram.ErrHistogramCountMismatch): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + } stats.invalidNativeHistogramCount++ updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { return newNativeHistogramValidationError(globalerror.NativeHistogramCountMismatch, err, model.Time(timestamp), labels) }) return true case errors.Is(err, histogram.ErrHistogramCountNotBigEnough): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + } stats.invalidNativeHistogramCount++ updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { return newNativeHistogramValidationError(globalerror.NativeHistogramCountNotBigEnough, err, model.Time(timestamp), labels) }) return true case errors.Is(err, histogram.ErrHistogramNegativeBucketCount): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + } stats.invalidNativeHistogramCount++ updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { return newNativeHistogramValidationError(globalerror.NativeHistogramNegativeBucketCount, err, model.Time(timestamp), labels) }) return true case errors.Is(err, histogram.ErrHistogramSpanNegativeOffset): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + } stats.invalidNativeHistogramCount++ updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { return newNativeHistogramValidationError(globalerror.NativeHistogramSpanNegativeOffset, err, model.Time(timestamp), labels) }) return true case errors.Is(err, histogram.ErrHistogramSpansBucketsMismatch): + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(labels), 1, reasonInvalidNativeHistogram, startAppend) + } stats.invalidNativeHistogramCount++ updateFirstPartial(i.errorSamplers.nativeHistogramValidationError, func() softError { return newNativeHistogramValidationError(globalerror.NativeHistogramSpansBucketsMismatch, err, model.Time(timestamp), labels) @@ -1384,6 +1432,7 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre var builder labels.ScratchBuilder var nonCopiedLabels labels.Labels for _, ts := range timeseries { + // The labels must be sorted (in our case, it's guaranteed a write request // has sorted labels once hit the ingester). 
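Every soft-error case above repeats the same nil check followed by IncrementDiscardedSamples. A small helper like the sketch below (hypothetical, not part of this change, and assumed to live in the ingester package alongside the reason* constants and the caTracker variable introduced above) would centralize that pattern; it is shown only to make the call shape explicit.

// trackDiscarded is an illustrative helper: it no-ops when cost attribution
// is disabled for the tenant (nil tracker) and otherwise attributes the
// discarded samples to the series' cost attribution label values.
func trackDiscarded(cat *costattribution.Tracker, lbls []mimirpb.LabelAdapter, count float64, reason string, now time.Time) {
	if cat == nil {
		return
	}
	cat.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(lbls), count, reason, now)
}

// Usage in the out-of-bounds case would then read:
//   trackDiscarded(caTracker, labels, 1, reasonSampleOutOfBounds, startAppend)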
@@ -1399,7 +1448,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre stats.failedSamplesCount += len(ts.Samples) + len(ts.Histograms) stats.sampleOutOfBoundsCount += len(ts.Samples) + len(ts.Histograms) - + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)+len(ts.Histograms)), reasonSampleOutOfBounds, startAppend) + } var firstTimestamp int64 if len(ts.Samples) > 0 { firstTimestamp = ts.Samples[0].TimestampMs @@ -1420,7 +1471,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre stats.failedSamplesCount += len(ts.Samples) stats.sampleOutOfBoundsCount += len(ts.Samples) - + if caTracker != nil { + caTracker.IncrementDiscardedSamples(mimirpb.FromLabelAdaptersToLabels(ts.Labels), float64(len(ts.Samples)), reasonSampleOutOfBounds, startAppend) + } firstTimestamp := ts.Samples[0].TimestampMs updateFirstPartial(i.errorSamplers.sampleTimestampTooOld, func() softError { @@ -1541,7 +1594,9 @@ func (i *Ingester) pushSamplesToAppender(userID string, timeseries []mimirpb.Pre } if activeSeries != nil && stats.succeededSamplesCount > oldSucceededSamplesCount { - activeSeries.UpdateSeries(nonCopiedLabels, ref, startAppend, numNativeHistogramBuckets) + idx, _ := i.getTSDB(userID).Head().Index() + // TODO: deal with the error here + activeSeries.UpdateSeries(nonCopiedLabels, ref, startAppend, numNativeHistogramBuckets, idx) } if len(ts.Exemplars) > 0 && i.limits.MaxGlobalExemplarsPerUser(userID) > 0 { @@ -2628,9 +2683,17 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD ownedSeriedStateShardSize = i.ownedSeriesService.ringStrategy.shardSizeForUser(userID) } + var cat *costattribution.Tracker + if i.costAttributionMgr != nil { + cat = i.costAttributionMgr.TrackerForUser(userID) + } userDB := &userTSDB{ - userID: userID, - activeSeries: activeseries.NewActiveSeries(asmodel.NewMatchers(matchersConfig), i.cfg.ActiveSeriesMetrics.IdleTimeout), + userID: userID, + activeSeries: activeseries.NewActiveSeries( + asmodel.NewMatchers(matchersConfig), + i.cfg.ActiveSeriesMetrics.IdleTimeout, + cat, + ), seriesInMetric: newMetricCounter(i.limiter, i.cfg.getIgnoreSeriesLimitForMetricNamesMap()), ingestedAPISamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), ingestedRuleSamples: util_math.NewEWMARate(0.2, i.cfg.RateUpdatePeriod), @@ -2645,6 +2708,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD localSeriesLimit: initialLocalLimit, }, } + userDB.triggerRecomputeOwnedSeries(recomputeOwnedSeriesReasonNewUser) oooTW := i.limits.OutOfOrderTimeWindow(userID) @@ -3219,7 +3283,12 @@ func (i *Ingester) compactBlocksToReduceInMemorySeries(ctx context.Context, now } // Purge the active series so that the next call to Active() will return the up-to-date count. - db.activeSeries.Purge(now) + idx, err := db.Head().Index() + if err != nil { + level.Warn(i.logger).Log("msg", "failed to get the index of the TSDB head", "user", userID, "err", err) + continue + } + db.activeSeries.Purge(now, idx) // Estimate the number of series that would be dropped from the TSDB Head if we would // compact the head up until "now - active series idle timeout". 
diff --git a/pkg/ingester/user_tsdb.go b/pkg/ingester/user_tsdb.go index 95bfe9840e2..e9766753525 100644 --- a/pkg/ingester/user_tsdb.go +++ b/pkg/ingester/user_tsdb.go @@ -624,7 +624,9 @@ func (u *userTSDB) computeOwnedSeries() int { if u.ownedTokenRanges.IncludesKey(sh) { count++ } else { - u.activeSeries.Delete(refs[i]) + idx, _ := u.Head().Index() + // TODO: deal with the err here + u.activeSeries.Delete(refs[i], idx) } } }) diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index a778a05ac3a..2f60463e350 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -51,6 +51,7 @@ import ( "github.com/grafana/mimir/pkg/blockbuilder" "github.com/grafana/mimir/pkg/compactor" "github.com/grafana/mimir/pkg/continuoustest" + "github.com/grafana/mimir/pkg/costattribution" "github.com/grafana/mimir/pkg/distributor" "github.com/grafana/mimir/pkg/flusher" "github.com/grafana/mimir/pkg/frontend" @@ -143,9 +144,12 @@ type Config struct { ContinuousTest continuoustest.Config `yaml:"-"` OverridesExporter exporter.Config `yaml:"overrides_exporter"` - Common CommonConfig `yaml:"common"` + Common CommonConfig `yaml:"common"` + CustomRegistryPath string `yaml:"custom_registry_path" category:"advanced"` - TimeseriesUnmarshalCachingOptimizationEnabled bool `yaml:"timeseries_unmarshal_caching_optimization_enabled" category:"experimental"` + TimeseriesUnmarshalCachingOptimizationEnabled bool `yaml:"timeseries_unmarshal_caching_optimization_enabled" category:"experimental"` + CostAttributionEvictionInterval time.Duration `yaml:"cost_attribution_eviction_interval" category:"experimental"` + CostAttributionCoolDownDuration time.Duration `yaml:"cost_attribution_cool_down_duration" category:"experimental"` } // RegisterFlags registers flags. @@ -168,10 +172,12 @@ func (c *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.StringVar(&c.NoAuthTenant, "auth.no-auth-tenant", "anonymous", "Tenant ID to use when multitenancy is disabled.") f.BoolVar(&c.PrintConfig, "print.config", false, "Print the config and exit.") f.DurationVar(&c.ShutdownDelay, "shutdown-delay", 0, "How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Mimir will report not-ready status via /ready endpoint.") + f.DurationVar(&c.CostAttributionEvictionInterval, "cost-attribution-eviction-interval", 30*time.Minute, "Time interval at which inactive cost attributions will be evicted from the cache.") + f.DurationVar(&c.CostAttributionCoolDownDuration, "cost-attribution-cool-down-duration", 20*time.Minute, "Duration during which any cost attribution for a user will be marked as __overflow__ after exceeding the specified limit, prior to resetting the cache.") f.IntVar(&c.MaxSeparateMetricsGroupsPerUser, "max-separate-metrics-groups-per-user", 1000, "Maximum number of groups allowed per user by which specified distributor and ingester metrics can be further separated.") f.BoolVar(&c.EnableGoRuntimeMetrics, "enable-go-runtime-metrics", false, "Set to true to enable all Go runtime metrics, such as go_sched_* and go_memstats_*.") f.BoolVar(&c.TimeseriesUnmarshalCachingOptimizationEnabled, "timeseries-unmarshal-caching-optimization-enabled", true, "Enables optimized marshaling of timeseries.") - + f.StringVar(&c.CustomRegistryPath, "custom-registry-path", "", "Defines a custom path for the registry. 
When specified, Mimir will expose cost attribution metrics through this custom path, if not specified, cost attribution metrics won't be exposed.") c.API.RegisterFlags(f) c.registerServerFlagsWithChangedDefaultValues(f) c.Distributor.RegisterFlags(f, logger) @@ -702,14 +708,16 @@ type Mimir struct { ServiceMap map[string]services.Service ModuleManager *modules.Manager - API *api.API - Server *server.Server - IngesterRing *ring.Ring - IngesterPartitionRingWatcher *ring.PartitionRingWatcher - IngesterPartitionInstanceRing *ring.PartitionInstanceRing - TenantLimits validation.TenantLimits - Overrides *validation.Overrides - ActiveGroupsCleanup *util.ActiveGroupsCleanupService + API *api.API + Server *server.Server + IngesterRing *ring.Ring + IngesterPartitionRingWatcher *ring.PartitionRingWatcher + IngesterPartitionInstanceRing *ring.PartitionInstanceRing + TenantLimits validation.TenantLimits + Overrides *validation.Overrides + ActiveGroupsCleanup *util.ActiveGroupsCleanupService + CostAttributionManager *costattribution.Manager + Distributor *distributor.Distributor Ingester *ingester.Ingester Flusher *flusher.Flusher diff --git a/pkg/mimir/modules.go b/pkg/mimir/modules.go index c372bea1c25..a0e82862160 100644 --- a/pkg/mimir/modules.go +++ b/pkg/mimir/modules.go @@ -29,6 +29,7 @@ import ( "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/matchers/compat" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/config" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/rules" @@ -79,6 +80,7 @@ const ( OverridesExporter string = "overrides-exporter" Server string = "server" ActiveGroupsCleanupService string = "active-groups-cleanup-service" + CostAttributionService string = "cost-attribution-service" Distributor string = "distributor" DistributorService string = "distributor-service" Ingester string = "ingester" @@ -460,7 +462,9 @@ func (t *Mimir) initDistributorService() (serv services.Service, err error) { t.Cfg.Distributor.PreferAvailabilityZone = t.Cfg.Querier.PreferAvailabilityZone t.Cfg.Distributor.IngestStorageConfig = t.Cfg.IngestStorage - t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, t.ActiveGroupsCleanup, t.IngesterRing, t.IngesterPartitionInstanceRing, canJoinDistributorsRing, t.Registerer, util_log.Logger) + t.Distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.Overrides, + t.ActiveGroupsCleanup, t.CostAttributionManager, t.IngesterRing, t.IngesterPartitionInstanceRing, + canJoinDistributorsRing, t.Registerer, util_log.Logger) if err != nil { return } @@ -642,6 +646,21 @@ func (t *Mimir) initActiveGroupsCleanupService() (services.Service, error) { return t.ActiveGroupsCleanup, nil } +func (t *Mimir) initCostAttributionService() (services.Service, error) { + // The cost attribution service is only initialized if a custom registry path is provided. + if t.Cfg.CustomRegistryPath != "" { + // Use a dedicated registry for cost attribution metrics so they stay separate from the default Mimir registry. + customRegistry := prometheus.NewRegistry() + // Expose the registry at the configured path: this lets cost attribution + // metrics be scraped from their own endpoint instead of the default + // Mimir metrics endpoint.
+ http.Handle(t.Cfg.CustomRegistryPath, promhttp.HandlerFor(customRegistry, promhttp.HandlerOpts{Registry: customRegistry})) + err := customRegistry.Register(t.CostAttributionManager) + return t.CostAttributionManager, err + } + return nil, nil +} + func (t *Mimir) tsdbIngesterConfig() { t.Cfg.Ingester.BlocksStorageConfig = t.Cfg.BlocksStorage } @@ -653,7 +672,7 @@ func (t *Mimir) initIngesterService() (serv services.Service, err error) { t.Cfg.Ingester.IngestStorageConfig = t.Cfg.IngestStorage t.tsdbIngesterConfig() - t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, t.IngesterRing, t.IngesterPartitionRingWatcher, t.ActiveGroupsCleanup, t.Registerer, util_log.Logger) + t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, t.IngesterRing, t.IngesterPartitionRingWatcher, t.ActiveGroupsCleanup, t.CostAttributionManager, t.Registerer, util_log.Logger) if err != nil { return } @@ -1125,6 +1144,7 @@ func (t *Mimir) setupModuleManager() error { mm.RegisterModule(Overrides, t.initOverrides, modules.UserInvisibleModule) mm.RegisterModule(OverridesExporter, t.initOverridesExporter) mm.RegisterModule(ActiveGroupsCleanupService, t.initActiveGroupsCleanupService, modules.UserInvisibleModule) + mm.RegisterModule(CostAttributionService, t.initCostAttributionService, modules.UserInvisibleModule) mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(DistributorService, t.initDistributorService, modules.UserInvisibleModule) mm.RegisterModule(Ingester, t.initIngester) @@ -1163,9 +1183,9 @@ func (t *Mimir) setupModuleManager() error { IngesterPartitionRing: {MemberlistKV, IngesterRing, API}, Overrides: {RuntimeConfig}, OverridesExporter: {Overrides, MemberlistKV, Vault}, - Distributor: {DistributorService, API, ActiveGroupsCleanupService, Vault}, + Distributor: {DistributorService, API, ActiveGroupsCleanupService, CostAttributionService, Vault}, DistributorService: {IngesterRing, IngesterPartitionRing, Overrides, Vault}, - Ingester: {IngesterService, API, ActiveGroupsCleanupService, Vault}, + Ingester: {IngesterService, API, ActiveGroupsCleanupService, CostAttributionService, Vault}, IngesterService: {IngesterRing, IngesterPartitionRing, Overrides, RuntimeConfig, MemberlistKV}, Flusher: {Overrides, API}, Queryable: {Overrides, DistributorService, IngesterRing, IngesterPartitionRing, API, StoreQueryable, MemberlistKV}, diff --git a/pkg/streamingpromql/benchmarks/ingester.go b/pkg/streamingpromql/benchmarks/ingester.go index 6f3b5f04a9a..9107b66f64f 100644 --- a/pkg/streamingpromql/benchmarks/ingester.go +++ b/pkg/streamingpromql/benchmarks/ingester.go @@ -96,7 +96,7 @@ func startBenchmarkIngester(rootDataDir string) (*ingester.Ingester, string, fun return services.StopAndAwaitTerminated(context.Background(), ingestersRing) }) - ing, err := ingester.New(ingesterCfg, overrides, ingestersRing, nil, nil, nil, log.NewNopLogger()) + ing, err := ingester.New(ingesterCfg, overrides, ingestersRing, nil, nil, nil, nil, log.NewNopLogger()) if err != nil { cleanup() return nil, "", nil, fmt.Errorf("could not create ingester: %w", err) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 463f4e81f62..4157f3fa92d 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -179,6 +179,10 @@ type Limits struct { LabelValuesMaxCardinalityLabelNamesPerRequest int `yaml:"label_values_max_cardinality_label_names_per_request" json:"label_values_max_cardinality_label_names_per_request"` ActiveSeriesResultsMaxSizeBytes int 
`yaml:"active_series_results_max_size_bytes" json:"active_series_results_max_size_bytes" category:"experimental"` + // Cost attribution and limit. + CostAttributionLabels flagext.StringSliceCSV `yaml:"cost_attribution_labels" json:"cost_attribution_labels" category:"experimental"` + MaxCostAttributionPerUser int `yaml:"max_cost_attribution_per_user" json:"max_cost_attribution_per_user" category:"experimental"` + // Ruler defaults and limits. RulerEvaluationDelay model.Duration `yaml:"ruler_evaluation_delay_duration" json:"ruler_evaluation_delay_duration"` RulerTenantShardSize int `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"` @@ -282,7 +286,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.OutOfOrderBlocksExternalLabelEnabled, "ingester.out-of-order-blocks-external-label-enabled", false, "Whether the shipper should label out-of-order blocks with an external label before uploading them. Setting this label will compact out-of-order blocks separately from non-out-of-order blocks") f.StringVar(&l.SeparateMetricsGroupLabel, "validation.separate-metrics-group-label", "", "Label used to define the group label for metrics separation. For each write request, the group is obtained from the first non-empty group label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'group' label with group label's value. Currently applies to the following metrics: cortex_discarded_samples_total") - + f.Var(&l.CostAttributionLabels, "validation.cost-attribution-labels", "List of labels used to define the cost attribution. This label will be included in the specified distributor and ingester metrics for each write request, allowing them to be distinguished by the label. The label applies to the following metrics: cortex_distributor_received_samples_total, cortex_ingester_active_series and cortex_discarded_samples_attribution_total. Set to an empty string to disable cost attribution.") + f.IntVar(&l.MaxCostAttributionPerUser, "validation.max-cost-attribution-per-user", 0, "Maximum number of cost attribution labels allowed per user.") f.IntVar(&l.MaxChunksPerQuery, MaxChunksPerQueryFlag, 2e6, "Maximum number of chunks that can be fetched in a single query from ingesters and store-gateways. This limit is enforced in the querier, ruler and store-gateway. 0 to disable.") f.Float64Var(&l.MaxEstimatedChunksPerQueryMultiplier, MaxEstimatedChunksPerQueryMultiplierFlag, 0, "Maximum number of chunks estimated to be fetched in a single query from ingesters and store-gateways, as a multiple of -"+MaxChunksPerQueryFlag+". This limit is enforced in the querier. Must be greater than or equal to 1, or 0 to disable.") f.IntVar(&l.MaxFetchedSeriesPerQuery, MaxSeriesPerQueryFlag, 0, "The maximum number of unique series for which a query can fetch samples from ingesters and store-gateways. This limit is enforced in the querier, ruler and store-gateway. 
0 to disable") @@ -418,7 +423,6 @@ func (l *Limits) unmarshal(decode func(any) error) error { return err } l.extensions = getExtensions() - return l.validate() } @@ -770,6 +774,14 @@ func (o *Overrides) SeparateMetricsGroupLabel(userID string) string { return o.getOverridesForUser(userID).SeparateMetricsGroupLabel } +func (o *Overrides) CostAttributionLabel(userID string) []string { + return o.getOverridesForUser(userID).CostAttributionLabels +} + +func (o *Overrides) MaxCostAttributionPerUser(userID string) int { + return o.getOverridesForUser(userID).MaxCostAttributionPerUser +} + // IngestionTenantShardSize returns the ingesters shard size for a given user. func (o *Overrides) IngestionTenantShardSize(userID string) int { return o.getOverridesForUser(userID).IngestionTenantShardSize