From af51c77c1bbf8f8854548729eda139646f386c73 Mon Sep 17 00:00:00 2001 From: Ben Clive <ben.clive@grafana.com> Date: Thu, 27 Jun 2024 13:25:06 +0100 Subject: [PATCH 1/6] perf: add metrics to determine token sizes --- pkg/pattern/drain/drain.go | 6 +++++- pkg/pattern/drain/line_tokenizer.go | 20 +++++++------------- pkg/pattern/drain/metrics.go | 2 ++ pkg/pattern/metrics.go | 16 ++++++++++++++++ pkg/pattern/stream.go | 2 ++ 5 files changed, 32 insertions(+), 14 deletions(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 784beabb2a876..7c6402d61a94a 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -94,7 +94,7 @@ func (c *LogClusterCache) Get(key int) *LogCluster { func createNode() *Node { return &Node{ keyToChildNode: make(map[string]*Node), - clusterIDs: make([]int, 0), + clusterIDs: make([]int, 0, 8), } } @@ -198,6 +198,10 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster if len(tokens) < 4 { return nil } + if d.metrics != nil { + d.metrics.TokensPerLine.Observe(float64(len(tokens))) + d.metrics.MetadataPerLine.Observe(float64(len(state.([]int)))) + } matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false) // Match no existing log cluster if matchCluster == nil { diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index 89bf34a5569b5..b4ade63f7fc03 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -38,12 +38,10 @@ func newPunctuationTokenizer() *punctuationTokenizer { } func (p *punctuationTokenizer) Tokenize(line string) ([]string, interface{}) { - tokens := make([]string, len(line)) // Maximum size is every character is punctuation - spacesAfter := make([]int, strings.Count(line, " ")) // Could be a bitmap, but it's not worth it for a few bytes. 
+ tokens := make([]string, 0, 128) + spacesAfter := make([]int, 0, 128) start := 0 - nextTokenIdx := 0 - nextSpaceIdx := 0 for i, char := range line { if unicode.IsLetter(char) || unicode.IsNumber(char) || char < 128 && p.excludeDelimiters[char] != 0 { continue @@ -51,26 +49,22 @@ func (p *punctuationTokenizer) Tokenize(line string) ([]string, interface{}) { included := char < 128 && p.includeDelimiters[char] != 0 if char == ' ' || included || unicode.IsPunct(char) { if i > start { - tokens[nextTokenIdx] = line[start:i] - nextTokenIdx++ + tokens = append(tokens, line[start:i]) } if char == ' ' { - spacesAfter[nextSpaceIdx] = nextTokenIdx - 1 - nextSpaceIdx++ + spacesAfter = append(spacesAfter, len(tokens)-1) } else { - tokens[nextTokenIdx] = line[i : i+1] - nextTokenIdx++ + tokens = append(tokens, line[i:i+1]) } start = i + 1 } } if start < len(line) { - tokens[nextTokenIdx] = line[start:] - nextTokenIdx++ + tokens = append(tokens, line[start:]) } - return tokens[:nextTokenIdx], spacesAfter[:nextSpaceIdx] + return tokens, spacesAfter } func (p *punctuationTokenizer) Join(tokens []string, state interface{}) string { diff --git a/pkg/pattern/drain/metrics.go b/pkg/pattern/drain/metrics.go index b09ef12301271..6a19ecaf462ca 100644 --- a/pkg/pattern/drain/metrics.go +++ b/pkg/pattern/drain/metrics.go @@ -5,4 +5,6 @@ import "github.com/prometheus/client_golang/prometheus" type Metrics struct { PatternsEvictedTotal prometheus.Counter PatternsDetectedTotal prometheus.Counter + TokensPerLine prometheus.Histogram + MetadataPerLine prometheus.Histogram } diff --git a/pkg/pattern/metrics.go b/pkg/pattern/metrics.go index ceb5647d1fe54..4048dec5b9b6e 100644 --- a/pkg/pattern/metrics.go +++ b/pkg/pattern/metrics.go @@ -9,6 +9,8 @@ type ingesterMetrics struct { flushQueueLength prometheus.Gauge patternsDiscardedTotal prometheus.Counter patternsDetectedTotal prometheus.Counter + tokensPerLine prometheus.Histogram + metadataPerLine prometheus.Histogram } func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics { @@ -31,6 +33,20 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Name: "patterns_detected_total", Help: "The total number of patterns detected from incoming log lines.", }), + tokensPerLine: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "tokens_per_line", + Help: "The number of tokens an incoming logline is split into for pattern recognision", + Buckets: []float64{5, 10, 20, 40, 80, 120, 160, 240, 320, 640, 1280}, + }), + metadataPerLine: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "metadata_per_line", + Help: "The number of items of additional metadata returned alongside tokens for pattern recognition", + Buckets: []float64{5, 10, 20, 40, 80, 100, 120, 140, 160, 240, 320, 640, 1280}, + }), } } diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index ec075fece7396..2add1943fbe9e 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -61,6 +61,8 @@ func newStream( patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{ PatternsEvictedTotal: metrics.patternsDiscardedTotal, PatternsDetectedTotal: metrics.patternsDetectedTotal, + TokensPerLine: metrics.tokensPerLine, + MetadataPerLine: metrics.metadataPerLine, }), cfg: cfg, logger: logger, From b105953c4f714ba0bcda7ed11eb45d8370aef5f4 Mon Sep 17 00:00:00 2001 From: Ben Clive <ben.clive@grafana.com> 
Date: Thu, 27 Jun 2024 17:10:40 +0100 Subject: [PATCH 2/6] Temp: Add more metric dimensions --- pkg/pattern/drain/drain.go | 10 +++++---- pkg/pattern/drain/metrics.go | 32 ++++++++++++++++++++++++----- pkg/pattern/drain/testdata/okta.txt | 0 pkg/pattern/instance.go | 12 ++++++++++- pkg/pattern/metrics.go | 28 ++++++++++++------------- pkg/pattern/stream.go | 2 ++ 6 files changed, 60 insertions(+), 24 deletions(-) create mode 100644 pkg/pattern/drain/testdata/okta.txt diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 7c6402d61a94a..da9b9dbd42a08 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -154,7 +154,9 @@ func New(config *Config, metrics *Metrics) *Drain { config.maxNodeDepth = config.LogClusterDepth - 2 var evictFn func(int, *LogCluster) if metrics != nil { - evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() } + evictFn = func(int, *LogCluster) { + metrics.PatternsEvictedTotal.WithLabelValues(metrics.DetectedLogFormat).Inc() + } } d := &Drain{ @@ -199,8 +201,8 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster return nil } if d.metrics != nil { - d.metrics.TokensPerLine.Observe(float64(len(tokens))) - d.metrics.MetadataPerLine.Observe(float64(len(state.([]int)))) + d.metrics.TokensPerLine.WithLabelValues(d.metrics.DetectedLogFormat).Observe(float64(len(tokens))) + d.metrics.MetadataPerLine.WithLabelValues(d.metrics.DetectedLogFormat).Observe(float64(len(state.([]int)))) } matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false) // Match no existing log cluster @@ -219,7 +221,7 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster d.idToCluster.Set(clusterID, matchCluster) d.addSeqToPrefixTree(d.rootNode, matchCluster) if d.metrics != nil { - d.metrics.PatternsDetectedTotal.Inc() + d.metrics.PatternsDetectedTotal.WithLabelValues(d.metrics.DetectedLogFormat).Inc() } } else { newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens) diff --git a/pkg/pattern/drain/metrics.go b/pkg/pattern/drain/metrics.go index 6a19ecaf462ca..a99c7217a6c21 100644 --- a/pkg/pattern/drain/metrics.go +++ b/pkg/pattern/drain/metrics.go @@ -1,10 +1,32 @@ package drain -import "github.com/prometheus/client_golang/prometheus" +import ( + "strings" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + FormatLogfmt = "logfmt" + FormatJson = "json" + FormatUnknown = "unknown" +) + +func DetectLogFormat(line string) string { + format := FormatUnknown + if line[0] == '{' && line[len(line)-1] == '}' { + return FormatJson + } + if strings.Count(line, "=") > strings.Count(line, " ")-5 { + format = FormatLogfmt + } + return format +} type Metrics struct { - PatternsEvictedTotal prometheus.Counter - PatternsDetectedTotal prometheus.Counter - TokensPerLine prometheus.Histogram - MetadataPerLine prometheus.Histogram + PatternsEvictedTotal *prometheus.CounterVec + PatternsDetectedTotal *prometheus.CounterVec + TokensPerLine *prometheus.HistogramVec + MetadataPerLine *prometheus.HistogramVec + DetectedLogFormat string } diff --git a/pkg/pattern/drain/testdata/okta.txt b/pkg/pattern/drain/testdata/okta.txt new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index cb5f8ae253147..63ca6f8ee8f3a 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -17,6 +18,7 @@ 
import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/pattern/chunk" + "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/metric" "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/spanlogger" @@ -208,7 +210,15 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream } fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) - s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger) + firstEntryLine := pushReqStream.Entries[0].Line + format := drain.FormatUnknown + if firstEntryLine[0] == '{' && firstEntryLine[len(firstEntryLine)-1] == '}' { + format = drain.FormatJson + } else if strings.Count(firstEntryLine, "=") > len(firstEntryLine)/20 { + format = drain.FormatLogfmt + } + s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, format) + if err != nil { return nil, fmt.Errorf("failed to create stream: %w", err) } diff --git a/pkg/pattern/metrics.go b/pkg/pattern/metrics.go index 4048dec5b9b6e..39e57596c0acc 100644 --- a/pkg/pattern/metrics.go +++ b/pkg/pattern/metrics.go @@ -7,10 +7,10 @@ import ( type ingesterMetrics struct { flushQueueLength prometheus.Gauge - patternsDiscardedTotal prometheus.Counter - patternsDetectedTotal prometheus.Counter - tokensPerLine prometheus.Histogram - metadataPerLine prometheus.Histogram + patternsDiscardedTotal *prometheus.CounterVec + patternsDetectedTotal *prometheus.CounterVec + tokensPerLine *prometheus.HistogramVec + metadataPerLine *prometheus.HistogramVec } func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics { @@ -21,32 +21,32 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Name: "flush_queue_length", Help: "The total number of series pending in the flush queue.", }), - patternsDiscardedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + patternsDiscardedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", Name: "patterns_evicted_total", - Help: "The total number of patterns evicted from the LRU cache.", - }), - patternsDetectedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Help: "The total number of patterns evicted from the LRU cache, by log format.", + }, []string{"format"}), + patternsDetectedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", Name: "patterns_detected_total", - Help: "The total number of patterns detected from incoming log lines.", - }), - tokensPerLine: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Help: "The total number of patterns detected from incoming log lines, by log format.", + }, []string{"format"}), + tokensPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", Name: "tokens_per_line", Help: "The number of tokens an incoming logline is split into for pattern recognision", Buckets: []float64{5, 10, 20, 40, 80, 120, 160, 240, 320, 640, 1280}, - }), - metadataPerLine: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + }, []string{"format"}), + metadataPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", Name: "metadata_per_line", Help: 
"The number of items of additional metadata returned alongside tokens for pattern recognition", Buckets: []float64{5, 10, 20, 40, 80, 100, 120, 140, 160, 240, 320, 640, 1280}, - }), + }, []string{"format"}), } } diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index 2add1943fbe9e..ebe321280712c 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -52,6 +52,7 @@ func newStream( chunkMetrics *metric.ChunkMetrics, cfg metric.AggregationConfig, logger log.Logger, + guessedFormat string, ) (*stream, error) { stream := &stream{ fp: fp, @@ -63,6 +64,7 @@ func newStream( PatternsDetectedTotal: metrics.patternsDetectedTotal, TokensPerLine: metrics.tokensPerLine, MetadataPerLine: metrics.metadataPerLine, + DetectedLogFormat: guessedFormat, }), cfg: cfg, logger: logger, From 01790dea2a65d9890daf2565fef0955c2c7358f9 Mon Sep 17 00:00:00 2001 From: Ben Clive <ben.clive@grafana.com> Date: Mon, 1 Jul 2024 16:45:25 +0100 Subject: [PATCH 3/6] Clean up & add tests --- pkg/pattern/drain/drain.go | 12 +++++------ pkg/pattern/drain/drain_benchmark_test.go | 2 +- pkg/pattern/drain/drain_test.go | 17 ++++++++++++++- pkg/pattern/drain/line_tokenizer.go | 20 +++++++++++------ pkg/pattern/drain/metrics.go | 23 ++++++++++---------- pkg/pattern/drain/testdata/okta.txt | 0 pkg/pattern/instance.go | 9 +------- pkg/pattern/metrics.go | 26 +++++++++++------------ pkg/pattern/stream.go | 10 ++++----- 9 files changed, 66 insertions(+), 53 deletions(-) delete mode 100644 pkg/pattern/drain/testdata/okta.txt diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index da9b9dbd42a08..e375646275c17 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -94,7 +94,7 @@ func (c *LogClusterCache) Get(key int) *LogCluster { func createNode() *Node { return &Node{ keyToChildNode: make(map[string]*Node), - clusterIDs: make([]int, 0, 8), + clusterIDs: make([]int, 0), } } @@ -154,9 +154,7 @@ func New(config *Config, metrics *Metrics) *Drain { config.maxNodeDepth = config.LogClusterDepth - 2 var evictFn func(int, *LogCluster) if metrics != nil { - evictFn = func(int, *LogCluster) { - metrics.PatternsEvictedTotal.WithLabelValues(metrics.DetectedLogFormat).Inc() - } + evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() } } d := &Drain{ @@ -201,8 +199,8 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster return nil } if d.metrics != nil { - d.metrics.TokensPerLine.WithLabelValues(d.metrics.DetectedLogFormat).Observe(float64(len(tokens))) - d.metrics.MetadataPerLine.WithLabelValues(d.metrics.DetectedLogFormat).Observe(float64(len(state.([]int)))) + d.metrics.TokensPerLine.Observe(float64(len(tokens))) + d.metrics.StatePerLine.Observe(float64(len(state.([]int)))) } matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false) // Match no existing log cluster @@ -221,7 +219,7 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster d.idToCluster.Set(clusterID, matchCluster) d.addSeqToPrefixTree(d.rootNode, matchCluster) if d.metrics != nil { - d.metrics.PatternsDetectedTotal.WithLabelValues(d.metrics.DetectedLogFormat).Inc() + d.metrics.PatternsDetectedTotal.Inc() } } else { newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens) diff --git a/pkg/pattern/drain/drain_benchmark_test.go b/pkg/pattern/drain/drain_benchmark_test.go index 35ec024af138e..5ef4c98b5b6f6 100644 --- a/pkg/pattern/drain/drain_benchmark_test.go +++ b/pkg/pattern/drain/drain_benchmark_test.go @@ -35,11 +35,11 @@ func 
BenchmarkDrain_TrainExtractsPatterns(b *testing.B) { line := scanner.Text() lines = append(lines, line) } + drain := New(DefaultConfig(), nil) b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - drain := New(DefaultConfig(), nil) for _, line := range lines { drain.Train(line, 0) } diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index 34bcf8b4c12a5..0e052e204efa4 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -23,10 +23,12 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { drain *Drain inputFile string patterns []string + format string }{ { drain: New(DefaultConfig(), nil), inputFile: `testdata/agent-logfmt.txt`, + format: FormatLogfmt, patterns: []string{ `ts=2024-04-16T15:10:42.<_> level=info msg="finished node evaluation" controller_id=module.http.cloudwatch_pipelines node_id=prometheus.scrape.<_> duration=<_>.<_>`, `ts=2024-04-16T15:10:43.192290389Z caller=filetargetmanager.go:361 level=info component=logs logs_config=default msg="Adding target" key="/var/log/pods/*19a1cce8-5f04-46e0-a124-292b0dd9b343/testcoordinator/*.log:{batch_kubernetes_io_controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", batch_kubernetes_io_job_name=\"testcoordinator-job-2665838\", container=\"testcoordinator\", controller_uid=\"25ec5edf-f78e-468b-b6f3-3b9685f0cc8f\", job=\"k6-cloud/testcoordinator\", job_name=\"testcoordinator-job-2665838\", name=\"testcoordinator\", namespace=\"k6-cloud\", pod=\"testcoordinator-job-2665838-9g8ds\"}"`, @@ -61,6 +63,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: `testdata/ingester-logfmt.txt`, + format: FormatLogfmt, patterns: []string{ `ts=2024-04-17T09:52:46.363974185Z caller=http.go:194 level=debug traceID=1b48f5156a61ca69 msg="GET /debug/pprof/delta_mutex (200) 1.161082ms"`, `ts=2024-04-17T09:52:46.<_> caller=head.go:216 level=debug tenant=987678 msg="profile is empty after delta computation" metricName=memory`, @@ -70,6 +73,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: `testdata/drone-json.txt`, + format: FormatJson, patterns: []string{ `{"duration":<_>,"level":"debug","method":"GET","msg":"request completed","referer":"","remote":"10.136.105.40:52702","request":"/metrics","status":200,"time":"<_>:<_>:<_>","user-agent":"GrafanaAgent/v0.40.3 (flow; linux; helm)"}`, `{"id":"<_>","level":"debug","max-pool":4,"min-pool":0,"msg":"check capacity","pending-builds":0,"running-builds":0,"server-buffer":0,"server-capacity":0,"server-count":0,"time":"<_>:<_>:<_>"}`, @@ -82,6 +86,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/distributor-logfmt.txt", + format: FormatLogfmt, patterns: []string{ `ts=2024-05-02T12:17:22.851228301Z caller=http.go:194 level=debug traceID=1e1fe5ba1756bc38 orgID=1819 msg="POST /pyroscope/ingest?aggregationType=sum&from=1714652230&name=flamegraph.com%7Bapp_kubernetes_io_instance%3Dflamegraph-com%2Capp_kubernetes_io_name%3Dflamegraph-com%2Ccluster%3Dflamegraph.com%2Cinstance%3D10.0.11.146%3A8001%2Cjob%3Dkubernetes-pods%2Cnamespace%3Dflamegraph-com%2Cpod%3Dflamegraph-com-backend-79c858c7bf-jw2hn%2Cpod_template_hash%3D79c858c7bf%2Cpyroscope_tenant%3Dpyroscope%2Ctier%3Dbackend%7D&sampleRate=0&spyName=scrape&units=samples&until=1714652240 (200) 22.345191ms"`, `ts=2024-05-02T12:17:22.<_> caller=http.go:194 level=debug traceID=<_> orgID=75 msg="POST 
/ingest?aggregationType=&from=1714652227232613927&name=checkoutservice%7B__session_id__%3D294b9729f5a7de95%2Cnamespace%3Dotel-demo%7D&sampleRate=<_>&spyName=gospy&units=&until=1714652242232506798 (200) <_>.<_>"`, @@ -93,6 +98,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/journald.txt", + format: FormatUnknown, patterns: []string{ ` ln --force -s /proc/$(pidof hgrun-pause)/root/bin/hgrun /bin/hgrun;`, ` while [ "$(pidof plugins-pause)" = "" ]; do sleep 0.5; done;`, @@ -199,6 +205,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/kafka.txt", + format: FormatUnknown, patterns: []string{ `[2024-05-07 10:55:40,626] INFO [LocalLog partition=ingest-6, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=180391157, size=16991045, lastModifiedTime=1715075754780, largestRecordTimestamp=Some(1715075754774)),LogSegment(baseOffset=180393429, size=16997692, lastModifiedTime=1715075760206, largestRecordTimestamp=Some(1715075760186)),LogSegment(baseOffset=180395889, size=16998200, lastModifiedTime=1715075765542, largestRecordTimestamp=Some(1715075765526)),LogSegment(baseOffset=180398373, size=16977347, lastModifiedTime=1715075770515, largestRecordTimestamp=Some(1715075770504)) (kafka.log.LocalLog$)`, `[2024-05-07 10:55:53,038] INFO [LocalLog partition=mimir-dev-09-aggregations-offsets-1, dir=/bitnami/kafka/data] Deleting segment files LogSegment(baseOffset=447957, size=948, lastModifiedTime=1715059232052, largestRecordTimestamp=Some(1715059232002)),LogSegment(baseOffset=447969, size=948, lastModifiedTime=1715059424352, largestRecordTimestamp=Some(1715059424301)) (kafka.log.LocalLog$)`, @@ -219,6 +226,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/kubernetes.txt", + format: FormatUnknown, patterns: []string{ `I0507 12:02:27.947830 1 nodeutilization.go:274] "Evicting pods based on priority, if they have same priority, they'll be evicted based on QoS tiers"`, `I0507 12:02:27.<_> 1 defaultevictor.go:163] "pod does not fit on any other node because of nodeSelector(s), Taint(s), or nodes marked as unschedulable" pod="<_>/<_>"`, @@ -268,6 +276,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/vault.txt", + format: FormatUnknown, patterns: []string{ `2024-05-07T10:56:38.667Z [INFO] expiration: revoked lease: lease_id=auth/gcp/login/h4c031a99aa555040a0dd99864d828e946c6d4e31f4f5178757183def61f9d104`, `2024-05-07T10:<_>:<_>.<_> [INFO] expiration: revoked lease: lease_id=auth/kubernetes/<_>/login/<_>`, @@ -276,6 +285,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/calico.txt", + format: FormatUnknown, patterns: []string{ `2024-05-08 15:23:56.403 [DEBUG][615489] felix/table.go 699: Finished loading iptables state ipVersion=0x4 table="filter"`, `2024-05-08 15:23:56.403 [INFO][615489] felix/summary.go 100: Summarising 1 dataplane reconciliation loops over 600ms: avg=119ms longest=119ms (resync-filter-v4)`, @@ -357,6 +367,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: "testdata/grafana-ruler.txt", + format: FormatLogfmt, patterns: []string{ `level=debug ts=2024-05-29T13:44:15.804597912Z caller=remote_instance_store.go:51 user=297794 slug=leanix msg="calling SaveAlertInstance"`, `level=debug ts=2024-05-29T13:44:15.<_> 
caller=remote_instance_store.go:51 user=396586 slug=opengov msg="calling SaveAlertInstance"`, @@ -411,10 +422,15 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { require.NoError(t, err) defer file.Close() + detectedFormat := false scanner := bufio.NewScanner(file) for scanner.Scan() { line := scanner.Text() tt.drain.Train(line, 0) + if !detectedFormat { + require.Equal(t, tt.format, DetectLogFormat(line)) + detectedFormat = true + } } var output []string @@ -564,7 +580,6 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) for _, line := range tt.inputLines { passes := matcher.Test([]byte(line)) require.Truef(t, passes, "Line should match extracted pattern: \nPatt[%q] \nLine[%q]", cluster.String(), line) - } }) } diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index b4ade63f7fc03..89bf34a5569b5 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -38,10 +38,12 @@ func newPunctuationTokenizer() *punctuationTokenizer { } func (p *punctuationTokenizer) Tokenize(line string) ([]string, interface{}) { - tokens := make([]string, 0, 128) - spacesAfter := make([]int, 0, 128) + tokens := make([]string, len(line)) // Maximum size is every character is punctuation + spacesAfter := make([]int, strings.Count(line, " ")) // Could be a bitmap, but it's not worth it for a few bytes. start := 0 + nextTokenIdx := 0 + nextSpaceIdx := 0 for i, char := range line { if unicode.IsLetter(char) || unicode.IsNumber(char) || char < 128 && p.excludeDelimiters[char] != 0 { continue @@ -49,22 +51,26 @@ func (p *punctuationTokenizer) Tokenize(line string) ([]string, interface{}) { included := char < 128 && p.includeDelimiters[char] != 0 if char == ' ' || included || unicode.IsPunct(char) { if i > start { - tokens = append(tokens, line[start:i]) + tokens[nextTokenIdx] = line[start:i] + nextTokenIdx++ } if char == ' ' { - spacesAfter = append(spacesAfter, len(tokens)-1) + spacesAfter[nextSpaceIdx] = nextTokenIdx - 1 + nextSpaceIdx++ } else { - tokens = append(tokens, line[i:i+1]) + tokens[nextTokenIdx] = line[i : i+1] + nextTokenIdx++ } start = i + 1 } } if start < len(line) { - tokens = append(tokens, line[start:]) + tokens[nextTokenIdx] = line[start:] + nextTokenIdx++ } - return tokens, spacesAfter + return tokens[:nextTokenIdx], spacesAfter[:nextSpaceIdx] } func (p *punctuationTokenizer) Join(tokens []string, state interface{}) string { diff --git a/pkg/pattern/drain/metrics.go b/pkg/pattern/drain/metrics.go index a99c7217a6c21..206d30288796b 100644 --- a/pkg/pattern/drain/metrics.go +++ b/pkg/pattern/drain/metrics.go @@ -12,21 +12,22 @@ const ( FormatUnknown = "unknown" ) +// DetectLogFormat guesses at how the logs are encoded based on some simple heuristics. +// It only runs on the first log line when a new stream is created, so it could do some more complex parsing or regex. 
func DetectLogFormat(line string) string { - format := FormatUnknown - if line[0] == '{' && line[len(line)-1] == '}' { + if len(line) < 2 { + return FormatUnknown + } else if line[0] == '{' && line[len(line)-1] == '}' { return FormatJson + } else if strings.Count(line, "=") > strings.Count(line, " ")-5 { + return FormatLogfmt } - if strings.Count(line, "=") > strings.Count(line, " ")-5 { - format = FormatLogfmt - } - return format + return FormatUnknown } type Metrics struct { - PatternsEvictedTotal *prometheus.CounterVec - PatternsDetectedTotal *prometheus.CounterVec - TokensPerLine *prometheus.HistogramVec - MetadataPerLine *prometheus.HistogramVec - DetectedLogFormat string + PatternsEvictedTotal prometheus.Counter + PatternsDetectedTotal prometheus.Counter + TokensPerLine prometheus.Observer + StatePerLine prometheus.Observer } diff --git a/pkg/pattern/drain/testdata/okta.txt b/pkg/pattern/drain/testdata/okta.txt deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 63ca6f8ee8f3a..f19b0373858d9 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net/http" - "strings" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -211,13 +210,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) firstEntryLine := pushReqStream.Entries[0].Line - format := drain.FormatUnknown - if firstEntryLine[0] == '{' && firstEntryLine[len(firstEntryLine)-1] == '}' { - format = drain.FormatJson - } else if strings.Count(firstEntryLine, "=") > len(firstEntryLine)/20 { - format = drain.FormatLogfmt - } - s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, format) + s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID) if err != nil { return nil, fmt.Errorf("failed to create stream: %w", err) diff --git a/pkg/pattern/metrics.go b/pkg/pattern/metrics.go index 39e57596c0acc..94ca7e6e97915 100644 --- a/pkg/pattern/metrics.go +++ b/pkg/pattern/metrics.go @@ -10,7 +10,7 @@ type ingesterMetrics struct { patternsDiscardedTotal *prometheus.CounterVec patternsDetectedTotal *prometheus.CounterVec tokensPerLine *prometheus.HistogramVec - metadataPerLine *prometheus.HistogramVec + statePerLine *prometheus.HistogramVec } func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics { @@ -25,28 +25,28 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Namespace: metricsNamespace, Subsystem: "pattern_ingester", Name: "patterns_evicted_total", - Help: "The total number of patterns evicted from the LRU cache, by log format.", - }, []string{"format"}), + Help: "The total number of patterns evicted from the LRU cache.", + }, []string{"tenant", "format"}), patternsDetectedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", Name: "patterns_detected_total", - Help: "The total number of patterns detected from incoming log lines, by log format.", - }, []string{"format"}), + Help: "The total number of patterns detected from incoming log lines.", + }, []string{"tenant", "format"}), tokensPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ Namespace: metricsNamespace, Subsystem: 
"pattern_ingester", Name: "tokens_per_line", - Help: "The number of tokens an incoming logline is split into for pattern recognision", - Buckets: []float64{5, 10, 20, 40, 80, 120, 160, 240, 320, 640, 1280}, - }, []string{"format"}), - metadataPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Help: "The number of tokens an incoming logline is split into for pattern recognition.", + Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280}, + }, []string{"tenant", "format"}), + statePerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ Namespace: metricsNamespace, Subsystem: "pattern_ingester", - Name: "metadata_per_line", - Help: "The number of items of additional metadata returned alongside tokens for pattern recognition", - Buckets: []float64{5, 10, 20, 40, 80, 100, 120, 140, 160, 240, 320, 640, 1280}, - }, []string{"format"}), + Name: "state_per_line", + Help: "The number of items of additional state returned alongside tokens for pattern recognition.", + Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280}, + }, []string{"tenant", "format"}), } } diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index ebe321280712c..08c121ee93c23 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -53,6 +53,7 @@ func newStream( cfg metric.AggregationConfig, logger log.Logger, guessedFormat string, + instanceID string, ) (*stream, error) { stream := &stream{ fp: fp, @@ -60,11 +61,10 @@ func newStream( labelsString: labels.String(), labelHash: labels.Hash(), patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{ - PatternsEvictedTotal: metrics.patternsDiscardedTotal, - PatternsDetectedTotal: metrics.patternsDetectedTotal, - TokensPerLine: metrics.tokensPerLine, - MetadataPerLine: metrics.metadataPerLine, - DetectedLogFormat: guessedFormat, + PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat), + PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat), + TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat), + StatePerLine: metrics.statePerLine.WithLabelValues(instanceID, guessedFormat), }), cfg: cfg, logger: logger, From 52ff91fa8e4dd5625d7727671aa96193b1dbedac Mon Sep 17 00:00:00 2001 From: Ben Clive <ben.clive@grafana.com> Date: Tue, 2 Jul 2024 10:04:44 +0100 Subject: [PATCH 4/6] Use regex to detect logfmt logs --- pkg/pattern/drain/metrics.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/pattern/drain/metrics.go b/pkg/pattern/drain/metrics.go index 206d30288796b..9de42e484195a 100644 --- a/pkg/pattern/drain/metrics.go +++ b/pkg/pattern/drain/metrics.go @@ -1,7 +1,7 @@ package drain import ( - "strings" + "regexp" "github.com/prometheus/client_golang/prometheus" ) @@ -12,6 +12,8 @@ const ( FormatUnknown = "unknown" ) +var logfmtRegex = regexp.MustCompile("^(\\w+?=([^\"]\\S*?|\".+?\") )*?(\\w+?=([^\"]\\S*?|\".+?\"))+$") + // DetectLogFormat guesses at how the logs are encoded based on some simple heuristics. // It only runs on the first log line when a new stream is created, so it could do some more complex parsing or regex. 
func DetectLogFormat(line string) string { @@ -19,7 +21,7 @@ func DetectLogFormat(line string) string { return FormatUnknown } else if line[0] == '{' && line[len(line)-1] == '}' { return FormatJson - } else if strings.Count(line, "=") > strings.Count(line, " ")-5 { + } else if logfmtRegex.MatchString(line) { return FormatLogfmt } return FormatUnknown From 39aa4571f6c68b762bb2d63fafd1aca2395a25e1 Mon Sep 17 00:00:00 2001 From: Ben Clive <ben.clive@grafana.com> Date: Tue, 2 Jul 2024 10:54:42 +0100 Subject: [PATCH 5/6] Fix tests --- pkg/pattern/stream_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/pattern/stream_test.go b/pkg/pattern/stream_test.go index cee3df791319f..65342f7217851 100644 --- a/pkg/pattern/stream_test.go +++ b/pkg/pattern/stream_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/grafana/loki/v3/pkg/pattern/metric" @@ -28,6 +29,8 @@ func TestAddStream(t *testing.T) { Enabled: false, }, log.NewNopLogger(), + drain.FormatUnknown, + "123", ) require.NoError(t, err) @@ -65,6 +68,8 @@ func TestPruneStream(t *testing.T) { Enabled: false, }, log.NewNopLogger(), + drain.FormatUnknown, + "123", ) require.NoError(t, err) @@ -113,6 +118,8 @@ func TestSampleIterator(t *testing.T) { Enabled: true, }, log.NewNopLogger(), + drain.FormatUnknown, + "123", ) require.NoError(t, err) @@ -158,6 +165,8 @@ func TestSampleIterator(t *testing.T) { Enabled: true, }, log.NewNopLogger(), + drain.FormatUnknown, + "123", ) require.NoError(t, err) @@ -244,6 +253,8 @@ func TestSampleIterator(t *testing.T) { Enabled: true, }, log.NewNopLogger(), + drain.FormatUnknown, + "123", ) require.NoError(t, err) From ab44f68cf9f948875880ebdfe26919dd96c7ea06 Mon Sep 17 00:00:00 2001 From: Ben Clive <ben.clive@grafana.com> Date: Tue, 2 Jul 2024 11:03:05 +0100 Subject: [PATCH 6/6] Lint --- pkg/pattern/drain/drain_test.go | 2 +- pkg/pattern/drain/metrics.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index 0e052e204efa4..4cbc84dc232ea 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -73,7 +73,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { { drain: New(DefaultConfig(), nil), inputFile: `testdata/drone-json.txt`, - format: FormatJson, + format: FormatJSON, patterns: []string{ `{"duration":<_>,"level":"debug","method":"GET","msg":"request completed","referer":"","remote":"10.136.105.40:52702","request":"/metrics","status":200,"time":"<_>:<_>:<_>","user-agent":"GrafanaAgent/v0.40.3 (flow; linux; helm)"}`, `{"id":"<_>","level":"debug","max-pool":4,"min-pool":0,"msg":"check capacity","pending-builds":0,"running-builds":0,"server-buffer":0,"server-capacity":0,"server-count":0,"time":"<_>:<_>:<_>"}`, diff --git a/pkg/pattern/drain/metrics.go b/pkg/pattern/drain/metrics.go index 9de42e484195a..3441169013546 100644 --- a/pkg/pattern/drain/metrics.go +++ b/pkg/pattern/drain/metrics.go @@ -8,7 +8,7 @@ import ( const ( FormatLogfmt = "logfmt" - FormatJson = "json" + FormatJSON = "json" FormatUnknown = "unknown" ) @@ -20,7 +20,7 @@ func DetectLogFormat(line string) string { if len(line) < 2 { return FormatUnknown } else if line[0] == '{' && line[len(line)-1] == '}' { - return FormatJson + return FormatJSON } else if logfmtRegex.MatchString(line) { return FormatLogfmt }
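

A quick illustration of where the series ends up: the final DetectLogFormat heuristic classifies a line as json when it is brace-delimited at both ends, as logfmt when the anchored logfmtRegex matches, and as unknown otherwise (including lines shorter than two characters, the guard added in patch 3). The standalone main package below is a hypothetical harness, not part of the PR; the import path and function are the ones this series touches.

package main

import (
	"fmt"

	"github.com/grafana/loki/v3/pkg/pattern/drain"
)

func main() {
	lines := []string{
		// Braces at both ends: json.
		`{"level":"debug","msg":"request completed","status":200}`,
		// Anchored key=value pairs: logfmt.
		`ts=2024-04-16T15:10:42Z level=info msg="finished node evaluation"`,
		// klog-style output matches neither heuristic: unknown.
		`I0507 12:02:27.947830 1 defaultevictor.go:163] "pod does not fit"`,
		// Shorter than two characters: unknown, via the patch 3 length guard.
		`{`,
	}
	for _, line := range lines {
		fmt.Printf("%-8s <- %s\n", drain.DetectLogFormat(line), line)
	}
	// Prints: json, logfmt, unknown, unknown.
}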
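
Also worth noting is the metrics shape the series settles on in patch 3: the labelled vectors are registered once on the ingester, each stream resolves its ("tenant", "format") pair a single time via WithLabelValues at creation, and drain.Metrics ends up holding plain prometheus.Counter and prometheus.Observer values, so Drain.train pays no label lookup per log line. A trimmed sketch of that pattern follows; streamView and forStream are hypothetical names for illustration, not identifiers from the PR.

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Registered once per process, mirroring newIngesterMetrics.
type ingesterMetrics struct {
	patternsDetectedTotal *prometheus.CounterVec
	tokensPerLine         *prometheus.HistogramVec
}

func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
	return &ingesterMetrics{
		patternsDetectedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
			Name: "patterns_detected_total",
			Help: "The total number of patterns detected from incoming log lines.",
		}, []string{"tenant", "format"}),
		tokensPerLine: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
			Name:    "tokens_per_line",
			Help:    "The number of tokens an incoming logline is split into for pattern recognition.",
			Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280},
		}, []string{"tenant", "format"}),
	}
}

// streamView stands in for what newStream hands to drain.New: label
// resolution happens here, once per stream, not once per line.
type streamView struct {
	patternsDetected prometheus.Counter
	tokensPerLine    prometheus.Observer
}

func (m *ingesterMetrics) forStream(tenant, format string) streamView {
	return streamView{
		patternsDetected: m.patternsDetectedTotal.WithLabelValues(tenant, format),
		tokensPerLine:    m.tokensPerLine.WithLabelValues(tenant, format),
	}
}

func main() {
	m := newIngesterMetrics(prometheus.NewRegistry())
	v := m.forStream("123", "logfmt")
	v.patternsDetected.Inc()    // hot path: no vector lookup
	v.tokensPerLine.Observe(42) // ditto
}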