Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MVP: cost attribution #10108

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix doc
  • Loading branch information
ying-jeanne committed Dec 9, 2024
commit f122ac32a3d5af8c018efe268a697fb2e17e353a
43 changes: 43 additions & 0 deletions docs/sources/mimir/configure/configuration-parameters/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,18 @@ overrides_exporter:
# (experimental) Enables optimized marshaling of timeseries.
# CLI flag: -timeseries-unmarshal-caching-optimization-enabled
[timeseries_unmarshal_caching_optimization_enabled: <boolean> | default = true]

# (experimental) Time interval at which inactive cost attributions are evicted
# from the counter, ensuring they are not included in the cost attribution
# cardinality per user limit.
# CLI flag: -cost-attribution.eviction-interval
[cost_attribution_eviction_interval: <duration> | default = 20m]

# (experimental) Defines a custom path for the registry. When specified, Mimir
# exposes cost attribution metrics through this custom path. If not
# specified, cost attribution metrics are not exposed.
# CLI flag: -cost-attribution.registry-path
[cost_attribution_registry_path: <string> | default = ""]
```

### common
Expand Down Expand Up @@ -3572,6 +3584,37 @@ The `limits` block configures default and per-tenant limits imposed by component
# CLI flag: -querier.active-series-results-max-size-bytes
[active_series_results_max_size_bytes: <int> | default = 419430400]

# (experimental) List of labels used to define cost attribution. These labels
# will be included in the specified distributor and ingester metrics for each
# write request, allowing them to be distinguished by the label. The label
# applies to the following metrics:
# cortex_distributor_attributed_received_samples_total,
# cortex_ingester_attributed_active_series, and
# cortex_attributed_discarded_samples_total. Set to an empty string to disable
# cost attribution.
# CLI flag: -validation.cost-attribution-labels
[cost_attribution_labels: <string> | default = ""]

# (experimental) Maximum number of cost attribution labels allowed per user. Set
# to 0 to disable.
# CLI flag: -validation.max-cost-attribution-labels-per-user
[max_cost_attribution_labels_per_user: <int> | default = 2]

# (experimental) Maximum cardinality of cost attribution labels allowed per
# user.
# CLI flag: -validation.max-cost-attribution-cardinality-per-user
[max_cost_attribution_cardinality_per_user: <int> | default = 10000]

# (experimental) Cooldown period for cost attribution labels. Specifies the
# duration the cost attribution remains in overflow before attempting a reset.
# If the cardinality remains above the limit after this period, the system will
# stay in overflow mode and extend the cooldown. Setting this value to 0
# disables the cooldown, causing the system to continuously check whether the
# cardinality has dropped below the limit. A reset will occur once the
# cardinality falls below the limit.
# CLI flag: -validation.cost-attribution-cooldown
[cost_attribution_cooldown: <duration> | default = 0s]

# Duration to delay the evaluation of rules to ensure the underlying metrics
# have been pushed.
# CLI flag: -ruler.evaluation-delay-duration
Expand Down
81 changes: 58 additions & 23 deletions pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,37 +27,58 @@ type Manager struct {
limits *validation.Overrides

// mtx protects the trackersByUserID map
mtx sync.RWMutex
trackersByUserID map[string]*Tracker
reg *prometheus.Registry
mtx sync.RWMutex
trackersByUserID map[string]*Tracker
reg *prometheus.Registry
cleanupInterval time.Duration
metricsExportInterval time.Duration
}

// NewManager creates a new cost attribution manager, which is responsible for managing the cost attribution of series.
// It cleans up inactive attributions every cleanupInterval and exports the tracker metrics every exportInterval.
func NewManager(cleanupInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg *prometheus.Registry) (*Manager, error) {
// NewManager builds the cost attribution Manager and registers it on reg.
//
// cleanupInterval is stored as the period of the purge ticker used by
// running, exportInterval as the period of the metrics export ticker, and
// inactiveTimeout as the age subtracted from the current time to obtain the
// purge deadline. It returns the error from reg.Register when registration
// fails, in which case no Manager is returned.
func NewManager(cleanupInterval, exportInterval, inactiveTimeout time.Duration, logger log.Logger, limits *validation.Overrides, reg *prometheus.Registry) (*Manager, error) {
	// Each field is set exactly once; mtx is left at its ready-to-use zero value.
	m := &Manager{
		trackersByUserID:      make(map[string]*Tracker),
		limits:                limits,
		inactiveTimeout:       inactiveTimeout,
		logger:                logger,
		reg:                   reg,
		cleanupInterval:       cleanupInterval,
		metricsExportInterval: exportInterval,
	}

	// A basic service with a custom running function is used because the
	// manager drives two tickers (cleanup and metrics export) itself.
	m.Service = services.NewBasicService(nil, m.running, nil).WithName("cost attribution manager")
	if err := reg.Register(m); err != nil {
		return nil, err
	}
	return m, nil
}

func (m *Manager) iteration(_ context.Context) error {
// running is the service loop of the cost attribution manager. It blocks
// until ctx is cancelled, purging inactive attributions on every
// cleanupInterval tick and refreshing tracker metrics on every
// metricsExportInterval tick.
//
// It returns nil when ctx is done or when called on a nil Manager, and the
// error from purgeInactiveAttributionsUntil if a purge fails.
func (m *Manager) running(ctx context.Context) error {
	if m == nil {
		return nil
	}

	cleanupTicker := time.NewTicker(m.cleanupInterval)
	defer cleanupTicker.Stop()

	exportTicker := time.NewTicker(m.metricsExportInterval)
	defer exportTicker.Stop()

	for {
		select {
		case <-cleanupTicker.C:
			// Derive the deadline from the current time on every tick; a
			// timestamp captured once before the loop would pin the purge
			// cutoff to the moment the service started.
			deadline := time.Now().Add(-m.inactiveTimeout).Unix()
			if err := m.purgeInactiveAttributionsUntil(deadline); err != nil {
				return err
			}
		case <-exportTicker.C:
			m.updateMetrics()
		case <-ctx.Done():
			return nil
		}
	}
}

// EnabledForUser returns true if the cost attribution is enabled for the user
Expand Down Expand Up @@ -109,14 +130,14 @@ func (m *Manager) deleteUserTracer(userID string) {
defer m.mtx.Unlock()
if _, exists := m.trackersByUserID[userID]; exists {
// clean up tracker metrics and delete the tracker
m.trackersByUserID[userID].cleanupTracker(userID)
m.trackersByUserID[userID].cleanupTracker()
delete(m.trackersByUserID, userID)
}
}

func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) {
func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) error {
if m == nil {
return
return nil
}
// Get all userIDs from the map
m.mtx.RLock()
Expand All @@ -135,13 +156,14 @@ func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) {
}

// get all inactive attributions for the user and clean up the tracker
inactiveObs := m.purgeInactiveObservationsForUser(userID, deadline)
for _, ob := range inactiveObs {
m.trackersByUserID[userID].cleanupTrackerAttribution(ob.lvalues)
invalidKeys := m.getInactiveObservationsForUser(userID, deadline)

cat := m.TrackerForUser(userID)
for _, key := range invalidKeys {
cat.cleanupTrackerAttribution(key)
}

// if the tracker is no longer overflowed, and it is currently in overflow state, check the cooldown and create new tracker
cat := m.TrackerForUser(userID)
if cat != nil && cat.cooldownUntil != nil && cat.cooldownUntil.Load() < deadline {
if len(cat.observed) <= cat.MaxCardinality() {
m.deleteUserTracer(userID)
Expand All @@ -150,6 +172,7 @@ func (m *Manager) purgeInactiveAttributionsUntil(deadline int64) {
}
}
}
return nil
}

// compare two sorted string slices
Expand All @@ -166,7 +189,7 @@ func CompareCALabels(a, b []string) bool {
return true
}

func (m *Manager) purgeInactiveObservationsForUser(userID string, deadline int64) []*Observation {
func (m *Manager) getInactiveObservationsForUser(userID string, deadline int64) []string {
cat := m.TrackerForUser(userID)
if cat == nil {
return nil
Expand Down Expand Up @@ -196,5 +219,17 @@ func (m *Manager) purgeInactiveObservationsForUser(userID string, deadline int64
}
}

return cat.PurgeInactiveObservations(deadline)
return cat.GetInactiveObservations(deadline)
}

// updateMetrics calls updateMetrics on every tracker currently held in
// trackersByUserID. The map is iterated while holding the read lock.
// Calling it on a nil Manager does nothing.
func (m *Manager) updateMetrics() {
	if m == nil {
		return
	}

	m.mtx.RLock()
	defer m.mtx.RUnlock()

	for _, t := range m.trackersByUserID {
		t.updateMetrics()
	}
}
28 changes: 18 additions & 10 deletions pkg/costattribution/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func newTestManager() *Manager {
logger := log.NewNopLogger()
limits, _ := getMockLimits(0)
reg := prometheus.NewRegistry()
manager, err := NewManager(5*time.Second, 10*time.Second, logger, limits, reg)
manager, err := NewManager(5*time.Second, time.Second, 10*time.Second, logger, limits, reg)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -89,11 +89,14 @@ func Test_CreateDeleteTracker(t *testing.T) {
expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
# TYPE cortex_discarded_attributed_samples_total counter
cortex_discarded_attributed_samples_total{reason="invalid-metrics-name",team="foo",tenant="user1",tracker="custom_attribution"} 1
cortex_discarded_attributed_samples_total{reason="invalid-metrics-name",team="foo",tenant="user1",tracker="cost-attribution"} 1
# HELP cortex_received_attributed_samples_total The total number of samples that were received per attribution.
# TYPE cortex_received_attributed_samples_total counter
cortex_received_attributed_samples_total{department="foo",service="dodo",tenant="user3",tracker="custom_attribution"} 1
cortex_received_attributed_samples_total{department="foo",service="dodo",tenant="user3",tracker="cost-attribution"} 1
`

