Skip to content

Commit

Permalink
Reuse pushStats
Browse files Browse the repository at this point in the history
  • Loading branch information
DylanGuedes committed Jan 30, 2025
1 parent 5587402 commit 5743d44
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,10 +630,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error())
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.lineSize) {
if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited)

err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validationContext.validationMetrics.lineCount, validationContext.validationMetrics.lineSize)
err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validationContext.validationMetrics.aggregatedPushStats.lineCount, validationContext.validationMetrics.aggregatedPushStats.lineSize)
d.writeFailuresManager.Log(tenantID, err)
// Return a 429 to indicate to the client they are being rate limited
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
Expand Down
17 changes: 8 additions & 9 deletions pkg/distributor/validation_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,37 @@ import (
"github.com/grafana/loki/v3/pkg/util"
)

type pushRetentionStats struct {
type pushStats struct {
lineSize int
lineCount int
}

type validationMetrics struct {
policyPushStats map[string]map[string]pushRetentionStats // policy -> retentionHours -> lineSize
policyPushStats map[string]map[string]pushStats // policy -> retentionHours -> lineSize
tenantRetentionHours string
lineSize int
lineCount int
aggregatedPushStats pushStats
}

func newValidationMetrics(tenantRetentionHours string) validationMetrics {
return validationMetrics{
policyPushStats: make(map[string]map[string]pushRetentionStats),
policyPushStats: make(map[string]map[string]pushStats),
tenantRetentionHours: tenantRetentionHours,
}
}

func (v *validationMetrics) compute(entry logproto.Entry, retentionHours string, policy string) {
if _, ok := v.policyPushStats[policy]; !ok {
v.policyPushStats[policy] = make(map[string]pushRetentionStats)
v.policyPushStats[policy] = make(map[string]pushStats)
}

if _, ok := v.policyPushStats[policy][retentionHours]; !ok {
v.policyPushStats[policy][retentionHours] = pushRetentionStats{}
v.policyPushStats[policy][retentionHours] = pushStats{}
}

totalEntrySize := util.EntryTotalSize(&entry)

v.lineSize += totalEntrySize
v.lineCount++
v.aggregatedPushStats.lineSize += totalEntrySize
v.aggregatedPushStats.lineCount++

stats := v.policyPushStats[policy][retentionHours]
stats.lineCount++
Expand Down

0 comments on commit 5743d44

Please sign in to comment.