Skip to content

Commit

Permalink
POC: Cost Attribution
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Sep 25, 2024
1 parent 2a0738f commit 4ee5004
Show file tree
Hide file tree
Showing 19 changed files with 271 additions and 116 deletions.
22 changes: 22 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -4050,6 +4050,28 @@
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "cost_attribution_label",
"required": false,
"desc": "Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "validation.cost-attribution-label",
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_cost_attribution_per_user",
"required": false,
"desc": "Maximum number of cost attributions per user. 0 to disable the limit.",
"fieldValue": null,
"fieldDefaultValue": 200,
"fieldFlag": "validation.max-cost-attribution-per-user",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "max_fetched_chunks_per_query",
Expand Down
4 changes: 4 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -3067,10 +3067,14 @@ Usage of ./cmd/mimir/mimir:
Enable anonymous usage reporting. (default true)
-usage-stats.installation-mode string
Installation mode. Supported values: custom, helm, jsonnet. (default "custom")
-validation.cost-attribution-label string
[experimental] Label used to define the cost attribution label. For each write request, the cost attribution is obtained from the first non-empty cost attribution label from the first timeseries in the incoming list of timeseries. Specific distributor and ingester metrics will be further separated adding a 'cost' label with cost label's value. Currently applies to the following metrics: cortex_discarded_samples_total
-validation.create-grace-period duration
Controls how far into the future incoming samples and exemplars are accepted compared to the wall clock. Any sample or exemplar will be rejected if its timestamp is greater than '(now + creation_grace_period)'. This configuration is enforced in the distributor and ingester. (default 10m)
-validation.enforce-metadata-metric-name
Enforce every metadata has a metric name. (default true)
-validation.max-cost-attribution-per-user int
[experimental] Maximum number of cost attributions per user. 0 to disable the limit. (default 200)
-validation.max-label-names-per-series int
Maximum number of label names per series. (default 30)
-validation.max-length-label-name int
Expand Down
22 changes: 16 additions & 6 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
receivedSamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user"}),
}, []string{"user", "attrib"}),
receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_received_exemplars_total",
Help: "The total number of received exemplars, excluding rejected and deduped exemplars.",
Expand Down Expand Up @@ -640,7 +640,7 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
d.HATracker.cleanupHATrackerMetricsForUser(userID)

d.receivedRequests.DeleteLabelValues(userID)
d.receivedSamples.DeleteLabelValues(userID)
d.receivedSamples.DeletePartialMatch(prometheus.Labels{"user": userID})
d.receivedExemplars.DeleteLabelValues(userID)
d.receivedMetadata.DeleteLabelValues(userID)
d.incomingRequests.DeleteLabelValues(userID)
Expand Down Expand Up @@ -1414,7 +1414,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
return err
}

d.updateReceivedMetrics(req, userID)
d.updateReceivedMetrics(req, userID, d.limits.CostAttributionLabel(userID))

if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
return nil
Expand Down Expand Up @@ -1645,15 +1645,25 @@ func tokenForMetadata(userID string, metricName string) uint32 {
return mimirpb.ShardByMetricName(userID, metricName)
}

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) {
func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string, costAttributionLabel string) {
var receivedSamples, receivedExemplars, receivedMetadata int
costAttribution := make(map[string]int)
for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
if costAttributionLabel != "" {
attribution := mimirpb.FromLabelAdaptersToLabels(ts.Labels).Get(costAttributionLabel)
costAttribution[attribution]++
}
}
receivedMetadata = len(req.Metadata)