// manually trigger metrics update to ensure they are exported
manager.updateMetrics()
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total", "cortex_received_attributed_samples_total"))
})

Expand All @@ -102,8 +105,9 @@ func Test_CreateDeleteTracker(t *testing.T) {
expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
# TYPE cortex_discarded_attributed_samples_total counter
cortex_discarded_attributed_samples_total{reason="invalid-metrics-name",team="foo",tenant="user1",tracker="custom_attribution"} 1
cortex_discarded_attributed_samples_total{reason="invalid-metrics-name",team="foo",tenant="user1",tracker="cost-attribution"} 1
`
manager.updateMetrics()
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total"))
})

Expand All @@ -115,8 +119,9 @@ func Test_CreateDeleteTracker(t *testing.T) {
expectedMetrics := `
# HELP cortex_received_attributed_samples_total The total number of samples that were received per attribution.
# TYPE cortex_received_attributed_samples_total counter
cortex_received_attributed_samples_total{department="foo",service="dodo",tenant="user3",tracker="custom_attribution"} 1
cortex_received_attributed_samples_total{department="foo",service="dodo",tenant="user3",tracker="cost-attribution"} 1
`
manager.updateMetrics()
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_received_attributed_samples_total"))
})

Expand All @@ -131,8 +136,9 @@ func Test_CreateDeleteTracker(t *testing.T) {
expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
# TYPE cortex_discarded_attributed_samples_total counter
cortex_discarded_attributed_samples_total{feature="__missing__",reason="invalid-metrics-name",team="foo",tenant="user3",tracker="custom_attribution"} 1
cortex_discarded_attributed_samples_total{feature="__missing__",reason="invalid-metrics-name",team="foo",tenant="user3",tracker="cost-attribution"} 1
`
manager.updateMetrics()
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_discarded_attributed_samples_total"))
})

