From 2a2b52808c508be96ff61bdde2d6740a741fb053 Mon Sep 17 00:00:00 2001
From: Owen Diehl <ow.diehl@gmail.com>
Date: Tue, 21 Jan 2025 13:39:56 -0800
Subject: [PATCH] perf(ingester): refactor lock acquisitions related to
 `not_owned` series limit functionality (#15839)

---
 pkg/ingester/instance.go                      |  2 +-
 pkg/ingester/limiter_test.go                  |  2 +-
 pkg/ingester/metrics.go                       |  4 +--
 pkg/ingester/owned_streams.go                 | 28 ++++++++++---------
 .../recalculate_owned_streams_test.go         |  6 ++--
 5 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go
index c6afcacfbdfde..80905bff23505 100644
--- a/pkg/ingester/instance.go
+++ b/pkg/ingester/instance.go
@@ -1183,7 +1183,7 @@ func (i *instance) updateOwnedStreams(isOwnedStream func(*stream) (bool, error))
 	}()
 
 	var err error
-	i.streams.WithLock(func() {
+	i.streams.WithRLock(func() {
 		i.ownedStreamsSvc.resetStreamCounts()
 		err = i.streams.ForEach(func(s *stream) (bool, error) {
 			ownedStream, err := isOwnedStream(s)
diff --git a/pkg/ingester/limiter_test.go b/pkg/ingester/limiter_test.go
index b611db4d109e1..78e579187a502 100644
--- a/pkg/ingester/limiter_test.go
+++ b/pkg/ingester/limiter_test.go
@@ -130,7 +130,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
 
 			ownedStreamSvc := &ownedStreamService{
 				fixedLimit:       atomic.NewInt32(testData.fixedLimit),
-				ownedStreamCount: testData.ownedStreamCount,
+				ownedStreamCount: atomic.NewInt64(int64(testData.ownedStreamCount)),
 			}
 			strategy := &fixedStrategy{localLimit: testData.calculatedLocalLimit}
 			limiter := NewLimiter(limits, NilMetrics, strategy, &TenantBasedStrategy{limits: limits})
diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go
index ff4db43747676..5f144038bc094 100644
--- a/pkg/ingester/metrics.go
+++ b/pkg/ingester/metrics.go
@@ -318,8 +318,8 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges
 			Namespace: constants.Loki,
 			Name:      "ingester_streams_ownership_check_duration_ms",
 			Help:      "Distribution of streams ownership check durations in milliseconds.",
-			// 100ms to 5s.
-			Buckets: []float64{100, 250, 350, 500, 750, 1000, 1500, 2000, 5000},
+			// 1ms -> 16s
+			Buckets: prometheus.ExponentialBuckets(1, 4, 8),
 		}),
 
 		duplicateLogBytesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
diff --git a/pkg/ingester/owned_streams.go b/pkg/ingester/owned_streams.go
index 3bb729815e718..56c5a77fa768e 100644
--- a/pkg/ingester/owned_streams.go
+++ b/pkg/ingester/owned_streams.go
@@ -21,17 +21,18 @@ type ownedStreamService struct {
 	tenantID         string
 	limiter          *Limiter
 	fixedLimit       *atomic.Int32
-	ownedStreamCount int
+	ownedStreamCount *atomic.Int64
 	lock             sync.RWMutex
 	notOwnedStreams  map[model.Fingerprint]any
 }
 
 func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
 	svc := &ownedStreamService{
-		tenantID:        tenantID,
-		limiter:         limiter,
-		fixedLimit:      atomic.NewInt32(0),
-		notOwnedStreams: make(map[model.Fingerprint]any),
+		tenantID:         tenantID,
+		limiter:          limiter,
+		fixedLimit:       atomic.NewInt32(0),
+		ownedStreamCount: atomic.NewInt64(0),
+		notOwnedStreams:  make(map[model.Fingerprint]any),
 	}
 
 	svc.updateFixedLimit()
@@ -39,9 +40,7 @@ func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamServic
 }
 
 func (s *ownedStreamService) getOwnedStreamCount() int {
-	s.lock.RLock()
-	defer s.lock.RUnlock()
-	return s.ownedStreamCount
+	return int(s.ownedStreamCount.Load())
 }
 
 func (s *ownedStreamService) updateFixedLimit() (old, new int32) {
@@ -55,12 +54,15 @@ func (s *ownedStreamService) getFixedLimit() int {
 }
 
 func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bool) {
-	s.lock.Lock()
-	defer s.lock.Unlock()
+	// only need to inc the owned count; can use sync atomics.
 	if owned {
-		s.ownedStreamCount++
+		s.ownedStreamCount.Inc()
 		return
 	}
+
+	// need to update map; lock required
+	s.lock.Lock()
+	defer s.lock.Unlock()
 	notOwnedStreamsMetric.Inc()
 	s.notOwnedStreams[fp] = nil
 }
@@ -74,13 +76,13 @@ func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) {
 		delete(s.notOwnedStreams, fp)
 		return
 	}
-	s.ownedStreamCount--
+	s.ownedStreamCount.Dec()
 }
 
 func (s *ownedStreamService) resetStreamCounts() {
 	s.lock.Lock()
 	defer s.lock.Unlock()
-	s.ownedStreamCount = 0
+	s.ownedStreamCount.Store(0)
 	notOwnedStreamsMetric.Sub(float64(len(s.notOwnedStreams)))
 	s.notOwnedStreams = make(map[model.Fingerprint]any)
 }
diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go
index 3e531dcdef66f..f3bea57f69bae 100644
--- a/pkg/ingester/recalculate_owned_streams_test.go
+++ b/pkg/ingester/recalculate_owned_streams_test.go
@@ -37,7 +37,7 @@ func Test_recalculateOwnedStreams_newRecalculateOwnedStreamsIngester(t *testing.
 func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T) {
 	tests := map[string]struct {
 		featureEnabled              bool
-		expectedOwnedStreamCount    int
+		expectedOwnedStreamCount    int64
 		expectedNotOwnedStreamCount int
 	}{
 		"expected streams ownership to be recalculated": {
@@ -101,7 +101,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
 			mockRing.addMapping(createStream(t, tenant, 100), true)
 			mockRing.addMapping(createStream(t, tenant, 250), true)
 
-			require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount)
+			require.Equal(t, int64(7), tenant.ownedStreamsSvc.ownedStreamCount.Load())
 			require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0)
 
 			mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}}
@@ -116,7 +116,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
 			if testData.featureEnabled {
 				require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation")
 			}
-			require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount)
+			require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount.Load())
 			require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount)
 		})
 	}