From 2efa5712d52169d17c54e064f83f1174ed310b0f Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 2 Jul 2024 18:39:42 +0200 Subject: [PATCH 01/10] feat: Drain uses different tokenizer based on log format --- pkg/pattern/drain/drain.go | 11 ++- pkg/pattern/drain/drain_benchmark_test.go | 2 +- pkg/pattern/drain/drain_test.go | 36 ++++----- pkg/pattern/drain/line_tokenizer.go | 56 ++++++++++++++ pkg/pattern/drain/line_tokenizer_test.go | 94 ++++++++++++++++++++++- pkg/pattern/ingester_querier.go | 2 +- pkg/pattern/instance.go | 10 ++- pkg/pattern/stream.go | 2 +- 8 files changed, 185 insertions(+), 28 deletions(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index ec54a9f145cd2..9fa93f8273d03 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -147,7 +147,7 @@ func DefaultConfig() *Config { } } -func New(config *Config, metrics *Metrics) *Drain { +func New(config *Config, format string, metrics *Metrics) *Drain { if config.LogClusterDepth < 3 { panic("depth argument must be at least 3") } @@ -156,13 +156,18 @@ func New(config *Config, metrics *Metrics) *Drain { if metrics != nil { evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() } } - + var tokenizer LineTokenizer + if format == FormatLogfmt { + tokenizer = newLogfmtTokenizer(config.ParamString) + } else { + tokenizer = newPunctuationTokenizer() + } d := &Drain{ config: config, rootNode: createNode(), idToCluster: createLogClusterCache(config.MaxClusters, evictFn), metrics: metrics, - tokenizer: newPunctuationTokenizer(), + tokenizer: tokenizer, maxAllowedLineLength: 3000, } return d diff --git a/pkg/pattern/drain/drain_benchmark_test.go b/pkg/pattern/drain/drain_benchmark_test.go index 5ef4c98b5b6f6..e7c95f721ed4c 100644 --- a/pkg/pattern/drain/drain_benchmark_test.go +++ b/pkg/pattern/drain/drain_benchmark_test.go @@ -35,7 +35,7 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) { line := scanner.Text() lines = append(lines, line) } - drain := New(DefaultConfig(), nil) + drain := New(DefaultConfig(), DetectLogFormat(lines[0]), nil) b.ReportAllocs() b.ResetTimer() diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index 0f9313391c1eb..b0d6c3d74d629 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -27,7 +27,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { format string }{ { - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputFile: `testdata/agent-logfmt.txt`, format: FormatLogfmt, patterns: []string{ @@ -62,7 +62,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputFile: `testdata/ingester-logfmt.txt`, format: FormatLogfmt, patterns: []string{ @@ -72,7 +72,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputFile: `testdata/drone-json.txt`, format: FormatJSON, patterns: []string{ @@ -85,7 +85,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputFile: "testdata/distributor-logfmt.txt", format: FormatLogfmt, patterns: []string{ @@ -97,7 +97,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputFile: "testdata/journald.txt", format: FormatUnknown, patterns: []string{ @@ -204,7 +204,7 @@ func 
TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputFile: "testdata/kafka.txt", format: FormatUnknown, patterns: []string{ @@ -225,7 +225,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputFile: "testdata/kubernetes.txt", format: FormatUnknown, patterns: []string{ @@ -275,7 +275,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputFile: "testdata/vault.txt", format: FormatUnknown, patterns: []string{ @@ -284,7 +284,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputFile: "testdata/calico.txt", format: FormatUnknown, patterns: []string{ @@ -366,7 +366,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }, }, { - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputFile: "testdata/grafana-ruler.txt", format: FormatLogfmt, patterns: []string{ @@ -462,7 +462,7 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) { }{ { name: "should match each line against a pattern", - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputLines: []string{ "test test test test", "test test test test", @@ -472,7 +472,7 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) { }, { name: "should also match newlines", - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputLines: []string{ `test test test test `, @@ -509,7 +509,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }{ { name: "should extract patterns that all lines match", - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputLines: []string{ "test 1 test test", "test 2 test test", @@ -519,7 +519,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "should extract patterns that match if line ends with newlines", - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputLines: []string{ `test 1 test test `, @@ -533,7 +533,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "should extract patterns that match if line ends with empty space", - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputLines: []string{ `test 1 test test `, `test 2 test test `, @@ -543,7 +543,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "should extract patterns that match if line starts with empty space", - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputLines: []string{ ` test 1 test test`, ` test 2 test test`, @@ -553,7 +553,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) }, { name: "Scheduler patterns are matchable", - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputLines: []string{ `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`, `ts=2024-05-30T12:50:36.350575929Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc 
= connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`, @@ -651,7 +651,7 @@ func TestDrain_PruneTreeClearsOldBranches(t *testing.T) { }{ { name: "should prune old branches", - drain: New(DefaultConfig(), nil), + drain: New(DefaultConfig(), "", nil), inputLines: []string{ "test test test A", "test test test B", diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index 89bf34a5569b5..335d318859006 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -1,8 +1,12 @@ package drain import ( + "bytes" "strings" "unicode" + + gologfmt "github.com/go-logfmt/logfmt" + "github.com/grafana/loki/v3/pkg/logql/log/logfmt" ) type LineTokenizer interface { @@ -131,3 +135,55 @@ func (splittingTokenizer) Join(tokens []string, state interface{}) string { } return strBuilder.String() } + +type logfmtTokenizer struct { + dec *logfmt.Decoder + varReplace string +} + +func newLogfmtTokenizer(varReplace string) *logfmtTokenizer { + return &logfmtTokenizer{ + dec: logfmt.NewDecoder(nil), + varReplace: varReplace, + } +} + +func (t *logfmtTokenizer) Tokenize(line string) ([]string, interface{}) { + var tokens []string + t.dec.Reset([]byte(line)) + for !t.dec.EOL() && t.dec.ScanKeyval() { + key := t.dec.Key() + if isTimeStampField(key) { + tokens = append(tokens, string(t.dec.Key()), t.varReplace) + + continue + } + tokens = append(tokens, string(t.dec.Key()), string(t.dec.Value())) + } + if t.dec.Err() != nil { + return nil, nil + } + return tokens, nil +} + +func (t *logfmtTokenizer) Join(tokens []string, state interface{}) string { + if len(tokens) == 0 { + return "" + } + if len(tokens)%2 == 1 { + tokens = append(tokens, "") + } + buf := bytes.NewBuffer(make([]byte, 0, 1024)) + enc := gologfmt.NewEncoder(buf) + for i := 0; i < len(tokens); i += 2 { + k, v := tokens[i], tokens[i+1] + if err := enc.EncodeKeyval(k, v); err != nil { + return "" + } + } + return buf.String() +} + +func isTimeStampField(key []byte) bool { + return bytes.EqualFold(key, []byte("ts")) || bytes.EqualFold(key, []byte("time")) || bytes.EqualFold(key, []byte("timestamp")) +} diff --git a/pkg/pattern/drain/line_tokenizer_test.go b/pkg/pattern/drain/line_tokenizer_test.go index 1eda1b51068a3..15062ffb09e5e 100644 --- a/pkg/pattern/drain/line_tokenizer_test.go +++ b/pkg/pattern/drain/line_tokenizer_test.go @@ -12,8 +12,10 @@ type TestCase struct { want map[string][]string } -const typePunctuation = "punctuation" -const typeSplitting = "splitting" +const ( + typePunctuation = "punctuation" + typeSplitting = "splitting" +) var testCases = []TestCase{ { @@ -162,3 +164,91 @@ func BenchmarkSplittingTokenizer(b *testing.B) { }) } } + +func TestLogFmtTokenizer(t *testing.T) { + param := DefaultConfig().ParamString + tests := []struct { + name string + line string + want []string + }{ + { + line: `foo=bar baz="this is a message"`, + want: []string{"foo", "bar", "baz", "this is a message"}, + }, + { + line: `foo baz="this is a message"`, + want: []string{"foo", "", "baz", "this is a message"}, + }, + { + line: `foo= baz="this is a message"`, + want: []string{"foo", "", "baz", "this is a message"}, + }, + { + line: `foo baz`, + want: []string{"foo", "", "baz", ""}, + }, + { + line: `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`, + want: []string{"ts", 
param, "caller", "scheduler_processor.go:143", "level", "warn", "msg", "error contacting scheduler", "err", "rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"", "addr", "10.0.151.101:9095"}, + }, + } + + tokenizer := newLogfmtTokenizer(param) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := tokenizer.Tokenize(tt.line) + require.Equal(t, tt.want, got) + }) + } +} + +func TestLogFmtTokenizerJoin(t *testing.T) { + tests := []struct { + tokens []string + want string + }{ + { + want: ``, + tokens: []string{}, + }, + { + want: `foo=bar baz="this is a message"`, + tokens: []string{"foo", "bar", "baz", "this is a message"}, + }, + { + want: `foo= baz="this is a message"`, + tokens: []string{"foo", "", "baz", "this is a message"}, + }, + { + want: `foo= baz="this is a message"`, + tokens: []string{"foo", "", "baz", "this is a message"}, + }, + { + want: `foo= baz=`, + tokens: []string{"foo", "", "baz", ""}, + }, + { + want: `foo=`, + tokens: []string{"foo"}, + }, + { + want: `foo= bar=`, + tokens: []string{"foo", "", "bar"}, + }, + { + want: `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`, + tokens: []string{"ts", "2024-05-30T12:50:36.648377186Z", "caller", "scheduler_processor.go:143", "level", "warn", "msg", "error contacting scheduler", "err", "rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"", "addr", "10.0.151.101:9095"}, + }, + } + + tokenizer := newLogfmtTokenizer("") + + for _, tt := range tests { + t.Run("", func(t *testing.T) { + got := tokenizer.Join(tt.tokens, nil) + require.Equal(t, tt.want, got) + }) + } +} diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go index 5e323f17dc457..1e18852eddaa3 100644 --- a/pkg/pattern/ingester_querier.go +++ b/pkg/pattern/ingester_querier.go @@ -137,7 +137,7 @@ func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, met pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them patternsBefore := len(resp.Series) - d := drain.New(pruneConfig, nil) + d := drain.New(pruneConfig, "", nil) for _, p := range resp.Series { d.TrainPattern(p.GetPattern(), p.Samples) } diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index f19b0373858d9..4f9225ca751ea 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -75,6 +75,9 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { appendErr.Add(err) continue } + if s == nil { + continue + } err = s.Push(ctx, reqStream.Entries) if err != nil { appendErr.Add(err) @@ -210,8 +213,11 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) firstEntryLine := pushReqStream.Entries[0].Line - s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID) - + format := drain.DetectLogFormat(firstEntryLine) + if format == drain.FormatJSON { + return nil, nil + } + s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, format, i.instanceID) if err != nil { return nil, fmt.Errorf("failed to create stream: %w", err) } diff --git 
a/pkg/pattern/stream.go b/pkg/pattern/stream.go index eae5a2c22eb47..578653580b183 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -60,7 +60,7 @@ func newStream( labels: labels, labelsString: labels.String(), labelHash: labels.Hash(), - patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{ + patterns: drain.New(drain.DefaultConfig(), guessedFormat, &drain.Metrics{ PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat), PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat), TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat), From feb699b738a0cb7c37385031fe108950e66387b5 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 2 Jul 2024 19:27:11 +0200 Subject: [PATCH 02/10] fixes panic in metrics stat --- pkg/pattern/drain/drain.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 9fa93f8273d03..bae9220f2abd5 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -205,7 +205,9 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster } if d.metrics != nil { d.metrics.TokensPerLine.Observe(float64(len(tokens))) - d.metrics.StatePerLine.Observe(float64(len(state.([]int)))) + if stateInts, ok := state.([]int); ok { + d.metrics.StatePerLine.Observe(float64(len(stateInts))) + } } matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false) // Match no existing log cluster From 458d2ec4aa4242acaddb53d066ef86960bf764c8 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 3 Jul 2024 09:11:48 +0200 Subject: [PATCH 03/10] fixes panic in pattern ingestion for json --- pkg/pattern/drain/drain.go | 5 +++++ pkg/pattern/instance.go | 9 +-------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index bae9220f2abd5..c15b439c80ea2 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -169,6 +169,7 @@ func New(config *Config, format string, metrics *Metrics) *Drain { metrics: metrics, tokenizer: tokenizer, maxAllowedLineLength: 3000, + format: format, } return d } @@ -181,6 +182,7 @@ type Drain struct { metrics *Metrics tokenizer LineTokenizer maxAllowedLineLength int + format string } func (d *Drain) Clusters() []*LogCluster { @@ -192,6 +194,9 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts } func (d *Drain) Train(content string, ts int64) *LogCluster { + if d.format == FormatJSON { + return nil + } if len(content) > d.maxAllowedLineLength { return nil } diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 4f9225ca751ea..ee2b0d1fb68cb 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -75,9 +75,6 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { appendErr.Add(err) continue } - if s == nil { - continue - } err = s.Push(ctx, reqStream.Entries) if err != nil { appendErr.Add(err) @@ -213,11 +210,7 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream fp := i.getHashForLabels(labels) sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) firstEntryLine := pushReqStream.Entries[0].Line - format := drain.DetectLogFormat(firstEntryLine) - if format == drain.FormatJSON { - return nil, nil - } - s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, format, i.instanceID) + 
s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID) if err != nil { return nil, fmt.Errorf("failed to create stream: %w", err) } From 3cf6a6e804af494647e7cf0a4887ed1313430db4 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 3 Jul 2024 12:25:13 +0200 Subject: [PATCH 04/10] add support for json --- pkg/pattern/drain/drain.go | 8 +++-- pkg/pattern/drain/line_tokenizer.go | 42 ++++++++++++++++++++++-- pkg/pattern/drain/line_tokenizer_test.go | 37 +++++++++++++++++++++ 3 files changed, 82 insertions(+), 5 deletions(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index c15b439c80ea2..08292b798b89f 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -157,11 +157,15 @@ func New(config *Config, format string, metrics *Metrics) *Drain { evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() } } var tokenizer LineTokenizer - if format == FormatLogfmt { + switch format { + case FormatJSON: + tokenizer = newJSONTokenizer(config.ParamString) + case FormatLogfmt: tokenizer = newLogfmtTokenizer(config.ParamString) - } else { + default: tokenizer = newPunctuationTokenizer() } + d := &Drain{ config: config, rootNode: createNode(), diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index 335d318859006..fec448fdb7579 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -2,11 +2,14 @@ package drain import ( "bytes" + "fmt" "strings" "unicode" + "github.com/buger/jsonparser" gologfmt "github.com/go-logfmt/logfmt" "github.com/grafana/loki/v3/pkg/logql/log/logfmt" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/util" ) type LineTokenizer interface { @@ -153,7 +156,7 @@ func (t *logfmtTokenizer) Tokenize(line string) ([]string, interface{}) { t.dec.Reset([]byte(line)) for !t.dec.EOL() && t.dec.ScanKeyval() { key := t.dec.Key() - if isTimeStampField(key) { + if isVariableField(key) { tokens = append(tokens, string(t.dec.Key()), t.varReplace) continue @@ -184,6 +187,39 @@ func (t *logfmtTokenizer) Join(tokens []string, state interface{}) string { return buf.String() } -func isTimeStampField(key []byte) bool { - return bytes.EqualFold(key, []byte("ts")) || bytes.EqualFold(key, []byte("time")) || bytes.EqualFold(key, []byte("timestamp")) +type jsonTokenizer struct { + *punctuationTokenizer + varReplace string +} + +func newJSONTokenizer(varReplace string) *jsonTokenizer { + return &jsonTokenizer{newPunctuationTokenizer(), varReplace} +} + +func (t *jsonTokenizer) Tokenize(line string) ([]string, interface{}) { + var found []byte + for _, key := range []string{"log", "message", "msg", "msg_", "_msg", "content"} { + msg, ty, _, err := jsonparser.Get(util.GetUnsafeBytes(line), key) + if err == nil && ty == jsonparser.String { + found = msg + break + } + } + + if found == nil { + return nil, nil + } + tokens, state := t.punctuationTokenizer.Tokenize(util.GetUnsafeString(found)) + return tokens, state +} + +func (t *jsonTokenizer) Join(tokens []string, state interface{}) string { + return fmt.Sprintf("%s%s%s", t.varReplace, t.punctuationTokenizer.Join(tokens, state), t.varReplace) +} + +func isVariableField(key []byte) bool { + return bytes.EqualFold(key, []byte("ts")) || + bytes.EqualFold(key, []byte("traceID")) || + bytes.EqualFold(key, []byte("time")) || + bytes.EqualFold(key, []byte("timestamp")) } diff --git a/pkg/pattern/drain/line_tokenizer_test.go 
b/pkg/pattern/drain/line_tokenizer_test.go index 15062ffb09e5e..00eb239c4fd24 100644 --- a/pkg/pattern/drain/line_tokenizer_test.go +++ b/pkg/pattern/drain/line_tokenizer_test.go @@ -252,3 +252,40 @@ func TestLogFmtTokenizerJoin(t *testing.T) { }) } } + +func TestJsonTokenizer(t *testing.T) { + param := DefaultConfig().ParamString + tests := []struct { + name string + line string + pattern string + want []string + }{ + { + line: `{"level":30,"time":1719998371869,"pid":17,"hostname":"otel-demo-ops-paymentservice-7c759bf575-55t4p","trace_id":"1425c6df5a4321cf6a7de254de5b8204","span_id":"2ac7a3fc800b80d4","trace_flags":"01","transactionId":"e501032b-3215-4e43-b1db-f4886a906fc5","cardType":"visa","lastFourDigits":"5647","amount":{"units":{"low":656,"high":0,"unsigned":false},"nanos":549999996,"currencyCode":"USD"},"msg":"Transaction complete."}`, + want: []string{"Transaction", "complete", "."}, + pattern: "<_>Transaction complete.<_>", + }, + { + line: `{"event":{"actor":{"alternateId":"foo@grafana.com","displayName":"Foo bar","id":"dq23","type":"User"},"authenticationContext":{"authenticationStep":0,"externalSessionId":"123d"},"client":{"device":"Computer","geographicalContext":{"city":"Berlin","country":"DE","state":"Land Berlin"},"ipAddress":"0.0.0.0","userAgent":{"browser":"CHROME","os":"Mac OS X","rawUserAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"},"zone":"null"},"debugContext":{"debugData":{"authMethodFirstEnrollment":"123","authMethodFirstType":"foo","authMethodFirstVerificationTime":"2024-07-02T11:28:03.219Z","authMethodSecondEnrollment":"var","authMethodSecondType":"ddd","authMethodSecondVerificationTime":"2024-07-03T06:59:09.151Z","authnRequestId":"1","dtHash":"1","logOnlySecurityData":"{\"risk\":{\"level\":\"LOW\"},\"behaviors\":{\"New Geo-Location\":\"NEGATIVE\",\"New Device\":\"NEGATIVE\",\"New IP\":\"NEGATIVE\",\"New State\":\"NEGATIVE\",\"New Country\":\"NEGATIVE\",\"Velocity\":\"NEGATIVE\",\"New City\":\"NEGATIVE\"}}","requestId":"1","threatSuspected":"false","url":"/foo?"}},"displayMessage":"Evaluation of sign-on policy","eventType":"policy.evaluate_sign_on","legacyEventType":"app.oauth2.token.grant.refresh_token_success","outcome":{"reason":"Sign-on policy evaluation resulted in AUTHENTICATED","result":"ALLOW"},"published":"2024-07-03T09:19:59.973Z","request":{"ipChain":[{"geographicalContext":{"city":"Berlin","country":"Germany","geolocation":{"lat":52.5363,"lon":13.4169},"postalCode":"10435","state":"Land Berlin"},"ip":"95.90.234.241","version":"V4"}]},"securityContext":{"asNumber":3209,"asOrg":"kabel deutschland breitband customer 19","domain":"kabel-deutschland.de","isProxy":false,"isp":"vodafone gmbh"},"severity":"INFO","target":[{"alternateId":"Salesforce.com","detailEntry":{"signOnModeEvaluationResult":"AUTHENTICATED","signOnModeType":"SAML_2_0"},"displayName":"Salesforce.com","id":"0oa5sfmj3hz0mTgoW357","type":"AppInstance"},{"alternateId":"unknown","detailEntry":{"policyRuleFactorMode":"2FA"},"displayName":"Catch-all Rule","id":"1","type":"Rule"}],"transaction":{"detail":{},"id":"1","type":"WEB"},"uuid":"1","version":"0"},"level":"info","msg":"received event","time":"2024-07-03T09:19:59Z"}`, + want: []string{"received", "event"}, + pattern: "<_>received event<_>", + }, + { + line: `{"code":200,"message":"OK","data":{"id":"1","name":"foo"}}`, + want: []string{"OK"}, + pattern: "<_>OK<_>", + }, + } + + tokenizer := newJSONTokenizer(param) + + for _, tt := range tests { + t.Run(tt.name, 
func(t *testing.T) { + got, state := tokenizer.Tokenize(tt.line) + require.Equal(t, tt.want, got) + pattern := tokenizer.Join(got, state) + require.Equal(t, tt.pattern, pattern) + }) + } +} From 9776b5434efa7114c584d2ec5b191d09c272066d Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 3 Jul 2024 12:54:08 +0200 Subject: [PATCH 05/10] add support for json --- pkg/pattern/drain/drain.go | 3 --- pkg/pattern/drain/line_tokenizer_test.go | 5 +++++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 08292b798b89f..06b6615303e2d 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -198,9 +198,6 @@ func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts } func (d *Drain) Train(content string, ts int64) *LogCluster { - if d.format == FormatJSON { - return nil - } if len(content) > d.maxAllowedLineLength { return nil } diff --git a/pkg/pattern/drain/line_tokenizer_test.go b/pkg/pattern/drain/line_tokenizer_test.go index 79ced43c5b0fe..d660288fc2af6 100644 --- a/pkg/pattern/drain/line_tokenizer_test.go +++ b/pkg/pattern/drain/line_tokenizer_test.go @@ -293,6 +293,11 @@ func TestJsonTokenizer(t *testing.T) { want: []string{"OK"}, pattern: "<_>OK<_>", }, + { + line: `{"time":"2024-07-03T10:48:10.58330448Z","level":"INFO","msg":"successfully discovered 15 agent IP addresses","git_commit":"1","git_time":"2024-06-26T06:59:04Z","git_modified":true,"go_os":"linux","go_arch":"arm64","process_generation":"ea2d9b41-0314-4ddc-a415-f8af2d80a32c","hostname_fqdn":"foobar","hostname_short":foobar","private_ips":["10.0.132.23"],"num_vcpus":8,"kafka_enabled":true,"service_protocol":"VIRTUALENV_ZONE_LOCAL","module":"agent_resolver","ip_addresses":[{"Hostname":"10.0.100.152","Port":8080},{"Hostname":"10.0.41.210","Port":8080},{"Hostname":"10.0.212.83","Port":8080},{"Hostname":"10.0.145.77","Port":8080},{"Hostname":"10.0.59.71","Port":8080},{"Hostname":"10.0.224.219","Port":8080},{"Hostname":"10.0.103.29","Port":8080},{"Hostname":"10.0.86.220","Port":8080},{"Hostname":"10.0.154.82","Port":8080},{"Hostname":"10.0.9.213","Port":8080},{"Hostname":"10.0.240.157","Port":8080},{"Hostname":"10.0.166.11","Port":8080},{"Hostname":"10.0.230.22","Port":8080},{"Hostname":"10.0.123.239","Port":8080},{"Hostname":"10.0.233.210","Port":8080}]}`, + want: []string{"successfully", "discovered", "15", "agent", "IP", "addresses"}, + pattern: "<_>successfully discovered 15 agent IP addresses<_>", + }, } tokenizer := newJSONTokenizer(param) From 9e6cfc44f46f4fbb2d3a243376dae82082893c4d Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 3 Jul 2024 20:29:13 +0200 Subject: [PATCH 06/10] improve performance by removing allocs --- pkg/pattern/drain/drain.go | 29 +++---- pkg/pattern/drain/drain_test.go | 54 ++----------- pkg/pattern/drain/line_tokenizer.go | 96 +++++++++++++++++++----- pkg/pattern/drain/line_tokenizer_test.go | 12 +-- 4 files changed, 101 insertions(+), 90 deletions(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 06b6615303e2d..54bab3ef64434 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -187,6 +187,8 @@ type Drain struct { tokenizer LineTokenizer maxAllowedLineLength int format string + tokens []string + state interface{} } func (d *Drain) Clusters() []*LogCluster { @@ -201,8 +203,8 @@ func (d *Drain) Train(content string, ts int64) *LogCluster { if len(content) > d.maxAllowedLineLength { return nil } - tokens, state := 
d.tokenizer.Tokenize(content) - return d.train(tokens, state, ts) + d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state) + return d.train(d.tokens, d.state, ts) } func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster { @@ -220,6 +222,7 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster if matchCluster == nil { d.clustersCounter++ clusterID := d.clustersCounter + tokens, state = d.tokenizer.Clone(tokens, state) matchCluster = &LogCluster{ Tokens: tokens, TokenState: state, @@ -235,8 +238,7 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster d.metrics.PatternsDetectedTotal.Inc() } } else { - newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens) - matchCluster.Tokens = newTemplateTokens + matchCluster.Tokens = d.createTemplate(tokens, matchCluster.Tokens) matchCluster.append(model.TimeFromUnixNano(ts)) // Touch cluster to update its state in the cache. d.idToCluster.Get(matchCluster.id) @@ -245,12 +247,13 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster } func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster { - tokens, state := d.tokenizer.Tokenize(content) + tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state) matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true) // Match no existing log cluster if matchCluster == nil { d.clustersCounter++ clusterID := d.clustersCounter + tokens, state = d.tokenizer.Clone(tokens, state) matchCluster = &LogCluster{ Tokens: tokens, TokenState: state, @@ -259,8 +262,7 @@ func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) d.idToCluster.Set(clusterID, matchCluster) d.addSeqToPrefixTree(d.rootNode, matchCluster) } else { - newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens) - matchCluster.Tokens = newTemplateTokens + matchCluster.Tokens = d.createTemplate(tokens, matchCluster.Tokens) // Touch cluster to update its state in the cache. d.idToCluster.Get(matchCluster.id) } @@ -326,13 +328,6 @@ func (d *Drain) Delete(cluster *LogCluster) { d.idToCluster.cache.Remove(cluster.id) } -// Match against an already existing cluster. Match shall be perfect (sim_th=1.0). New cluster will not be created as a result of this call, nor any cluster modifications. 
-func (d *Drain) Match(content string) *LogCluster { - contentTokens, _ := d.tokenizer.Tokenize(content) - matchCluster := d.treeSearch(d.rootNode, contentTokens, 1.0, true) - return matchCluster -} - func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, includeParams bool) *LogCluster { tokenCount := len(tokens) @@ -524,12 +519,10 @@ func (d *Drain) createTemplate(tokens, matchClusterTokens []string) []string { if len(tokens) != len(matchClusterTokens) { panic("seq1 seq2 be of same length") } - retVal := make([]string, len(matchClusterTokens)) - copy(retVal, matchClusterTokens) for i := range tokens { if tokens[i] != matchClusterTokens[i] { - retVal[i] = d.config.ParamString + matchClusterTokens[i] = d.config.ParamString } } - return retVal + return matchClusterTokens } diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index fc30b665430a3..c8c5498327fd1 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -62,7 +62,8 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { patterns: []string{ `ts=2024-04-17T09:52:46.363974185Z caller=http.go:194 level=debug traceID=1b48f5156a61ca69 msg="GET /debug/pprof/delta_mutex (200) 1.161082ms"`, `ts=<_> caller=head.go:216 level=debug tenant=987678 msg="profile is empty after delta computation" metricName=memory`, - `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=<_> msg="POST /ingester.v1.IngesterService/Push (200) <_>"`}, + `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=<_> msg="POST /ingester.v1.IngesterService/Push (200) <_>"`, + }, }, { drain: New(DefaultConfig(), "", nil), @@ -86,7 +87,8 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { `ts=2024-05-02T12:17:22.242343806Z caller=http.go:194 level=debug traceID=404c6a83a18e66a4 orgID=75 msg="POST /ingest?aggregationType=average&from=1714652227232613927&name=checkoutservice%7B__session_id__%3D294b9729f5a7de95%2Cnamespace%3Dotel-demo%7D&sampleRate=0&spyName=gospy&units=goroutines&until=1714652242232506798 (200) 2.902485ms"`, `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=1819 msg="POST /pyroscope/ingest?aggregationType=sum&from=1714652230&name=<_>%7Bapp_kubernetes_io_instance%3Dflamegraph-com%2Capp_kubernetes_io_name%3Dflamegraph-com%2Ccluster%3Dflamegraph.com%2Cinstance%<_>%<_>%2Cjob%3Dkubernetes-pods%2Cnamespace%3Dflamegraph-com%2Cpod%<_>%2Cpod_template_hash%<_>%2Cpyroscope_tenant%3Dpyroscope%2Ctier%<_>%7D&sampleRate=0&spyName=scrape&units=samples&until=1714652240 (200) <_>"`, `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=75 msg="POST /ingest?aggregationType=&from=1714652227232613927&name=checkoutservice%7B__session_id__%3D294b9729f5a7de95%2Cnamespace%3Dotel-demo%7D&sampleRate=<_>&spyName=gospy&units=&until=1714652242232506798 (200) <_>"`, - `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=<_> msg="POST /push.v1.PusherService/Push (<_>) <_>"`}, + `ts=<_> caller=http.go:194 level=debug traceID=<_> orgID=<_> msg="POST /push.v1.PusherService/Push (<_>) <_>"`, + }, }, { drain: New(DefaultConfig(), "", nil), @@ -439,6 +441,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { } for _, tt := range tests { + tt := tt t.Run(tt.inputFile, func(t *testing.T) { file, err := os.Open(tt.inputFile) require.NoError(t, err) @@ -474,53 +477,6 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { } } -func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) { - t.Parallel() - tests := []struct { - name string - drain *Drain - inputLines []string - }{ - { - name: "should 
match each line against a pattern", - drain: New(DefaultConfig(), "", nil), - inputLines: []string{ - "test test test test", - "test test test test", - "test test test test", - "test test test test", - }, - }, - { - name: "should also match newlines", - drain: New(DefaultConfig(), "", nil), - inputLines: []string{ - `test test test test -`, - `test test test test -`, - `test test test test -`, - `test test test test -`, - }, - }, - } - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - for _, line := range tt.inputLines { - tt.drain.Train(line, 0) - } - - for _, line := range tt.inputLines { - match := tt.drain.Match(line) - require.NotNil(t, match, `Line should match a cluster`) - } - }) - } -} - func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) { t.Parallel() tests := []struct { diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index 2746e13de9e4c..885e1230ec1b7 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -13,13 +13,14 @@ import ( ) type LineTokenizer interface { - Tokenize(line string) ([]string, interface{}) + Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) Join(tokens []string, state interface{}) string + Clone(tokens []string, state interface{}) ([]string, interface{}) } type spacesTokenizer struct{} -func (spacesTokenizer) Tokenize(line string) ([]string, interface{}) { +func (spacesTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { return strings.Split(line, " "), nil } @@ -27,6 +28,12 @@ func (spacesTokenizer) Join(tokens []string, _ interface{}) string { return strings.Join(tokens, " ") } +func (spacesTokenizer) Clone(tokens []string, _ interface{}) ([]string, interface{}) { + res := make([]string, len(tokens)) + copy(res, tokens) + return res, nil +} + type punctuationTokenizer struct { includeDelimiters [128]rune excludeDelimiters [128]rune @@ -46,13 +53,20 @@ func newPunctuationTokenizer() *punctuationTokenizer { } } -func (p *punctuationTokenizer) Tokenize(line string) ([]string, interface{}) { - tokens := make([]string, 0, 128) - spacesAfter := make([]int, 0, 64) +func (p *punctuationTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { + if cap(tokens) == 0 { + tokens = make([]string, 0, 128) + } + tokens = tokens[:0] + if state == nil || cap(state.([]int)) == 0 { + state = make([]int, 0, 64) + } + spacesAfter := state.([]int) + spacesAfter = spacesAfter[:0] start := 0 for i, char := range line { - if len(tokens) >= cap(tokens)-1 { + if len(tokens) >= 127 { break } if unicode.IsLetter(char) || unicode.IsNumber(char) || char < 128 && p.excludeDelimiters[char] != 0 { @@ -94,9 +108,23 @@ func (p *punctuationTokenizer) Join(tokens []string, state interface{}) string { return strBuilder.String() } +func (p *punctuationTokenizer) Clone(tokens []string, state interface{}) ([]string, interface{}) { + res := make([]string, len(tokens)) + for i, token := range tokens { + res[i] = strings.Clone(token) + } + if state == nil { + return res, nil + } + spacesAfter := state.([]int) + spacesAfterCopy := make([]int, len(spacesAfter)) + copy(spacesAfterCopy, spacesAfter) + return res, spacesAfterCopy +} + type splittingTokenizer struct{} -func (splittingTokenizer) Tokenize(line string) ([]string, interface{}) { +func (splittingTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { numEquals := 
strings.Count(line, "=") numColons := strings.Count(line, ":") numSpaces := strings.Count(line, " ") @@ -108,8 +136,16 @@ func (splittingTokenizer) Tokenize(line string) ([]string, interface{}) { expectedTokens = numSpaces + numColons } - tokens := make([]string, 0, expectedTokens) - spacesAfter := make([]int, 0, strings.Count(line, " ")) + if cap(tokens) == 0 { + tokens = make([]string, 0, expectedTokens) + } + tokens = tokens[:0] + if state == nil || cap(state.([]int)) == 0 { + state = make([]int, 0, numSpaces) + } + spacesAfter := state.([]int) + spacesAfter = spacesAfter[:0] + for _, token := range strings.SplitAfter(line, keyvalSeparator) { words := strings.Split(token, " ") for i, entry := range words { @@ -138,6 +174,20 @@ func (splittingTokenizer) Join(tokens []string, state interface{}) string { return strBuilder.String() } +func (splittingTokenizer) Clone(tokens []string, state interface{}) ([]string, interface{}) { + res := make([]string, len(tokens)) + for i, token := range tokens { + res[i] = strings.Clone(token) + } + if state == nil { + return res, nil + } + spacesAfter := state.([]int) + spacesAfterCopy := make([]int, len(spacesAfter)) + copy(spacesAfterCopy, spacesAfter) + return res, spacesAfterCopy +} + type logfmtTokenizer struct { dec *logfmt.Decoder varReplace string @@ -150,17 +200,21 @@ func newLogfmtTokenizer(varReplace string) *logfmtTokenizer { } } -func (t *logfmtTokenizer) Tokenize(line string) ([]string, interface{}) { - var tokens []string - t.dec.Reset([]byte(line)) +func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) ([]string, interface{}) { + if cap(tokens) == 0 { + tokens = make([]string, 0, 64) + } + tokens = tokens[:0] + t.dec.Reset(util.GetUnsafeBytes(line)) for !t.dec.EOL() && t.dec.ScanKeyval() { key := t.dec.Key() if isVariableField(key) { - tokens = append(tokens, string(t.dec.Key()), t.varReplace) + tokens = append(tokens, util.GetUnsafeString(t.dec.Key()), t.varReplace) continue } - tokens = append(tokens, string(t.dec.Key()), string(t.dec.Value())) + // todo we want to pass bytes and let user copy if needed. 
+ tokens = append(tokens, util.GetUnsafeString(t.dec.Key()), util.GetUnsafeString(t.dec.Value())) } if t.dec.Err() != nil { return nil, nil @@ -186,6 +240,14 @@ func (t *logfmtTokenizer) Join(tokens []string, state interface{}) string { return buf.String() } +func (t *logfmtTokenizer) Clone(tokens []string, _ interface{}) ([]string, interface{}) { + res := make([]string, len(tokens)) + for i, token := range tokens { + res[i] = strings.Clone(token) + } + return res, nil +} + type jsonTokenizer struct { *punctuationTokenizer varReplace string @@ -195,7 +257,7 @@ func newJSONTokenizer(varReplace string) *jsonTokenizer { return &jsonTokenizer{newPunctuationTokenizer(), varReplace} } -func (t *jsonTokenizer) Tokenize(line string) ([]string, interface{}) { +func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { var found []byte for _, key := range []string{"log", "message", "msg", "msg_", "_msg", "content"} { msg, ty, _, err := jsonparser.Get(util.GetUnsafeBytes(line), key) @@ -208,8 +270,8 @@ func (t *jsonTokenizer) Tokenize(line string) ([]string, interface{}) { if found == nil { return nil, nil } - tokens, state := t.punctuationTokenizer.Tokenize(util.GetUnsafeString(found)) - return tokens, state + + return t.punctuationTokenizer.Tokenize(util.GetUnsafeString(found), tokens, state) } func (t *jsonTokenizer) Join(tokens []string, state interface{}) string { diff --git a/pkg/pattern/drain/line_tokenizer_test.go b/pkg/pattern/drain/line_tokenizer_test.go index 0534da807e899..1c1d47dda937e 100644 --- a/pkg/pattern/drain/line_tokenizer_test.go +++ b/pkg/pattern/drain/line_tokenizer_test.go @@ -135,7 +135,7 @@ func TestTokenizer_Tokenize(t *testing.T) { for _, tt := range tests { for _, tc := range testCases { t.Run(tt.name+":"+tc.name, func(t *testing.T) { - got, _ := tt.tokenizer.Tokenize(tc.line) + got, _ := tt.tokenizer.Tokenize(tc.line, nil, nil) require.Equal(t, tc.want[tt.name], got) }) } @@ -160,7 +160,7 @@ func TestTokenizer_TokenizeAndJoin(t *testing.T) { for _, tt := range tests { for _, tc := range testCases { t.Run(tt.name+":"+tc.name, func(t *testing.T) { - got := tt.tokenizer.Join(tt.tokenizer.Tokenize(tc.line)) + got := tt.tokenizer.Join(tt.tokenizer.Tokenize(tc.line, nil, nil)) require.Equal(t, tc.line, got) }) } @@ -176,7 +176,7 @@ func BenchmarkSplittingTokenizer(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - tokenizer.Tokenize(tc.line) + tokenizer.Tokenize(tc.line, nil, nil) } }) } @@ -215,7 +215,7 @@ func TestLogFmtTokenizer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, _ := tokenizer.Tokenize(tt.line) + got, _ := tokenizer.Tokenize(tt.line, nil, nil) require.Equal(t, tt.want, got) }) } @@ -280,7 +280,7 @@ func TestJsonTokenizer(t *testing.T) { }{ { line: `{"level":30,"time":1719998371869,"pid":17,"hostname":"otel-demo-ops-paymentservice-7c759bf575-55t4p","trace_id":"1425c6df5a4321cf6a7de254de5b8204","span_id":"2ac7a3fc800b80d4","trace_flags":"01","transactionId":"e501032b-3215-4e43-b1db-f4886a906fc5","cardType":"visa","lastFourDigits":"5647","amount":{"units":{"low":656,"high":0,"unsigned":false},"nanos":549999996,"currencyCode":"USD"},"msg":"Transaction complete."}`, - want: []string{"Transaction", "complete", "."}, + want: []string{"Transaction", "complete."}, pattern: "<_>Transaction complete.<_>", }, { @@ -304,7 +304,7 @@ func TestJsonTokenizer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, state := 
tokenizer.Tokenize(tt.line) + got, state := tokenizer.Tokenize(tt.line, nil, nil) require.Equal(t, tt.want, got) pattern := tokenizer.Join(got, state) require.Equal(t, tt.pattern, pattern) From 466197db563ecc09727ac17a0d40e6ef8fdf6564 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 3 Jul 2024 22:25:20 +0200 Subject: [PATCH 07/10] use better unsafe function --- pkg/pattern/drain/drain.go | 10 +++++++++- pkg/pattern/drain/line_tokenizer.go | 11 +++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 54bab3ef64434..8311036c3df86 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -292,7 +292,7 @@ func deduplicatePlaceholders(line string, placeholder string) string { } builder = append(builder, line[low:]...) - return unsafe.String(unsafe.SliceData(builder), len(builder)) + return unsafeString(builder) } func (d *Drain) PatternString(c *LogCluster) string { @@ -526,3 +526,11 @@ func (d *Drain) createTemplate(tokens, matchClusterTokens []string) []string { } return matchClusterTokens } + +func unsafeString(s []byte) string { + return unsafe.String(unsafe.SliceData(s), len(s)) +} + +func unsafeBytes(s string) []byte { + return unsafe.Slice(unsafe.StringData(s), len(s)) +} diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index 885e1230ec1b7..bf142ea895809 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -9,7 +9,6 @@ import ( "github.com/buger/jsonparser" gologfmt "github.com/go-logfmt/logfmt" "github.com/grafana/loki/v3/pkg/logql/log/logfmt" - "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/util" ) type LineTokenizer interface { @@ -205,16 +204,16 @@ func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) tokens = make([]string, 0, 64) } tokens = tokens[:0] - t.dec.Reset(util.GetUnsafeBytes(line)) + t.dec.Reset(unsafeBytes(line)) for !t.dec.EOL() && t.dec.ScanKeyval() { key := t.dec.Key() if isVariableField(key) { - tokens = append(tokens, util.GetUnsafeString(t.dec.Key()), t.varReplace) + tokens = append(tokens, unsafeString(t.dec.Key()), t.varReplace) continue } // todo we want to pass bytes and let user copy if needed. 
- tokens = append(tokens, util.GetUnsafeString(t.dec.Key()), util.GetUnsafeString(t.dec.Value())) + tokens = append(tokens, unsafeString(t.dec.Key()), unsafeString(t.dec.Value())) } if t.dec.Err() != nil { return nil, nil @@ -260,7 +259,7 @@ func newJSONTokenizer(varReplace string) *jsonTokenizer { func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { var found []byte for _, key := range []string{"log", "message", "msg", "msg_", "_msg", "content"} { - msg, ty, _, err := jsonparser.Get(util.GetUnsafeBytes(line), key) + msg, ty, _, err := jsonparser.Get(unsafeBytes(line), key) if err == nil && ty == jsonparser.String { found = msg break @@ -271,7 +270,7 @@ func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{} return nil, nil } - return t.punctuationTokenizer.Tokenize(util.GetUnsafeString(found), tokens, state) + return t.punctuationTokenizer.Tokenize(unsafeString(found), tokens, state) } func (t *jsonTokenizer) Join(tokens []string, state interface{}) string { From b390b5839dabd8dbd1cb9e3ce09dcd2aec32b587 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 3 Jul 2024 22:39:12 +0200 Subject: [PATCH 08/10] lint --- pkg/pattern/drain/line_tokenizer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index bf142ea895809..4da135f06b0e1 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -19,7 +19,7 @@ type LineTokenizer interface { type spacesTokenizer struct{} -func (spacesTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) { +func (spacesTokenizer) Tokenize(line string, _ []string, _ interface{}) ([]string, interface{}) { return strings.Split(line, " "), nil } @@ -221,7 +221,7 @@ func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) return tokens, nil } -func (t *logfmtTokenizer) Join(tokens []string, state interface{}) string { +func (t *logfmtTokenizer) Join(tokens []string, _ interface{}) string { if len(tokens) == 0 { return "" } From 347b0e9565c137f173d1fdc997527b14b06d7a83 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 4 Jul 2024 00:00:43 +0200 Subject: [PATCH 09/10] lint files --- pkg/pattern/drain/line_tokenizer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go index 4da135f06b0e1..d46519992af11 100644 --- a/pkg/pattern/drain/line_tokenizer.go +++ b/pkg/pattern/drain/line_tokenizer.go @@ -8,6 +8,7 @@ import ( "github.com/buger/jsonparser" gologfmt "github.com/go-logfmt/logfmt" + "github.com/grafana/loki/v3/pkg/logql/log/logfmt" ) @@ -279,6 +280,7 @@ func (t *jsonTokenizer) Join(tokens []string, state interface{}) string { func isVariableField(key []byte) bool { return bytes.EqualFold(key, []byte("ts")) || + bytes.Equal(key, []byte("t")) || bytes.EqualFold(key, []byte("traceID")) || bytes.EqualFold(key, []byte("time")) || bytes.EqualFold(key, []byte("timestamp")) From a7bd24f0eb2f9c9861cb3f904148298ff756c11a Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 4 Jul 2024 10:43:08 +0200 Subject: [PATCH 10/10] skip empty streams --- pkg/pattern/instance.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 55fc4e8f9e9d6..a4a65f0cb7c5e 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -66,6 +66,9 @@ func (i *instance) Push(ctx context.Context, 
req *logproto.PushRequest) error {
 	appendErr := multierror.New()
 
 	for _, reqStream := range req.Streams {
+		if reqStream.Entries == nil || len(reqStream.Entries) == 0 {
+			continue
+		}
 		s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels, func() (*stream, error) {
 			// add stream
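
A minimal usage sketch (illustrative only, not part of the patch series, assuming the exported names shown in the diffs above): it mirrors how the benchmark and `newStream` wire the new `format` argument into `drain.New`, letting `DetectLogFormat` pick between the logfmt, JSON, and punctuation tokenizers introduced in PATCH 01 and PATCH 04.

```go
package main

// Illustrative sketch only — exercises the constructor signature changed in this
// series: drain.New(config, format, metrics). All identifiers below appear in the
// diffs; only this example program itself is new.
import (
	"fmt"
	"time"

	"github.com/grafana/loki/v3/pkg/pattern/drain"
)

func main() {
	line := `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler"`

	// DetectLogFormat guesses FormatLogfmt / FormatJSON / FormatUnknown from the
	// first line of a stream; New then selects the matching LineTokenizer internally.
	d := drain.New(drain.DefaultConfig(), drain.DetectLogFormat(line), nil)

	// Train feeds a line into the Drain tree and returns the cluster it landed in
	// (nil if the line was skipped, e.g. longer than maxAllowedLineLength).
	if cluster := d.Train(line, time.Now().UnixNano()); cluster != nil {
		fmt.Println(d.PatternString(cluster))
	}
}
```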