Skip to content

Commit 83a8893

Browse files
authored
feat(kafka): Remove rate limits for kafka ingestion (#14460)
1 parent 8d6da6d commit 83a8893

11 files changed

+87
-55
lines changed

pkg/ingester/checkpoint_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,7 @@ func Test_SeriesIterator(t *testing.T) {
459459
limits, err := validation.NewOverrides(l, nil)
460460
require.NoError(t, err)
461461

462-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
462+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
463463

464464
for i := 0; i < 3; i++ {
465465
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
@@ -506,7 +506,8 @@ func Benchmark_SeriesIterator(b *testing.B) {
506506

507507
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
508508
require.NoError(b, err)
509-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
509+
510+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
510511

511512
for i := range instances {
512513
inst, _ := newInstance(defaultConfig(), defaultPeriodConfigs, fmt.Sprintf("instance %d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil, nil, nil, NewStreamRateCalculator(), nil, nil)

pkg/ingester/ingester.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -404,18 +404,21 @@ func New(cfg Config, clientConfig client.Config, store Store, limits Limits, con
404404
i.SetExtractorWrapper(i.cfg.SampleExtractorWrapper)
405405
}
406406

407-
var limiterStrategy limiterRingStrategy
407+
var streamCountLimiter limiterRingStrategy
408408
var ownedStreamsStrategy ownershipStrategy
409+
var streamRateLimiter RateLimiterStrategy
409410
if i.cfg.KafkaIngestion.Enabled {
410-
limiterStrategy = newPartitionRingLimiterStrategy(partitionRingWatcher, limits.IngestionPartitionsTenantShardSize)
411+
streamCountLimiter = newPartitionRingLimiterStrategy(partitionRingWatcher, limits.IngestionPartitionsTenantShardSize)
411412
ownedStreamsStrategy = newOwnedStreamsPartitionStrategy(i.ingestPartitionID, partitionRingWatcher, limits.IngestionPartitionsTenantShardSize, util_log.Logger)
413+
streamRateLimiter = &NoLimitsStrategy{} // Kafka ingestion does not have per-stream rate limits, because we control the consumption speed.
412414
} else {
413-
limiterStrategy = newIngesterRingLimiterStrategy(i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
415+
streamCountLimiter = newIngesterRingLimiterStrategy(i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
414416
ownedStreamsStrategy = newOwnedStreamsIngesterStrategy(i.lifecycler.ID, i.readRing, util_log.Logger)
417+
streamRateLimiter = &TenantBasedStrategy{limits: limits}
415418
}
416419
// Now that the lifecycler has been created, we can create the limiter
417420
// which depends on it.
418-
i.limiter = NewLimiter(limits, metrics, limiterStrategy)
421+
i.limiter = NewLimiter(limits, metrics, streamCountLimiter, streamRateLimiter)
419422
i.recalculateOwnedStreams = newRecalculateOwnedStreamsSvc(i.getInstances, ownedStreamsStrategy, cfg.OwnedStreamsCheckInterval, util_log.Logger)
420423

421424
return i, nil

pkg/ingester/instance.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ func (i *instance) createStream(ctx context.Context, pushReqStream logproto.Stre
307307
return nil, fmt.Errorf("failed to create stream: %w", err)
308308
}
309309

310-
s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs)
310+
s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs)
311311

312312
// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
313313
if record != nil {
@@ -372,7 +372,7 @@ func (i *instance) createStreamByFP(ls labels.Labels, fp model.Fingerprint) (*st
372372
return nil, fmt.Errorf("failed to create stream for fingerprint: %w", err)
373373
}
374374

375-
s := newStream(chunkfmt, headfmt, i.cfg, i.limiter, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs)
375+
s := newStream(chunkfmt, headfmt, i.cfg, i.limiter.rateLimitStrategy, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.streamRateCalculator, i.metrics, i.writeFailures, i.configs)
376376

377377
i.onStreamCreated(s)
378378

pkg/ingester/instance_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ var NilMetrics = newIngesterMetrics(nil, constants.Loki)
7878
func TestLabelsCollisions(t *testing.T) {
7979
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
8080
require.NoError(t, err)
81-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
81+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
8282

8383
i, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
8484
require.Nil(t, err)
@@ -106,7 +106,7 @@ func TestLabelsCollisions(t *testing.T) {
106106
func TestConcurrentPushes(t *testing.T) {
107107
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
108108
require.NoError(t, err)
109-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
109+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
110110

111111
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
112112
require.Nil(t, err)
@@ -158,7 +158,7 @@ func TestConcurrentPushes(t *testing.T) {
158158
func TestGetStreamRates(t *testing.T) {
159159
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
160160
require.NoError(t, err)
161-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
161+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
162162

163163
inst, err := newInstance(defaultConfig(), defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
164164
require.NoError(t, err)
@@ -245,7 +245,7 @@ func labelHashNoShard(l labels.Labels) uint64 {
245245
func TestSyncPeriod(t *testing.T) {
246246
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
247247
require.NoError(t, err)
248-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
248+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
249249

250250
const (
251251
syncPeriod = 1 * time.Minute
@@ -290,7 +290,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
290290
t.Helper()
291291
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
292292
require.NoError(t, err)
293-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
293+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
294294
indexShards := 2
295295

296296
// just some random values
@@ -315,7 +315,7 @@ func setupTestStreams(t *testing.T) (*instance, time.Time, int) {
315315
require.NoError(t, err)
316316
chunkfmt, headfmt, err := instance.chunkFormatAt(minTs(&testStream))
317317
require.NoError(t, err)
318-
chunk := newStream(chunkfmt, headfmt, cfg, limiter, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil).NewChunk()
318+
chunk := newStream(chunkfmt, headfmt, cfg, limiter.rateLimitStrategy, "fake", 0, nil, true, NewStreamRateCalculator(), NilMetrics, nil, nil).NewChunk()
319319
for _, entry := range testStream.Entries {
320320
dup, err := chunk.Append(&entry)
321321
require.False(t, dup)
@@ -507,7 +507,7 @@ func makeRandomLabels() labels.Labels {
507507
func Benchmark_PushInstance(b *testing.B) {
508508
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
509509
require.NoError(b, err)
510-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
510+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
511511

512512
i, _ := newInstance(&Config{IndexShards: 1}, defaultPeriodConfigs, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil, nil, nil, NewStreamRateCalculator(), nil, nil)
513513
ctx := context.Background()
@@ -549,7 +549,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
549549
l.MaxLocalStreamsPerUser = 100000
550550
limits, err := validation.NewOverrides(l, nil)
551551
require.NoError(b, err)
552-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
552+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
553553

554554
ctx := context.Background()
555555

@@ -575,7 +575,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
575575

576576
b.Run("addTailersToNewStream", func(b *testing.B) {
577577
for n := 0; n < b.N; n++ {
578-
inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil))
578+
inst.addTailersToNewStream(newStream(chunkfmt, headfmt, nil, limiter.rateLimitStrategy, "fake", 0, lbs, true, NewStreamRateCalculator(), NilMetrics, nil, nil))
579579
}
580580
})
581581
}
@@ -1089,7 +1089,7 @@ func TestStreamShardingUsage(t *testing.T) {
10891089
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), limitsDefinition)
10901090
require.NoError(t, err)
10911091

1092-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1))
1092+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: limits})
10931093

10941094
defaultShardStreamsCfg := limiter.limits.ShardStreams("fake")
10951095
tenantShardStreamsCfg := limiter.limits.ShardStreams(customTenant1)
@@ -1454,7 +1454,7 @@ func defaultInstance(t *testing.T) *instance {
14541454
&ingesterConfig,
14551455
defaultPeriodConfigs,
14561456
"fake",
1457-
NewLimiter(overrides, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1)),
1457+
NewLimiter(overrides, NilMetrics, newIngesterRingLimiterStrategy(&ringCountMock{count: 1}, 1), &TenantBasedStrategy{limits: overrides}),
14581458
loki_runtime.DefaultTenantConfigs(),
14591459
noopWAL{},
14601460
NilMetrics,

pkg/ingester/limiter.go

+32-8
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,10 @@ type Limits interface {
3838
// Limiter implements primitives to get the maximum number of streams
3939
// an ingester can handle for a specific tenant
4040
type Limiter struct {
41-
limits Limits
42-
ringStrategy limiterRingStrategy
43-
metrics *ingesterMetrics
41+
limits Limits
42+
ringStrategy limiterRingStrategy
43+
metrics *ingesterMetrics
44+
rateLimitStrategy RateLimiterStrategy
4445

4546
mtx sync.RWMutex
4647
disabled bool
@@ -51,25 +52,28 @@ func (l *Limiter) DisableForWALReplay() {
5152
defer l.mtx.Unlock()
5253
l.disabled = true
5354
l.metrics.limiterEnabled.Set(0)
55+
l.rateLimitStrategy.SetDisabled(true)
5456
}
5557

5658
func (l *Limiter) Enable() {
5759
l.mtx.Lock()
5860
defer l.mtx.Unlock()
5961
l.disabled = false
6062
l.metrics.limiterEnabled.Set(1)
63+
l.rateLimitStrategy.SetDisabled(false)
6164
}
6265

6366
type limiterRingStrategy interface {
6467
convertGlobalToLocalLimit(int, string) int
6568
}
6669

6770
// NewLimiter makes a new limiter
68-
func NewLimiter(limits Limits, metrics *ingesterMetrics, ingesterRingLimiterStrategy limiterRingStrategy) *Limiter {
71+
func NewLimiter(limits Limits, metrics *ingesterMetrics, ingesterRingLimiterStrategy limiterRingStrategy, rateLimitStrategy RateLimiterStrategy) *Limiter {
6972
return &Limiter{
70-
limits: limits,
71-
ringStrategy: ingesterRingLimiterStrategy,
72-
metrics: metrics,
73+
limits: limits,
74+
ringStrategy: ingesterRingLimiterStrategy,
75+
metrics: metrics,
76+
rateLimitStrategy: rateLimitStrategy,
7377
}
7478
}
7579

@@ -231,16 +235,36 @@ func (l *streamCountLimiter) getSuppliers(tenant string) (streamCountSupplier, f
231235

232236
type RateLimiterStrategy interface {
233237
RateLimit(tenant string) validation.RateLimit
238+
SetDisabled(bool)
234239
}
235240

236-
func (l *Limiter) RateLimit(tenant string) validation.RateLimit {
241+
type TenantBasedStrategy struct {
242+
disabled bool
243+
limits Limits
244+
}
245+
246+
func (l *TenantBasedStrategy) RateLimit(tenant string) validation.RateLimit {
237247
if l.disabled {
238248
return validation.Unlimited
239249
}
240250

241251
return l.limits.PerStreamRateLimit(tenant)
242252
}
243253

254+
func (l *TenantBasedStrategy) SetDisabled(disabled bool) {
255+
l.disabled = disabled
256+
}
257+
258+
type NoLimitsStrategy struct{}
259+
260+
func (l *NoLimitsStrategy) RateLimit(_ string) validation.RateLimit {
261+
return validation.Unlimited
262+
}
263+
264+
func (l *NoLimitsStrategy) SetDisabled(_ bool) {
265+
// no-op
266+
}
267+
244268
type StreamRateLimiter struct {
245269
recheckPeriod time.Duration
246270
recheckAt time.Time

pkg/ingester/limiter_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
133133
ownedStreamCount: testData.ownedStreamCount,
134134
}
135135
strategy := &fixedStrategy{localLimit: testData.calculatedLocalLimit}
136-
limiter := NewLimiter(limits, NilMetrics, strategy)
136+
limiter := NewLimiter(limits, NilMetrics, strategy, &TenantBasedStrategy{limits: limits})
137137
defaultCountSupplier := func() int {
138138
return testData.streams
139139
}
@@ -182,7 +182,7 @@ func TestLimiter_minNonZero(t *testing.T) {
182182

183183
for testName, testData := range tests {
184184
t.Run(testName, func(t *testing.T) {
185-
limiter := NewLimiter(nil, NilMetrics, nil)
185+
limiter := NewLimiter(nil, NilMetrics, nil, nil)
186186
assert.Equal(t, testData.expected, limiter.minNonZero(testData.first, testData.second))
187187
})
188188
}

pkg/ingester/owned_streams_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ func Test_OwnedStreamService(t *testing.T) {
1717
require.NoError(t, err)
1818
// Mock the ring
1919
ring := &ringCountMock{count: 30}
20-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(ring, 3))
20+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(ring, 3), &TenantBasedStrategy{limits: limits})
2121

2222
service := newOwnedStreamService("test", limiter)
2323
require.Equal(t, 0, service.getOwnedStreamCount())

pkg/ingester/recalculate_owned_streams_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func Test_recalculateOwnedStreams_recalculateWithIngesterStrategy(t *testing.T)
7070
UseOwnedStreamCount: testData.featureEnabled,
7171
}, nil)
7272
require.NoError(t, err)
73-
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(mockRing, 1))
73+
limiter := NewLimiter(limits, NilMetrics, newIngesterRingLimiterStrategy(mockRing, 1), &TenantBasedStrategy{limits: limits})
7474

7575
tenant, err := newInstance(
7676
defaultConfig(),

0 commit comments

Comments
 (0)