diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 9d07d3cf89f86..1597222310b87 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -583,7 +583,17 @@ pattern_ingester: # first flush check is delayed by a random time up to 0.8x the flush check # period. Additionally, there is +/- 1% jitter added to the interval. # CLI flag: -pattern-ingester.flush-check-period - [flush_check_period: | default = 30s] + [flush_check_period: | default = 1m] + + # The maximum number of detected pattern clusters that can be created by + # streams. + # CLI flag: -pattern-ingester.max-clusters + [max_clusters: | default = 300] + + # The maximum eviction ratio of patterns per stream. Once that ratio is + # reached, the stream will throttled pattern detection. + # CLI flag: -pattern-ingester.max-eviction-ratio + [max_eviction_ratio: | default = 0.25] # Configures the metric aggregation and storage behavior of the pattern # ingester. diff --git a/pkg/pattern/chunk/util.go b/pkg/pattern/chunk/util.go index 8cbde3fb0474b..99aab9dc19740 100644 --- a/pkg/pattern/chunk/util.go +++ b/pkg/pattern/chunk/util.go @@ -8,7 +8,7 @@ import ( const ( TimeResolution = model.Time(int64(time.Second*10) / 1e6) - MaxChunkTime = 1 * time.Hour + MaxChunkTime = 15 * time.Minute ) func TruncateTimestamp(ts, step model.Time) model.Time { return ts - ts%step } diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 8311036c3df86..64ef81e0e8a84 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -36,13 +36,14 @@ import ( ) type Config struct { - maxNodeDepth int - LogClusterDepth int - SimTh float64 - MaxChildren int - ExtraDelimiters []string - MaxClusters int - ParamString string + maxNodeDepth int + LogClusterDepth int + SimTh float64 + MaxChildren int + ExtraDelimiters []string + MaxClusters int + ParamString string + MaxEvictionRatio float64 } func createLogClusterCache(maxSize int, onEvict func(int, *LogCluster)) *LogClusterCache { @@ -60,29 +61,13 @@ type LogClusterCache struct { } func (c *LogClusterCache) Values() []*LogCluster { - values := make([]*LogCluster, 0) - for _, key := range c.cache.Keys() { - if value, ok := c.cache.Peek(key); ok { - values = append(values, value) - } - } - return values + return c.cache.Values() } func (c *LogClusterCache) Set(key int, cluster *LogCluster) { c.cache.Add(key, cluster) } -func (c *LogClusterCache) Iterate(fn func(*LogCluster) bool) { - for _, key := range c.cache.Keys() { - if value, ok := c.cache.Peek(key); ok { - if !fn(value) { - return - } - } - } -} - func (c *LogClusterCache) Get(key int) *LogCluster { cluster, ok := c.cache.Get(key) if !ok { @@ -140,10 +125,11 @@ func DefaultConfig() *Config { // Both SimTh and MaxClusterDepth impact branching factor: the greater // MaxClusterDepth and SimTh, the less the chance that there will be // "similar" clusters, but the greater the footprint. - SimTh: 0.3, - MaxChildren: 15, - ParamString: `<_>`, - MaxClusters: 300, + SimTh: 0.3, + MaxChildren: 15, + ParamString: `<_>`, + MaxClusters: 300, + MaxEvictionRatio: 0.25, } } @@ -152,10 +138,17 @@ func New(config *Config, format string, metrics *Metrics) *Drain { panic("depth argument must be at least 3") } config.maxNodeDepth = config.LogClusterDepth - 2 - var evictFn func(int, *LogCluster) - if metrics != nil { - evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() } + + d := &Drain{ + config: config, + rootNode: createNode(), + metrics: metrics, + maxAllowedLineLength: 3000, + format: format, } + + limiter := newLimiter(config.MaxEvictionRatio) + var tokenizer LineTokenizer switch format { case FormatJSON: @@ -165,16 +158,20 @@ func New(config *Config, format string, metrics *Metrics) *Drain { default: tokenizer = newPunctuationTokenizer() } - - d := &Drain{ - config: config, - rootNode: createNode(), - idToCluster: createLogClusterCache(config.MaxClusters, evictFn), - metrics: metrics, - tokenizer: tokenizer, - maxAllowedLineLength: 3000, - format: format, - } + d.idToCluster = createLogClusterCache(config.MaxClusters, func(int, *LogCluster) { + if metrics != nil { + if d.pruning { + metrics.PatternsPrunedTotal.Inc() + } else { + metrics.PatternsEvictedTotal.Inc() + } + } + if !d.pruning { + limiter.Evict() + } + }) + d.tokenizer = tokenizer + d.limiter = limiter return d } @@ -189,6 +186,8 @@ type Drain struct { format string tokens []string state interface{} + limiter *limiter + pruning bool } func (d *Drain) Clusters() []*LogCluster { @@ -200,6 +199,9 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts } func (d *Drain) Train(content string, ts int64) *LogCluster { + if !d.limiter.Allow() { + return nil + } if len(content) > d.maxAllowedLineLength { return nil } @@ -325,7 +327,9 @@ func (d *Drain) pruneTree(node *Node) int { } func (d *Drain) Delete(cluster *LogCluster) { + d.pruning = true d.idToCluster.cache.Remove(cluster.id) + d.pruning = false } func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, includeParams bool) *LogCluster { diff --git a/pkg/pattern/drain/limiter.go b/pkg/pattern/drain/limiter.go new file mode 100644 index 0000000000000..4905ebf905c5b --- /dev/null +++ b/pkg/pattern/drain/limiter.go @@ -0,0 +1,51 @@ +package drain + +import ( + "time" +) + +type limiter struct { + added int64 + evicted int64 + maxPercentage float64 + blockedUntil time.Time +} + +func newLimiter(maxPercentage float64) *limiter { + return &limiter{ + maxPercentage: maxPercentage, + } +} + +func (l *limiter) Allow() bool { + if !l.blockedUntil.IsZero() { + if time.Now().Before(l.blockedUntil) { + return false + } + l.reset() + } + if l.added == 0 { + l.added++ + return true + } + if float64(l.evicted)/float64(l.added) > l.maxPercentage { + l.block() + return false + } + l.added++ + return true +} + +func (l *limiter) Evict() { + l.evicted++ +} + +func (l *limiter) reset() { + l.added = 0 + l.evicted = 0 + l.blockedUntil = time.Time{} +} + +func (l *limiter) block() { + l.blockedUntil = time.Now().Add(10 * time.Minute) +} diff --git a/pkg/pattern/drain/limiter_test.go b/pkg/pattern/drain/limiter_test.go new file mode 100644 index 0000000000000..ae8bfdc235c9b --- /dev/null +++ b/pkg/pattern/drain/limiter_test.go @@ -0,0 +1,70 @@ +package drain + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestNewLimiter(t *testing.T) { + maxPercentage := 0.5 + l := newLimiter(maxPercentage) + require.NotNil(t, l, "expected non-nil limiter") + require.Equal(t, maxPercentage, l.maxPercentage, "expected maxPercentage to match") + require.Equal(t, int64(0), l.added, "expected added to be 0") + require.Equal(t, int64(0), l.evicted, "expected evicted to be 0") + require.True(t, l.blockedUntil.IsZero(), "expected blockedUntil to be zero") +} + +func TestLimiterAllow(t *testing.T) { + maxPercentage := 0.5 + l := newLimiter(maxPercentage) + + // Test allowing when no evictions + require.True(t, l.Allow(), "expected Allow to return true initially") + + // Test allowing until evictions exceed maxPercentage + for i := 0; i < 2; i++ { + require.True(t, l.Allow(), "expected Allow to return true %d", i) + l.Evict() + } + + // Evict to exceed maxPercentage + l.Evict() + require.False(t, l.Allow(), "expected Allow to return false after evictions exceed maxPercentage") + + // Test blocking time + require.False(t, l.blockedUntil.IsZero(), "expected blockedUntil to be set") + + // Fast forward time to simulate block duration passing + l.blockedUntil = time.Now().Add(-1 * time.Minute) + require.True(t, l.Allow(), "expected Allow to return true after block duration") +} + +func TestLimiterEvict(t *testing.T) { + l := newLimiter(0.5) + l.Evict() + require.Equal(t, int64(1), l.evicted, "expected evicted to be 1") + l.Evict() + require.Equal(t, int64(2), l.evicted, "expected evicted to be 2") +} + +func TestLimiterReset(t *testing.T) { + l := newLimiter(0.5) + l.added = 10 + l.evicted = 5 + l.blockedUntil = time.Now().Add(10 * time.Minute) + l.reset() + require.Equal(t, int64(0), l.added, "expected added to be 0") + require.Equal(t, int64(0), l.evicted, "expected evicted to be 0") + require.True(t, l.blockedUntil.IsZero(), "expected blockedUntil to be zero") +} + +func TestLimiterBlock(t *testing.T) { + l := newLimiter(0.5) + l.block() + require.False(t, l.blockedUntil.IsZero(), "expected blockedUntil to be set") + require.False(t, l.Allow()) + require.True(t, l.blockedUntil.After(time.Now()), "expected blockedUntil to be in the future") +} diff --git a/pkg/pattern/drain/metrics.go b/pkg/pattern/drain/metrics.go index 3441169013546..0b9e5bff2a43e 100644 --- a/pkg/pattern/drain/metrics.go +++ b/pkg/pattern/drain/metrics.go @@ -29,6 +29,7 @@ func DetectLogFormat(line string) string { type Metrics struct { PatternsEvictedTotal prometheus.Counter + PatternsPrunedTotal prometheus.Counter PatternsDetectedTotal prometheus.Counter TokensPerLine prometheus.Observer StatePerLine prometheus.Observer diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index ddff98115d22f..15972fec24ba5 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/pattern/clientpool" + "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/metric" "github.com/grafana/loki/v3/pkg/util" util_log "github.com/grafana/loki/v3/pkg/util/log" @@ -39,6 +40,8 @@ type Config struct { ClientConfig clientpool.Config `yaml:"client_config,omitempty" doc:"description=Configures how the pattern ingester will connect to the ingesters."` ConcurrentFlushes int `yaml:"concurrent_flushes"` FlushCheckPeriod time.Duration `yaml:"flush_check_period"` + MaxClusters int `yaml:"max_clusters,omitempty" doc:"description=The maximum number of detected pattern clusters that can be created by streams."` + MaxEvictionRatio float64 `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."` MetricAggregation metric.AggregationConfig `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."` // For testing. @@ -53,7 +56,9 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.") fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.") - fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.") + fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 1*time.Minute, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.") + fs.IntVar(&cfg.MaxClusters, "pattern-ingester.max-clusters", drain.DefaultConfig().MaxClusters, "The maximum number of detected pattern clusters that can be created by the pattern ingester.") + fs.Float64Var(&cfg.MaxEvictionRatio, "pattern-ingester.max-eviction-ratio", drain.DefaultConfig().MaxEvictionRatio, "The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will be throttled for pattern detection.") } func (cfg *Config) Validate() error { @@ -85,6 +90,7 @@ type Ingester struct { metrics *ingesterMetrics chunkMetrics *metric.ChunkMetrics + drainCfg *drain.Config } func New( @@ -97,6 +103,10 @@ func New( chunkMetrics := metric.NewChunkMetrics(registerer, metricsNamespace) registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer) + drainCfg := drain.DefaultConfig() + drainCfg.MaxClusters = cfg.MaxClusters + drainCfg.MaxEvictionRatio = cfg.MaxEvictionRatio + i := &Ingester{ cfg: cfg, logger: log.With(logger, "component", "pattern-ingester"), @@ -106,6 +116,7 @@ func New( instances: make(map[string]*instance), flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), loopQuit: make(chan struct{}), + drainCfg: drainCfg, } i.Service = services.NewBasicService(i.starting, i.running, i.stopping) var err error @@ -357,6 +368,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / i.logger, i.metrics, i.chunkMetrics, + i.drainCfg, i.cfg.MetricAggregation, ) if err != nil { diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index 1e683f51ece07..d35d893fdc8ad 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/grafana/loki/v3/pkg/pattern/metric" @@ -28,6 +29,7 @@ func setup(t *testing.T) *instance { log.NewNopLogger(), newIngesterMetrics(nil, "test"), metric.NewChunkMetrics(nil, "test"), + drain.DefaultConfig(), metric.AggregationConfig{ Enabled: true, }, diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index a4a65f0cb7c5e..3495928436e75 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -39,9 +39,10 @@ type instance struct { metrics *ingesterMetrics chunkMetrics *metric.ChunkMetrics aggregationCfg metric.AggregationConfig + drainCfg *drain.Config } -func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics, chunkMetrics *metric.ChunkMetrics, aggCfg metric.AggregationConfig) (*instance, error) { +func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics, chunkMetrics *metric.ChunkMetrics, drainCfg *drain.Config, aggCfg metric.AggregationConfig) (*instance, error) { index, err := index.NewBitPrefixWithShards(indexShards) if err != nil { return nil, err @@ -55,6 +56,7 @@ func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics, metrics: metrics, chunkMetrics: chunkMetrics, aggregationCfg: aggCfg, + drainCfg: drainCfg, } i.mapper = ingester.NewFPMapper(i.getLabelsFromFingerprint) return i, nil @@ -213,7 +215,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) firstEntryLine := pushReqStream.Entries[0].Line - s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID) + s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID, i.drainCfg) if err != nil { return nil, fmt.Errorf("failed to create stream: %w", err) } diff --git a/pkg/pattern/instance_test.go b/pkg/pattern/instance_test.go index 2db0c2797fa44..22bfaa53330fc 100644 --- a/pkg/pattern/instance_test.go +++ b/pkg/pattern/instance_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/metric" "github.com/grafana/loki/pkg/push" @@ -24,6 +25,7 @@ func TestInstance_QuerySample(t *testing.T) { log.NewNopLogger(), newIngesterMetrics(nil, "test"), metric.NewChunkMetrics(nil, "test"), + drain.DefaultConfig(), metric.AggregationConfig{ Enabled: true, }, diff --git a/pkg/pattern/metrics.go b/pkg/pattern/metrics.go index 94ca7e6e97915..f6f8289c7d176 100644 --- a/pkg/pattern/metrics.go +++ b/pkg/pattern/metrics.go @@ -26,7 +26,7 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Subsystem: "pattern_ingester", Name: "patterns_evicted_total", Help: "The total number of patterns evicted from the LRU cache.", - }, []string{"tenant", "format"}), + }, []string{"tenant", "format", "pruned"}), patternsDetectedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index 578653580b183..f6f6f962e0f5f 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -54,14 +54,16 @@ func newStream( logger log.Logger, guessedFormat string, instanceID string, + drainCfg *drain.Config, ) (*stream, error) { stream := &stream{ fp: fp, labels: labels, labelsString: labels.String(), labelHash: labels.Hash(), - patterns: drain.New(drain.DefaultConfig(), guessedFormat, &drain.Metrics{ - PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat), + patterns: drain.New(drainCfg, guessedFormat, &drain.Metrics{ + PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "false"), + PatternsPrunedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat, "true"), PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat), TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat), StatePerLine: metrics.statePerLine.WithLabelValues(instanceID, guessedFormat), diff --git a/pkg/pattern/stream_test.go b/pkg/pattern/stream_test.go index 65342f7217851..65d02e8f5fe76 100644 --- a/pkg/pattern/stream_test.go +++ b/pkg/pattern/stream_test.go @@ -31,6 +31,7 @@ func TestAddStream(t *testing.T) { log.NewNopLogger(), drain.FormatUnknown, "123", + drain.DefaultConfig(), ) require.NoError(t, err) @@ -70,6 +71,7 @@ func TestPruneStream(t *testing.T) { log.NewNopLogger(), drain.FormatUnknown, "123", + drain.DefaultConfig(), ) require.NoError(t, err) @@ -120,6 +122,7 @@ func TestSampleIterator(t *testing.T) { log.NewNopLogger(), drain.FormatUnknown, "123", + drain.DefaultConfig(), ) require.NoError(t, err) @@ -167,6 +170,7 @@ func TestSampleIterator(t *testing.T) { log.NewNopLogger(), drain.FormatUnknown, "123", + drain.DefaultConfig(), ) require.NoError(t, err) @@ -255,6 +259,7 @@ func TestSampleIterator(t *testing.T) { log.NewNopLogger(), drain.FormatUnknown, "123", + drain.DefaultConfig(), ) require.NoError(t, err)