Skip to content

Commit 18d143b

Browse files
feat: wire up ingest-limits service (#16660)
1 parent 0be4897 commit 18d143b

File tree

8 files changed

+704
-0
lines changed

8 files changed

+704
-0
lines changed

docs/sources/shared/configuration.md

+344
Large diffs are not rendered by default.

pkg/limits/frontend/client/client.go

+8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package client
55

66
import (
77
"flag"
8+
"fmt"
89
"io"
910
"time"
1011

@@ -62,6 +63,13 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
6263
cfg.PoolConfig.RegisterFlagsWithPrefix(prefix, f)
6364
}
6465

66+
func (cfg *Config) Validate() error {
67+
if err := cfg.GRPCClientConfig.Validate(); err != nil {
68+
return fmt.Errorf("invalid gRPC client config: %w", err)
69+
}
70+
return nil
71+
}
72+
6573
// PoolConfig contains the config for a pool of ingest-limits-frontend clients.
6674
type PoolConfig struct {
6775
ClientCleanupPeriod time.Duration `yaml:"client_cleanup_period"`

pkg/limits/frontend/frontend.go

+14
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,20 @@ func (f *Frontend) ExceedsLimits(ctx context.Context, r *logproto.ExceedsLimitsR
151151
return f.limits.ExceedsLimits(ctx, r)
152152
}
153153

154+
func (f *Frontend) CheckReady(ctx context.Context) error {
155+
if f.State() != services.Running && f.State() != services.Stopping {
156+
return fmt.Errorf("ingest limits frontend not ready: %v", f.State())
157+
}
158+
159+
err := f.lifecycler.CheckReady(ctx)
160+
if err != nil {
161+
level.Error(f.logger).Log("msg", "ingest limits frontend not ready", "err", err)
162+
return err
163+
}
164+
165+
return nil
166+
}
167+
154168
type exceedsLimitsRequest struct {
155169
TenantID string `json:"tenantID"`
156170
StreamHashes []uint64 `json:"streamHashes"`

pkg/limits/ingest_limits.go

+4
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8585
f.IntVar(&cfg.NumPartitions, "ingest-limits.num-partitions", 64, "The number of partitions for the Kafka topic used to read and write stream metadata. It is fixed, not a maximum.")
8686
}
8787

88+
func (cfg *Config) Validate() error {
89+
return nil
90+
}
91+
8892
type metrics struct {
8993
tenantStreamEvictionsTotal *prometheus.CounterVec
9094

pkg/loki/config_wrapper.go

+57
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,38 @@ func applyConfigToRings(r, defaults *ConfigWrapper, rc lokiring.RingConfig, merg
262262
r.Pattern.LifecyclerConfig.ObservePeriod = rc.ObservePeriod
263263
}
264264

265+
// IngestLimits
266+
if mergeWithExisting || reflect.DeepEqual(r.IngestLimits.LifecyclerConfig.RingConfig, defaults.IngestLimits.LifecyclerConfig.RingConfig) {
267+
r.IngestLimits.LifecyclerConfig.RingConfig.KVStore = rc.KVStore
268+
r.IngestLimits.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod
269+
r.IngestLimits.LifecyclerConfig.RingConfig.HeartbeatTimeout = rc.HeartbeatTimeout
270+
r.IngestLimits.LifecyclerConfig.TokensFilePath = rc.TokensFilePath
271+
r.IngestLimits.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
272+
r.IngestLimits.LifecyclerConfig.ID = rc.InstanceID
273+
r.IngestLimits.LifecyclerConfig.InfNames = rc.InstanceInterfaceNames
274+
r.IngestLimits.LifecyclerConfig.Port = rc.InstancePort
275+
r.IngestLimits.LifecyclerConfig.Addr = rc.InstanceAddr
276+
r.IngestLimits.LifecyclerConfig.Zone = rc.InstanceZone
277+
r.IngestLimits.LifecyclerConfig.ListenPort = rc.ListenPort
278+
r.IngestLimits.LifecyclerConfig.ObservePeriod = rc.ObservePeriod
279+
}
280+
281+
// IngestLimitsFrontend
282+
if mergeWithExisting || reflect.DeepEqual(r.IngestLimitsFrontend.LifecyclerConfig.RingConfig, defaults.IngestLimitsFrontend.LifecyclerConfig.RingConfig) {
283+
r.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore = rc.KVStore
284+
r.IngestLimitsFrontend.LifecyclerConfig.HeartbeatPeriod = rc.HeartbeatPeriod
285+
r.IngestLimitsFrontend.LifecyclerConfig.RingConfig.HeartbeatTimeout = rc.HeartbeatTimeout
286+
r.IngestLimitsFrontend.LifecyclerConfig.TokensFilePath = rc.TokensFilePath
287+
r.IngestLimitsFrontend.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled = rc.ZoneAwarenessEnabled
288+
r.IngestLimitsFrontend.LifecyclerConfig.ID = rc.InstanceID
289+
r.IngestLimitsFrontend.LifecyclerConfig.InfNames = rc.InstanceInterfaceNames
290+
r.IngestLimitsFrontend.LifecyclerConfig.Port = rc.InstancePort
291+
r.IngestLimitsFrontend.LifecyclerConfig.Addr = rc.InstanceAddr
292+
r.IngestLimitsFrontend.LifecyclerConfig.Zone = rc.InstanceZone
293+
r.IngestLimitsFrontend.LifecyclerConfig.ListenPort = rc.ListenPort
294+
r.IngestLimitsFrontend.LifecyclerConfig.ObservePeriod = rc.ObservePeriod
295+
}
296+
265297
// Distributor
266298
if mergeWithExisting || reflect.DeepEqual(r.Distributor.DistributorRing, defaults.Distributor.DistributorRing) {
267299
r.Distributor.DistributorRing.HeartbeatTimeout = rc.HeartbeatTimeout
@@ -332,6 +364,20 @@ func applyTokensFilePath(cfg *ConfigWrapper) error {
332364
}
333365
cfg.Ingester.LifecyclerConfig.TokensFilePath = f
334366

367+
// IngestLimits
368+
f, err = tokensFile(cfg, "ingestlimits.tokens")
369+
if err != nil {
370+
return err
371+
}
372+
cfg.IngestLimits.LifecyclerConfig.TokensFilePath = f
373+
374+
// IngestLimitsFrontend
375+
f, err = tokensFile(cfg, "ingestlimitsfrontend.tokens")
376+
if err != nil {
377+
return err
378+
}
379+
cfg.IngestLimitsFrontend.LifecyclerConfig.TokensFilePath = f
380+
335381
// Compactor
336382
f, err = tokensFile(cfg, "compactor.tokens")
337383
if err != nil {
@@ -414,6 +460,15 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
414460
if reflect.DeepEqual(cfg.Ingester.LifecyclerConfig.InfNames, defaults.Ingester.LifecyclerConfig.InfNames) {
415461
cfg.Ingester.LifecyclerConfig.InfNames = append(cfg.Ingester.LifecyclerConfig.InfNames, loopbackIface)
416462
}
463+
464+
if reflect.DeepEqual(cfg.IngestLimits.LifecyclerConfig.InfNames, defaults.IngestLimits.LifecyclerConfig.InfNames) {
465+
cfg.IngestLimits.LifecyclerConfig.InfNames = append(cfg.IngestLimits.LifecyclerConfig.InfNames, loopbackIface)
466+
}
467+
468+
if reflect.DeepEqual(cfg.IngestLimitsFrontend.LifecyclerConfig.InfNames, defaults.IngestLimitsFrontend.LifecyclerConfig.InfNames) {
469+
cfg.IngestLimitsFrontend.LifecyclerConfig.InfNames = append(cfg.IngestLimitsFrontend.LifecyclerConfig.InfNames, loopbackIface)
470+
}
471+
417472
if reflect.DeepEqual(cfg.Pattern.LifecyclerConfig.InfNames, defaults.Pattern.LifecyclerConfig.InfNames) {
418473
cfg.Pattern.LifecyclerConfig.InfNames = append(cfg.Pattern.LifecyclerConfig.InfNames, loopbackIface)
419474
}
@@ -453,6 +508,8 @@ func appendLoopbackInterface(cfg, defaults *ConfigWrapper) {
453508
// (for example, use consul for the distributor), it seems harmless to take a guess at better defaults here.
454509
func applyMemberlistConfig(r *ConfigWrapper) {
455510
r.Ingester.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
511+
r.IngestLimits.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
512+
r.IngestLimitsFrontend.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
456513
r.Pattern.LifecyclerConfig.RingConfig.KVStore.Store = memberlistStr
457514
r.Distributor.DistributorRing.KVStore.Store = memberlistStr
458515
r.Ruler.Ring.KVStore.Store = memberlistStr

0 commit comments

Comments
 (0)