Skip to content

Commit

Permalink
remove changes to metrics and adding extra
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Oct 3, 2024
1 parent dd7e2a4 commit 046046f
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 94 deletions.
27 changes: 12 additions & 15 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
receivedSamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user", "attrib"}),
}, []string{"user"}),
receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_received_exemplars_total",
Help: "The total number of received exemplars, excluding rejected and deduped exemplars.",
Expand Down Expand Up @@ -645,6 +645,7 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
d.HATracker.cleanupHATrackerMetricsForUser(userID)

d.receivedRequests.DeleteLabelValues(userID)
d.receivedSamples.DeleteLabelValues(userID)
d.receivedExemplars.DeleteLabelValues(userID)
d.receivedMetadata.DeleteLabelValues(userID)
d.incomingRequests.DeleteLabelValues(userID)
Expand All @@ -661,7 +662,6 @@ func (d *Distributor) cleanupInactiveUser(userID string) {

filter := prometheus.Labels{"user": userID}
d.dedupedSamples.DeletePartialMatch(filter)
d.receivedSamples.DeletePartialMatch(filter)
d.discardedSamplesTooManyHaClusters.DeletePartialMatch(filter)
d.discardedSamplesRateLimited.DeletePartialMatch(filter)
d.discardedRequestsRateLimited.DeleteLabelValues(userID)
Expand All @@ -680,11 +680,6 @@ func (d *Distributor) RemoveGroupMetricsForUser(userID, group string) {
d.sampleValidationMetrics.deleteUserMetricsForGroup(userID, group)
}

func (d *Distributor) RemoveAttributionMetricsForUser(userID, attribution string) {
d.receivedSamples.DeleteLabelValues(userID, attribution)
//TODO @ying: Remove attribution metrics
}

// Called after distributor is asked to stop via StopAsync.
func (d *Distributor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
Expand Down Expand Up @@ -1441,7 +1436,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {

now := mtime.Now()

d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID), now)
d.updateReceivedMetrics(req, userID, now)

if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
return nil
Expand Down Expand Up @@ -1672,28 +1667,30 @@ func tokenForMetadata(userID string, metricName string) uint32 {
return mimirpb.ShardByMetricName(userID, metricName)
}

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string, now time.Time) {
func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, now time.Time) {
var receivedSamples, receivedExemplars, receivedMetadata int
costAttributionSize := 0
if costAttributionLabel != "" {
costAttributionSize = d.limits.MaxCostAttributionPerUser(userID)
caEnabled := d.costAttributionSvc != nil && d.costAttributionSvc.EnabledForUser(userID)
if caEnabled {
costAttributionSize = d.costAttributionSvc.GetUserAttributionLimit(userID)
}
costAttribution := make(map[string]int, costAttributionSize)

for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
if costAttributionLabel != "" {
if caEnabled {
attribution := d.costAttributionSvc.UpdateAttributionTimestamp(userID, mimirpb.FromLabelAdaptersToLabels(ts.Labels), now)
costAttribution[attribution]++
}
}
receivedMetadata = len(req.Metadata)
if costAttributionLabel != "" {
if caEnabled {
for lv, count := range costAttribution {
d.receivedSamples.WithLabelValues(userID, lv).Add(float64(count))
d.costAttributionSvc.IncrementReceivedSamples(userID, lv, float64(count))
}
} else {
d.receivedSamples.WithLabelValues(userID, "").Add(float64(receivedSamples))
d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples))
}
d.receivedExemplars.WithLabelValues(userID).Add(float64(receivedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestDistributor_Push_ShouldSupportIngestStorage(t *testing.T) {
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{attrib="", user="user"} 5
cortex_distributor_received_samples_total{ user="user"} 5
# HELP cortex_distributor_metadata_in_total The total number of metadata the have come in to the distributor, including rejected.
# TYPE cortex_distributor_metadata_in_total counter
Expand Down
12 changes: 6 additions & 6 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
"cortex_distributor_label_values_with_newlines_total",
}

d.receivedSamples.WithLabelValues("userA", "").Add(5)
d.receivedSamples.WithLabelValues("userB", "").Add(10)
d.receivedSamples.WithLabelValues("userA").Add(5)
d.receivedSamples.WithLabelValues("userB").Add(10)
d.receivedExemplars.WithLabelValues("userA").Add(5)
d.receivedExemplars.WithLabelValues("userB").Add(10)
d.receivedMetadata.WithLabelValues("userA").Add(5)
Expand Down Expand Up @@ -401,8 +401,8 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{attrib="",user="userA"} 5
cortex_distributor_received_samples_total{attrib="",user="userB"} 10
cortex_distributor_received_samples_total{user="userA"} 5
cortex_distributor_received_samples_total{user="userB"} 10
# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
# TYPE cortex_distributor_received_exemplars_total counter
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{attrib="",user="userB"} 10
cortex_distributor_received_samples_total{user="userB"} 10
# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
# TYPE cortex_distributor_received_exemplars_total counter
Expand Down Expand Up @@ -6852,7 +6852,7 @@ func TestDistributor_MetricsWithRequestModifications(t *testing.T) {
cortex_distributor_received_requests_total{user="%s"} %d
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{attrib="",user="%s"} %d
cortex_distributor_received_samples_total{user="%s"} %d
# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
# TYPE cortex_distributor_received_exemplars_total counter
cortex_distributor_received_exemplars_total{user="%s"} %d
Expand Down
18 changes: 8 additions & 10 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -788,14 +788,14 @@ func (i *Ingester) updateActiveSeries(now time.Time) {
allActive, activeMatching, allActiveHistograms, activeMatchingHistograms, allActiveBuckets, activeMatchingBuckets := userDB.activeSeries.ActiveWithMatchers()
i.metrics.activeSeriesLoading.DeleteLabelValues(userID)
if allActive > 0 {
costAttribLabel := i.limits.CostAttributionLabel(userID)
if costAttribLabel != "" {
caEnabled := i.costAttributionSvc != nil && i.costAttributionSvc.EnabledForUser(userID)
if caEnabled {
labelAttributions := userDB.activeSeries.ActiveByAttributionValue()
for label, count := range labelAttributions {
i.metrics.activeSeriesPerUser.WithLabelValues(userID, label).Set(float64(count))
i.costAttributionSvc.SetActiveSeries(userID, label, float64(count))
}
} else {
i.metrics.activeSeriesPerUser.WithLabelValues(userID, "").Set(float64(allActive))
i.metrics.activeSeriesPerUser.WithLabelValues(userID).Set(float64(allActive))
}
} else {
i.metrics.activeSeriesPerUser.DeletePartialMatch(prometheus.Labels{"user": userID})
Expand Down Expand Up @@ -1283,8 +1283,10 @@ func (i *Ingester) updateMetricsFromPushStats(userID string, group string, stats
db.ingestedAPISamples.Add(int64(stats.succeededSamplesCount))
}
}
for label, count := range stats.failedSamplesAttribution {
discarded.samplesPerAttribution.WithLabelValues(userID, label).Add(float64(count))
if i.costAttributionSvc != nil && i.costAttributionSvc.EnabledForUser(userID) {
for label, count := range stats.failedSamplesAttribution {
i.costAttributionSvc.IncrementDiscardedSamples(userID, label, float64(count))
}
}
}

Expand Down Expand Up @@ -3429,10 +3431,6 @@ func (i *Ingester) RemoveGroupMetricsForUser(userID, group string) {
i.metrics.deletePerGroupMetricsForUser(userID, group)
}

func (i *Ingester) RemoveAttributionMetricsForUser(userID, attribution string) {
i.metrics.deletePerAttributionMetricsForUser(userID, attribution)
}

// TransferOut implements ring.FlushTransferer.
func (i *Ingester) TransferOut(_ context.Context) error {
return ring.ErrTransferDisabled
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func TestIngester_Start(t *testing.T) {
return testutil.GatherAndCompare(reg, strings.NewReader(fmt.Sprintf(`
# HELP cortex_ingester_active_series Number of currently active series per user.
# TYPE cortex_ingester_active_series gauge
cortex_ingester_active_series{attrib="",user="%s"} 1
cortex_ingester_active_series{user="%s"} 1
# HELP cortex_ingester_owned_series Number of currently owned series per user.
# TYPE cortex_ingester_owned_series gauge
Expand Down
Loading

0 comments on commit 046046f

Please sign in to comment.