Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(ingester): refactor lock acquisitions related to not_owned series limit functionality #15839

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -1183,7 +1183,7 @@ func (i *instance) updateOwnedStreams(isOwnedStream func(*stream) (bool, error))
}()

var err error
i.streams.WithLock(func() {
i.streams.WithRLock(func() {
i.ownedStreamsSvc.resetStreamCounts()
err = i.streams.ForEach(func(s *stream) (bool, error) {
ownedStream, err := isOwnedStream(s)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {

ownedStreamSvc := &ownedStreamService{
fixedLimit: atomic.NewInt32(testData.fixedLimit),
ownedStreamCount: testData.ownedStreamCount,
ownedStreamCount: atomic.NewInt64(int64(testData.ownedStreamCount)),
}
strategy := &fixedStrategy{localLimit: testData.calculatedLocalLimit}
limiter := NewLimiter(limits, NilMetrics, strategy, &TenantBasedStrategy{limits: limits})
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
Namespace: constants.Loki,
Name: "ingester_streams_ownership_check_duration_ms",
Help: "Distribution of streams ownership check durations in milliseconds.",
// 100ms to 5s.
Buckets: []float64{100, 250, 350, 500, 750, 1000, 1500, 2000, 5000},
// 1ms -> 16s
Buckets: prometheus.ExponentialBuckets(1, 4, 8),
}),

duplicateLogBytesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Expand Down
28 changes: 15 additions & 13 deletions pkg/ingester/owned_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,26 @@ type ownedStreamService struct {
tenantID string
limiter *Limiter
fixedLimit *atomic.Int32
ownedStreamCount int
ownedStreamCount *atomic.Int64
lock sync.RWMutex
notOwnedStreams map[model.Fingerprint]any
}

func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
svc := &ownedStreamService{
tenantID: tenantID,
limiter: limiter,
fixedLimit: atomic.NewInt32(0),
notOwnedStreams: make(map[model.Fingerprint]any),
tenantID: tenantID,
limiter: limiter,
fixedLimit: atomic.NewInt32(0),
ownedStreamCount: atomic.NewInt64(0),
notOwnedStreams: make(map[model.Fingerprint]any),
}

svc.updateFixedLimit()
return svc
}

func (s *ownedStreamService) getOwnedStreamCount() int {
s.lock.RLock()
defer s.lock.RUnlock()
return s.ownedStreamCount
return int(s.ownedStreamCount.Load())
}

func (s *ownedStreamService) updateFixedLimit() (old, new int32) {
Expand All @@ -55,12 +54,15 @@ func (s *ownedStreamService) getFixedLimit() int {
}

func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bool) {
s.lock.Lock()
defer s.lock.Unlock()
// only need to inc the owned count; can use sync atomics.
if owned {
s.ownedStreamCount++
s.ownedStreamCount.Inc()
return
}

// need to update map; lock required
s.lock.Lock()
defer s.lock.Unlock()
notOwnedStreamsMetric.Inc()
s.notOwnedStreams[fp] = nil
}
Expand All @@ -74,13 +76,13 @@ func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) {
delete(s.notOwnedStreams, fp)
return
}
s.ownedStreamCount--
s.ownedStreamCount.Dec()
}

func (s *ownedStreamService) resetStreamCounts() {
s.lock.Lock()
defer s.lock.Unlock()
s.ownedStreamCount = 0
s.ownedStreamCount.Store(0)
notOwnedStreamsMetric.Sub(float64(len(s.notOwnedStreams)))
s.notOwnedStreams = make(map[model.Fingerprint]any)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/recalculate_owned_streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func Test_recalculateOwnedStreams_newRecalculateOwnedStreamsIngester(t *testing.
func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T) {
tests := map[string]struct {
featureEnabled bool
expectedOwnedStreamCount int
expectedOwnedStreamCount int64
expectedNotOwnedStreamCount int
}{
"expected streams ownership to be recalculated": {
Expand Down Expand Up @@ -101,7 +101,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
mockRing.addMapping(createStream(t, tenant, 100), true)
mockRing.addMapping(createStream(t, tenant, 250), true)

require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount)
require.Equal(t, int64(7), tenant.ownedStreamsSvc.ownedStreamCount.Load())
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0)

mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}}
Expand All @@ -116,7 +116,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
if testData.featureEnabled {
require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation")
}
require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount)
require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount.Load())
require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount)
})
}
Expand Down
Loading