Expand All @@ -144,8 +150,9 @@ func Test_CreateDeleteTracker(t *testing.T) {
expectedMetrics := `
# HELP cortex_received_attributed_samples_total The total number of samples that were received per attribution.
# TYPE cortex_received_attributed_samples_total counter
cortex_received_attributed_samples_total{feature="__overflow__",team="__overflow__",tenant="user3",tracker="custom_attribution"} 2
cortex_received_attributed_samples_total{feature="__overflow__",team="__overflow__",tenant="user3",tracker="cost-attribution"} 1
`
manager.updateMetrics()
assert.NoError(t, testutil.GatherAndCompare(manager.reg, strings.NewReader(expectedMetrics), "cortex_received_attributed_samples_total"))
})
}
Expand All @@ -158,6 +165,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
manager.TrackerForUser("user1").IncrementDiscardedSamples(labels.FromStrings("team", "foo"), 1, "invalid-metrics-name", time.Unix(1, 0))
manager.TrackerForUser("user3").IncrementDiscardedSamples(labels.FromStrings("department", "foo", "service", "bar"), 1, "out-of-window", time.Unix(10, 0))

manager.updateMetrics()
t.Run("Purge before inactive timeout", func(t *testing.T) {
// Run purge at a timestamp that doesn't exceed inactive timeout
manager.purgeInactiveAttributionsUntil(time.Unix(0, 0).Unix())
Expand All @@ -168,8 +176,8 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
# TYPE cortex_discarded_attributed_samples_total counter
cortex_discarded_attributed_samples_total{reason="invalid-metrics-name",team="foo",tenant="user1",tracker="custom_attribution"} 1
cortex_discarded_attributed_samples_total{department="foo",reason="out-of-window",service="bar",tenant="user3",tracker="custom_attribution"} 1
cortex_discarded_attributed_samples_total{reason="invalid-metrics-name",team="foo",tenant="user1",tracker="cost-attribution"} 1
cortex_discarded_attributed_samples_total{department="foo",reason="out-of-window",service="bar",tenant="user3",tracker="cost-attribution"} 1
`
metricNames := []string{
"cortex_discarded_attributed_samples_total",
Expand All @@ -189,7 +197,7 @@ func Test_PurgeInactiveAttributionsUntil(t *testing.T) {
expectedMetrics := `
# HELP cortex_discarded_attributed_samples_total The total number of samples that were discarded per attribution.
# TYPE cortex_discarded_attributed_samples_total counter
cortex_discarded_attributed_samples_total{department="foo",reason="out-of-window",service="bar",tenant="user3",tracker="custom_attribution"} 1
cortex_discarded_attributed_samples_total{department="foo",reason="out-of-window",service="bar",tenant="user3",tracker="cost-attribution"} 1
`
metricNames := []string{
"cortex_discarded_attributed_samples_total",
Expand Down
Loading
Loading