d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples))
if costAttributionLabel != "" {
for lv, count := range costAttribution {
d.receivedSamples.WithLabelValues(userID, lv).Add(float64(count))
}
} else {
d.receivedSamples.WithLabelValues(userID, "").Add(float64(receivedSamples))
}
d.receivedExemplars.WithLabelValues(userID).Add(float64(receivedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_ingest_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func TestDistributor_Push_ShouldSupportIngestStorage(t *testing.T) {
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{user="user"} 5
cortex_distributor_received_samples_total{attrib="", user="user"} 5
# HELP cortex_distributor_metadata_in_total The total number of metadata the have come in to the distributor, including rejected.
# TYPE cortex_distributor_metadata_in_total counter
Expand Down
13 changes: 6 additions & 7 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,8 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
"cortex_distributor_latest_seen_sample_timestamp_seconds",
}

d.receivedSamples.WithLabelValues("userA").Add(5)
d.receivedSamples.WithLabelValues("userB").Add(10)
d.receivedSamples.WithLabelValues("userA", "").Add(5)
d.receivedSamples.WithLabelValues("userB", "").Add(10)
d.receivedExemplars.WithLabelValues("userA").Add(5)
d.receivedExemplars.WithLabelValues("userB").Add(10)
d.receivedMetadata.WithLabelValues("userA").Add(5)
Expand Down Expand Up @@ -399,8 +399,8 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{user="userA"} 5
cortex_distributor_received_samples_total{user="userB"} 10
cortex_distributor_received_samples_total{attrib="",user="userA"} 5
cortex_distributor_received_samples_total{attrib="",user="userB"} 10
# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
# TYPE cortex_distributor_received_exemplars_total counter
Expand Down Expand Up @@ -437,7 +437,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{user="userB"} 10
cortex_distributor_received_samples_total{attrib="",user="userB"} 10
# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
# TYPE cortex_distributor_received_exemplars_total counter
Expand Down Expand Up @@ -6843,7 +6843,7 @@ func TestDistributor_MetricsWithRequestModifications(t *testing.T) {
cortex_distributor_received_requests_total{user="%s"} %d
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{user="%s"} %d
cortex_distributor_received_samples_total{attrib="",user="%s"} %d
# HELP cortex_distributor_received_exemplars_total The total number of received exemplars, excluding rejected and deduped exemplars.
# TYPE cortex_distributor_received_exemplars_total counter
cortex_distributor_received_exemplars_total{user="%s"} %d
Expand Down Expand Up @@ -6944,7 +6944,6 @@ func TestDistributor_MetricsWithRequestModifications(t *testing.T) {
exemplarsIn: 10,
metadataIn: 10,
receivedRequests: 1,
receivedSamples: 0,
receivedExemplars: 0,
receivedMetadata: 10})

Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ func removeNonASCIIChars(in string) (out string) {
// The returned error may retain the provided series labels.
func validateLabels(m *sampleValidationMetrics, cfg labelValidationConfig, userID, group string, ls []mimirpb.LabelAdapter, skipLabelNameValidation bool) error {
unsafeMetricName, err := extract.UnsafeMetricNameFromLabelAdapters(ls)

if err != nil {
m.missingMetricName.WithLabelValues(userID, group).Inc()
return errors.New(noMetricNameMsgFormat)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/activeseries/active_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestIsLabelValueActive(t *testing.T) {
labels.FromStrings("a", "5"),
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")

memPostings := index.NewMemPostings()
for i, l := range series {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestNativeHistogramPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestNativeHistogramPostings_ExpandWithBucketCount(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestNativeHistogramPostings_SeekSkipsNonNative(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestNativeHistogramPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestNativeHistogramPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/activeseries/active_postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestPostings_Expand(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -56,7 +56,7 @@ func TestPostings_Seek(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestPostings_SeekToEnd(t *testing.T) {
}
allStorageRefs := []storage.SeriesRef{1, 2, 3, 4, 5}
storagePostings := index.NewListPostings(allStorageRefs)
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl))
activeSeries := NewActiveSeries(&Matchers{}, time.Duration(ttl), "foo", "")

// Update each series at a different time according to its index.
for i := range allStorageRefs {
Expand Down
Loading

0 comments on commit 4ee5004

Please sign in to comment.