Skip to content

Commit

Permalink
make tracker and tracker group private
Browse files Browse the repository at this point in the history
  • Loading branch information
ying-jeanne committed Oct 9, 2024
1 parent 0db7db7 commit e55f83c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/costattribution/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
type Manager struct {
services.Service
logger log.Logger
attributionTracker *AttributionTrackerGroup
attributionTracker *attributionTrackerGroup
inactiveTimeout time.Duration
invalidValue string
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/costattribution/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"go.uber.org/atomic"
)

type Tracker struct {
type tracker struct {
trackedLabel string
attributionLimit int
activeSeriesPerUserAttribution *prometheus.GaugeVec
Expand All @@ -17,21 +17,21 @@ type Tracker struct {
coolDownDeadline *atomic.Int64
}

func (t *Tracker) cleanupTrackerAttribution(userID, attribution string) {
func (t *tracker) cleanupTrackerAttribution(userID, attribution string) {
t.activeSeriesPerUserAttribution.DeleteLabelValues(userID, attribution)
t.receivedSamplesAttribution.DeleteLabelValues(userID, attribution)
t.discardedSampleAttribution.DeleteLabelValues(userID, attribution)
}

func (t *Tracker) cleanupTracker(userID string) {
func (t *tracker) cleanupTracker(userID string) {
filter := prometheus.Labels{"user": userID}
t.activeSeriesPerUserAttribution.DeletePartialMatch(filter)
t.receivedSamplesAttribution.DeletePartialMatch(filter)
t.discardedSampleAttribution.DeletePartialMatch(filter)
}

func newTracker(trackedLabel string, limit int) (*Tracker, error) {
m := &Tracker{
func newTracker(trackedLabel string, limit int) (*tracker, error) {
m := &tracker{
trackedLabel: trackedLabel,
attributionLimit: limit,
attributionTimestamps: map[string]*atomic.Int64{},
Expand All @@ -55,13 +55,13 @@ func newTracker(trackedLabel string, limit int) (*Tracker, error) {
return m, nil
}

func (t *Tracker) Collect(out chan<- prometheus.Metric) {
func (t *tracker) Collect(out chan<- prometheus.Metric) {
t.activeSeriesPerUserAttribution.Collect(out)
t.receivedSamplesAttribution.Collect(out)
t.discardedSampleAttribution.Collect(out)
}

// Describe implements prometheus.Collector.
func (t *Tracker) Describe(chan<- *prometheus.Desc) {
func (t *tracker) Describe(chan<- *prometheus.Desc) {
// this is an unchecked collector
}
24 changes: 12 additions & 12 deletions pkg/costattribution/tracker_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@ import (
"github.com/grafana/mimir/pkg/util/validation"
)

type AttributionTrackerGroup struct {
type attributionTrackerGroup struct {
mu sync.RWMutex
trackersByUserID map[string]*Tracker
trackersByUserID map[string]*tracker
limits *validation.Overrides
cooldownTimeout time.Duration
}

func newAttributionTrackerGroup(limits *validation.Overrides, cooldownTimeout time.Duration) *AttributionTrackerGroup {
return &AttributionTrackerGroup{
trackersByUserID: make(map[string]*Tracker),
func newAttributionTrackerGroup(limits *validation.Overrides, cooldownTimeout time.Duration) *attributionTrackerGroup {
return &attributionTrackerGroup{
trackersByUserID: make(map[string]*tracker),
limits: limits,
mu: sync.RWMutex{},
cooldownTimeout: cooldownTimeout,
}
}

// getUserAttributionLabelFromCache is read user attribution label through cache, if not found, get from config
func (atg *AttributionTrackerGroup) getUserAttributionLabelFromCache(userID string) string {
func (atg *attributionTrackerGroup) getUserAttributionLabelFromCache(userID string) string {
atg.mu.RLock()
defer atg.mu.RUnlock()
// if the user is not enabled for cost attribution, we don't need to track the attribution
Expand All @@ -41,7 +41,7 @@ func (atg *AttributionTrackerGroup) getUserAttributionLabelFromCache(userID stri

// getUserAttributionLimitFromCache is read per user attribution limit through cache, if not found, get from config
// always call only when the user is enabled for cost attribution
func (atg *AttributionTrackerGroup) getUserAttributionLimitFromCache(userID string) int {
func (atg *attributionTrackerGroup) getUserAttributionLimitFromCache(userID string) int {
atg.mu.Lock()
defer atg.mu.Unlock()
if _, exists := atg.trackersByUserID[userID]; !exists {
Expand All @@ -51,7 +51,7 @@ func (atg *AttributionTrackerGroup) getUserAttributionLimitFromCache(userID stri
}

// deleteUserTracerFromCache is delete user from cache since the user is disabled for cost attribution
func (atg *AttributionTrackerGroup) deleteUserTracerFromCache(userID string) {
func (atg *attributionTrackerGroup) deleteUserTracerFromCache(userID string) {
atg.mu.Lock()
defer atg.mu.Unlock()
if _, exists := atg.trackersByUserID[userID]; !exists {
Expand All @@ -66,7 +66,7 @@ func (atg *AttributionTrackerGroup) deleteUserTracerFromCache(userID string) {
// if the label has changed, we will create a new tracker, and won't update the timestamp
// if the label has not changed, we will update the attribution timestamp
// if the limit is set to 0 or label is empty, we skip the update
func (atg *AttributionTrackerGroup) updateAttributionCacheForUser(userID, label, attribution string, now time.Time) {
func (atg *attributionTrackerGroup) updateAttributionCacheForUser(userID, label, attribution string, now time.Time) {
// If the limit is set to 0, we don't need to track the attribution, clean the cache if exists
if atg.limits.CostAttributionLabel(userID) == "" || atg.limits.MaxCostAttributionPerUser(userID) <= 0 {
atg.deleteUserTracerFromCache(userID)
Expand Down Expand Up @@ -96,7 +96,7 @@ func (atg *AttributionTrackerGroup) updateAttributionCacheForUser(userID, label,
atg.trackersByUserID[userID].attributionTimestamps[attribution] = atomic.NewInt64(ts)
}

func (atg *AttributionTrackerGroup) purgeInactiveAttributionsForUser(userID string, deadline int64) []string {
func (atg *attributionTrackerGroup) purgeInactiveAttributionsForUser(userID string, deadline int64) []string {
atg.mu.RLock()
var inactiveAttributions []string
if atg.trackersByUserID[userID] == nil {
Expand Down Expand Up @@ -145,7 +145,7 @@ func (atg *AttributionTrackerGroup) purgeInactiveAttributionsForUser(userID stri
return inactiveAttributions
}

func (atg *AttributionTrackerGroup) purgeInactiveAttributions(inactiveTimeout time.Duration) {
func (atg *attributionTrackerGroup) purgeInactiveAttributions(inactiveTimeout time.Duration) {
atg.mu.RLock()
userIDs := make([]string, 0, len(atg.trackersByUserID))
for userID := range atg.trackersByUserID {
Expand All @@ -167,7 +167,7 @@ func (atg *AttributionTrackerGroup) purgeInactiveAttributions(inactiveTimeout ti
}
}

func (atg *AttributionTrackerGroup) attributionLimitExceeded(userID, attribution string, now time.Time) bool {
func (atg *attributionTrackerGroup) attributionLimitExceeded(userID, attribution string, now time.Time) bool {
// if we are still at the cooldown period, we will consider the limit reached
atg.mu.RLock()
defer atg.mu.RUnlock()
Expand Down

0 comments on commit e55f83c

Please sign in to comment.