diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go
index ec54a9f145cd2..8311036c3df86 100644
--- a/pkg/pattern/drain/drain.go
+++ b/pkg/pattern/drain/drain.go
@@ -147,7 +147,7 @@ func DefaultConfig() *Config {
 	}
 }
 
-func New(config *Config, metrics *Metrics) *Drain {
+func New(config *Config, format string, metrics *Metrics) *Drain {
 	if config.LogClusterDepth < 3 {
 		panic("depth argument must be at least 3")
 	}
@@ -156,14 +156,24 @@ func New(config *Config, metrics *Metrics) *Drain {
 	if metrics != nil {
 		evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() }
 	}
+	var tokenizer LineTokenizer
+	switch format {
+	case FormatJSON:
+		tokenizer = newJSONTokenizer(config.ParamString)
+	case FormatLogfmt:
+		tokenizer = newLogfmtTokenizer(config.ParamString)
+	default:
+		tokenizer = newPunctuationTokenizer()
+	}
 	d := &Drain{
 		config:               config,
 		rootNode:             createNode(),
 		idToCluster:          createLogClusterCache(config.MaxClusters, evictFn),
 		metrics:              metrics,
-		tokenizer:            newPunctuationTokenizer(),
+		tokenizer:            tokenizer,
 		maxAllowedLineLength: 3000,
+		format:               format,
 	}
 	return d
 }
@@ -176,6 +186,9 @@ type Drain struct {
 	metrics              *Metrics
 	tokenizer            LineTokenizer
 	maxAllowedLineLength int
+	format               string
+	tokens               []string
+	state                interface{}
 }
 
 func (d *Drain) Clusters() []*LogCluster {
@@ -190,8 +203,8 @@ func (d *Drain) Train(content string, ts int64) *LogCluster {
 	if len(content) > d.maxAllowedLineLength {
 		return nil
 	}
-	tokens, state := d.tokenizer.Tokenize(content)
-	return d.train(tokens, state, ts)
+	d.tokens, d.state = d.tokenizer.Tokenize(content, d.tokens, d.state)
+	return d.train(d.tokens, d.state, ts)
 }
 
 func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster {
@@ -200,13 +213,16 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
 	}
 	if d.metrics != nil {
 		d.metrics.TokensPerLine.Observe(float64(len(tokens)))
-		d.metrics.StatePerLine.Observe(float64(len(state.([]int))))
+		if stateInts, ok := state.([]int); ok {
+			d.metrics.StatePerLine.Observe(float64(len(stateInts)))
+		}
 	}
 	matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
 	// Match no existing log cluster
 	if matchCluster == nil {
 		d.clustersCounter++
 		clusterID := d.clustersCounter
+		tokens, state = d.tokenizer.Clone(tokens, state)
 		matchCluster = &LogCluster{
 			Tokens:     tokens,
 			TokenState: state,
@@ -222,8 +238,7 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
 			d.metrics.PatternsDetectedTotal.Inc()
 		}
 	} else {
-		newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens)
-		matchCluster.Tokens = newTemplateTokens
+		matchCluster.Tokens = d.createTemplate(tokens, matchCluster.Tokens)
 		matchCluster.append(model.TimeFromUnixNano(ts))
 		// Touch cluster to update its state in the cache.
 		d.idToCluster.Get(matchCluster.id)
@@ -232,12 +247,13 @@ func (d *Drain) train(tokens []string, state interface{}, ts int64) *LogCluster
 }
 
 func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample) *LogCluster {
-	tokens, state := d.tokenizer.Tokenize(content)
+	tokens, state := d.tokenizer.Tokenize(content, d.tokens, d.state)
 	matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, true)
 	// Match no existing log cluster
 	if matchCluster == nil {
 		d.clustersCounter++
 		clusterID := d.clustersCounter
+		tokens, state = d.tokenizer.Clone(tokens, state)
 		matchCluster = &LogCluster{
 			Tokens:     tokens,
 			TokenState: state,
@@ -246,8 +262,7 @@ func (d *Drain) TrainPattern(content string, samples []*logproto.PatternSample)
 		d.idToCluster.Set(clusterID, matchCluster)
 		d.addSeqToPrefixTree(d.rootNode, matchCluster)
 	} else {
-		newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens)
-		matchCluster.Tokens = newTemplateTokens
+		matchCluster.Tokens = d.createTemplate(tokens, matchCluster.Tokens)
 		// Touch cluster to update its state in the cache.
 		d.idToCluster.Get(matchCluster.id)
 	}
@@ -277,7 +292,7 @@ func deduplicatePlaceholders(line string, placeholder string) string {
 	}
 	builder = append(builder, line[low:]...)
 
-	return unsafe.String(unsafe.SliceData(builder), len(builder))
+	return unsafeString(builder)
 }
 
 func (d *Drain) PatternString(c *LogCluster) string {
@@ -313,13 +328,6 @@ func (d *Drain) Delete(cluster *LogCluster) {
 	d.idToCluster.cache.Remove(cluster.id)
 }
 
-// Match against an already existing cluster. Match shall be perfect (sim_th=1.0). New cluster will not be created as a result of this call, nor any cluster modifications.
-func (d *Drain) Match(content string) *LogCluster {
-	contentTokens, _ := d.tokenizer.Tokenize(content)
-	matchCluster := d.treeSearch(d.rootNode, contentTokens, 1.0, true)
-	return matchCluster
-}
-
 func (d *Drain) treeSearch(rootNode *Node, tokens []string, simTh float64, includeParams bool) *LogCluster {
 	tokenCount := len(tokens)
@@ -511,12 +519,18 @@ func (d *Drain) createTemplate(tokens, matchClusterTokens []string) []string {
 	if len(tokens) != len(matchClusterTokens) {
 		panic("seq1 seq2 be of same length")
 	}
-	retVal := make([]string, len(matchClusterTokens))
-	copy(retVal, matchClusterTokens)
 	for i := range tokens {
 		if tokens[i] != matchClusterTokens[i] {
-			retVal[i] = d.config.ParamString
+			matchClusterTokens[i] = d.config.ParamString
 		}
 	}
-	return retVal
+	return matchClusterTokens
+}
+
+func unsafeString(s []byte) string {
+	return unsafe.String(unsafe.SliceData(s), len(s))
+}
+
+func unsafeBytes(s string) []byte {
+	return unsafe.Slice(unsafe.StringData(s), len(s))
 }
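Aside on the API change above: `Tokenize` now threads a caller-owned token slice and state value through every call, so per-line allocations are amortized across lines, and `Clone` exists because those reused buffers are overwritten by the next call — a cluster that retains tokens must copy them first, which is exactly what `train` and `TrainPattern` now do before storing `Tokens`/`TokenState`. A minimal runnable sketch of that calling contract; the `wordTokenizer` below is a hypothetical stand-in, not code from this change:

package main

import (
	"fmt"
	"strings"
)

// wordTokenizer follows the same contract as LineTokenizer in this change:
// Tokenize appends into the caller-provided slice, so its result is only
// valid until the next call; Clone returns an independently owned copy.
type wordTokenizer struct{}

func (wordTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
	tokens = tokens[:0] // reuse the caller's backing array across lines
	return append(tokens, strings.Fields(line)...), state
}

func (wordTokenizer) Clone(tokens []string, state interface{}) ([]string, interface{}) {
	res := make([]string, len(tokens))
	for i, t := range tokens {
		res[i] = strings.Clone(t) // copy so the result survives buffer reuse
	}
	return res, state
}

func main() {
	var (
		tok   wordTokenizer
		buf   []string
		state interface{}
		kept  [][]string
	)
	for _, line := range []string{"a b c", "d e f"} {
		buf, state = tok.Tokenize(line, buf, state) // no fresh slice per line
		owned, _ := tok.Clone(buf, state)           // copy only when retaining
		kept = append(kept, owned)
	}
	fmt.Println(kept) // [[a b c] [d e f]]
}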
diff --git a/pkg/pattern/drain/drain_benchmark_test.go b/pkg/pattern/drain/drain_benchmark_test.go
index 5ef4c98b5b6f6..e7c95f721ed4c 100644
--- a/pkg/pattern/drain/drain_benchmark_test.go
+++ b/pkg/pattern/drain/drain_benchmark_test.go
@@ -35,7 +35,7 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
 		line := scanner.Text()
 		lines = append(lines, line)
 	}
-	drain := New(DefaultConfig(), nil)
+	drain := New(DefaultConfig(), DetectLogFormat(lines[0]), nil)
 
 	b.ReportAllocs()
 	b.ResetTimer()
diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go
index 1f964a0fef70c..944fd5ccdcb98 100644
--- a/pkg/pattern/drain/drain_test.go
+++ b/pkg/pattern/drain/drain_test.go
@@ -27,7 +27,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 		format    string
 	}{
 		{
-			drain:     New(DefaultConfig(), nil),
+			drain:     New(DefaultConfig(), "", nil),
 			inputFile: `testdata/agent-logfmt.txt`,
 			format:    FormatLogfmt,
 			patterns: []string{
@@ -56,7 +56,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), nil),
+			drain:     New(DefaultConfig(), "", nil),
 			inputFile: `testdata/ingester-logfmt.txt`,
 			format:    FormatLogfmt,
 			patterns: []string{
@@ -66,7 +66,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), nil),
+			drain:     New(DefaultConfig(), "", nil),
 			inputFile: `testdata/drone-json.txt`,
 			format:    FormatJSON,
 			patterns: []string{
@@ -79,7 +79,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), nil),
+			drain:     New(DefaultConfig(), "", nil),
 			inputFile: "testdata/distributor-logfmt.txt",
 			format:    FormatLogfmt,
 			patterns: []string{
@@ -91,7 +91,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), nil),
+			drain:     New(DefaultConfig(), "", nil),
 			inputFile: "testdata/journald.txt",
 			format:    FormatUnknown,
 			patterns: []string{
@@ -211,7 +211,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), nil),
+			drain:     New(DefaultConfig(), "", nil),
 			inputFile: "testdata/kafka.txt",
 			format:    FormatUnknown,
 			patterns: []string{
@@ -232,7 +232,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), nil),
+			drain:     New(DefaultConfig(), "", nil),
 			inputFile: "testdata/kubernetes.txt",
 			format:    FormatUnknown,
 			patterns: []string{
@@ -273,7 +273,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), nil),
+			drain:     New(DefaultConfig(), "", nil),
 			inputFile: "testdata/vault.txt",
 			format:    FormatUnknown,
 			patterns: []string{
@@ -281,7 +281,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 		},
 		{
-			drain:     New(DefaultConfig(), nil),
+			drain:     New(DefaultConfig(), "", nil),
 			inputFile: "testdata/calico.txt",
 			format:    FormatUnknown,
 			patterns: []string{
@@ -374,7 +374,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			},
 		},
 		{
-			drain:     New(DefaultConfig(), nil),
+			drain:     New(DefaultConfig(), "", nil),
 			inputFile: "testdata/grafana-ruler.txt",
 			format:    FormatLogfmt,
 			patterns: []string{
@@ -426,6 +426,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 	}
 
 	for _, tt := range tests {
+		tt := tt
 		t.Run(tt.inputFile, func(t *testing.T) {
 			file, err := os.Open(tt.inputFile)
 			require.NoError(t, err)
@@ -461,53 +462,6 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 	}
 }
 
-func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
-	t.Parallel()
-	tests := []struct {
-		name       string
-		drain      *Drain
-		inputLines []string
-	}{
-		{
-			name:  "should match each line against a pattern",
-			drain: New(DefaultConfig(), nil),
-			inputLines: []string{
-				"test test test test",
-				"test test test test",
-				"test test test test",
-				"test test test test",
-			},
-		},
-		{
-			name:  "should also match newlines",
-			drain: New(DefaultConfig(), nil),
-			inputLines: []string{
-				`test test test test
-`,
-				`test test test test
-`,
-				`test test test test
-`,
-				`test test test test
-`,
-			},
-		},
-	}
-	for _, tt := range tests {
-		tt := tt
-		t.Run(tt.name, func(t *testing.T) {
-			for _, line := range tt.inputLines {
-				tt.drain.Train(line, 0)
-			}
-
-			for _, line := range tt.inputLines {
-				match := tt.drain.Match(line)
-				require.NotNil(t, match, `Line should match a cluster`)
-			}
-		})
-	}
-}
-
 func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) {
 	t.Parallel()
 	tests := []struct {
@@ -517,7 +471,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 	}{
 		{
 			name:  "should extract patterns that all lines match",
-			drain: New(DefaultConfig(), nil),
+			drain: New(DefaultConfig(), "", nil),
 			inputLines: []string{
 				"test 1 test test",
 				"test 2 test test",
@@ -527,7 +481,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 		},
 		{
 			name:  "should extract patterns that match if line ends with newlines",
-			drain: New(DefaultConfig(), nil),
+			drain: New(DefaultConfig(), "", nil),
 			inputLines: []string{
 				`test 1 test test
`,
@@ -541,7 +495,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 		},
 		{
 			name:  "should extract patterns that match if line ends with empty space",
-			drain: New(DefaultConfig(), nil),
+			drain: New(DefaultConfig(), "", nil),
 			inputLines: []string{
 				`test 1 test test `,
 				`test 2 test test `,
@@ -551,7 +505,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 		},
 		{
 			name:  "should extract patterns that match if line starts with empty space",
-			drain: New(DefaultConfig(), nil),
+			drain: New(DefaultConfig(), "", nil),
 			inputLines: []string{
 				` test 1 test test`,
 				` test 2 test test`,
@@ -561,7 +515,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 		},
 		{
 			name:  "Scheduler patterns are matchable",
-			drain: New(DefaultConfig(), nil),
+			drain: New(DefaultConfig(), "", nil),
 			inputLines: []string{
 				`ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
 				`ts=2024-05-30T12:50:36.350575929Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
@@ -659,7 +613,7 @@ func TestDrain_PruneTreeClearsOldBranches(t *testing.T) {
 	}{
 		{
 			name:  "should prune old branches",
-			drain: New(DefaultConfig(), nil),
+			drain: New(DefaultConfig(), "", nil),
 			inputLines: []string{
 				"test test test A",
 				"test test test B",
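Aside on the `tt := tt` added above: before Go 1.22, a `range` variable was a single variable reused across iterations, so any closure that outlives its iteration (a parallel subtest, a goroutine) could observe the last element instead of its own. The rebinding gives each iteration its own copy; since Go 1.22 loop variables are per-iteration and the guard is redundant. A self-contained illustration of the failure mode being guarded against:

package main

import (
	"fmt"
	"sync"
)

func main() {
	files := []string{"agent-logfmt", "drone-json", "journald"}
	var wg sync.WaitGroup
	for _, tt := range files {
		tt := tt // per-iteration copy; without it, pre-1.22 Go may print "journald" three times
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(tt)
		}()
	}
	wg.Wait()
}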
diff --git a/pkg/pattern/drain/line_tokenizer.go b/pkg/pattern/drain/line_tokenizer.go
index 075cd32c8ff0a..04b998b8dba53 100644
--- a/pkg/pattern/drain/line_tokenizer.go
+++ b/pkg/pattern/drain/line_tokenizer.go
@@ -1,18 +1,26 @@
 package drain
 
 import (
+	"bytes"
+	"fmt"
 	"strings"
 	"unicode"
+
+	"github.com/buger/jsonparser"
+	gologfmt "github.com/go-logfmt/logfmt"
+
+	"github.com/grafana/loki/v3/pkg/logql/log/logfmt"
 )
 
 type LineTokenizer interface {
-	Tokenize(line string) ([]string, interface{})
+	Tokenize(line string, tokens []string, state interface{}) ([]string, interface{})
 	Join(tokens []string, state interface{}) string
+	Clone(tokens []string, state interface{}) ([]string, interface{})
 }
 
 type spacesTokenizer struct{}
 
-func (spacesTokenizer) Tokenize(line string) ([]string, interface{}) {
+func (spacesTokenizer) Tokenize(line string, _ []string, _ interface{}) ([]string, interface{}) {
 	return strings.Split(line, " "), nil
 }
 
@@ -20,6 +28,12 @@ func (spacesTokenizer) Join(tokens []string, _ interface{}) string {
 	return strings.Join(tokens, " ")
 }
 
+func (spacesTokenizer) Clone(tokens []string, _ interface{}) ([]string, interface{}) {
+	res := make([]string, len(tokens))
+	copy(res, tokens)
+	return res, nil
+}
+
 type punctuationTokenizer struct {
 	includeDelimiters [128]rune
 	excludeDelimiters [128]rune
@@ -40,13 +54,20 @@ func newPunctuationTokenizer() *punctuationTokenizer {
 	}
 }
 
-func (p *punctuationTokenizer) Tokenize(line string) ([]string, interface{}) {
-	tokens := make([]string, 0, 128)
-	spacesAfter := make([]int, 0, 64)
+func (p *punctuationTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
+	if cap(tokens) == 0 {
+		tokens = make([]string, 0, 128)
+	}
+	tokens = tokens[:0]
+	if state == nil || cap(state.([]int)) == 0 {
+		state = make([]int, 0, 64)
+	}
+	spacesAfter := state.([]int)
+	spacesAfter = spacesAfter[:0]
 
 	start := 0
 	for i, char := range line {
-		if len(tokens) >= cap(tokens)-1 {
+		if len(tokens) >= 127 {
 			break
 		}
 		if unicode.IsLetter(char) || unicode.IsNumber(char) || char < 128 && p.excludeDelimiters[char] != 0 {
@@ -88,9 +109,23 @@ func (p *punctuationTokenizer) Join(tokens []string, state interface{}) string {
 	return strBuilder.String()
 }
 
+func (p *punctuationTokenizer) Clone(tokens []string, state interface{}) ([]string, interface{}) {
+	res := make([]string, len(tokens))
+	for i, token := range tokens {
+		res[i] = strings.Clone(token)
+	}
+	if state == nil {
+		return res, nil
+	}
+	spacesAfter := state.([]int)
+	spacesAfterCopy := make([]int, len(spacesAfter))
+	copy(spacesAfterCopy, spacesAfter)
+	return res, spacesAfterCopy
+}
+
 type splittingTokenizer struct{}
 
-func (splittingTokenizer) Tokenize(line string) ([]string, interface{}) {
+func (splittingTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
 	numEquals := strings.Count(line, "=")
 	numColons := strings.Count(line, ":")
 	numSpaces := strings.Count(line, " ")
@@ -102,8 +137,16 @@ func (splittingTokenizer) Tokenize(line string) ([]string, interface{}) {
 		expectedTokens = numSpaces + numColons
 	}
 
-	tokens := make([]string, 0, expectedTokens)
-	spacesAfter := make([]int, 0, strings.Count(line, " "))
+	if cap(tokens) == 0 {
+		tokens = make([]string, 0, expectedTokens)
+	}
+	tokens = tokens[:0]
+	if state == nil || cap(state.([]int)) == 0 {
+		state = make([]int, 0, numSpaces)
+	}
+	spacesAfter := state.([]int)
+	spacesAfter = spacesAfter[:0]
+
 	for _, token := range strings.SplitAfter(line, keyvalSeparator) {
 		words := strings.Split(token, " ")
 		for i, entry := range words {
@@ -131,3 +174,115 @@ func (splittingTokenizer) Join(tokens []string, state interface{}) string {
 	}
 	return strBuilder.String()
 }
+
+func (splittingTokenizer) Clone(tokens []string, state interface{}) ([]string, interface{}) {
+	res := make([]string, len(tokens))
+	for i, token := range tokens {
+		res[i] = strings.Clone(token)
+	}
+	if state == nil {
+		return res, nil
+	}
+	spacesAfter := state.([]int)
+	spacesAfterCopy := make([]int, len(spacesAfter))
+	copy(spacesAfterCopy, spacesAfter)
+	return res, spacesAfterCopy
+}
+
+type logfmtTokenizer struct {
+	dec        *logfmt.Decoder
+	varReplace string
+}
+
+func newLogfmtTokenizer(varReplace string) *logfmtTokenizer {
+	return &logfmtTokenizer{
+		dec:        logfmt.NewDecoder(nil),
+		varReplace: varReplace,
+	}
+}
+
+func (t *logfmtTokenizer) Tokenize(line string, tokens []string, _ interface{}) ([]string, interface{}) {
+	if cap(tokens) == 0 {
+		tokens = make([]string, 0, 64)
+	}
+	tokens = tokens[:0]
+	t.dec.Reset(unsafeBytes(line))
+	for !t.dec.EOL() && t.dec.ScanKeyval() {
+		key := t.dec.Key()
+		if isVariableField(key) {
+			tokens = append(tokens, unsafeString(t.dec.Key()), t.varReplace)
+
+			continue
+		}
+		// todo we want to pass bytes and let user copy if needed.
+		tokens = append(tokens, unsafeString(t.dec.Key()), unsafeString(t.dec.Value()))
+	}
+	if t.dec.Err() != nil {
+		return nil, nil
+	}
+	return tokens, nil
+}
+
+func (t *logfmtTokenizer) Join(tokens []string, _ interface{}) string {
+	if len(tokens) == 0 {
+		return ""
+	}
+	if len(tokens)%2 == 1 {
+		tokens = append(tokens, "")
+	}
+	buf := bytes.NewBuffer(make([]byte, 0, 1024))
+	enc := gologfmt.NewEncoder(buf)
+	for i := 0; i < len(tokens); i += 2 {
+		k, v := tokens[i], tokens[i+1]
+		if err := enc.EncodeKeyval(k, v); err != nil {
+			return ""
+		}
+	}
+	return buf.String()
+}
+
+func (t *logfmtTokenizer) Clone(tokens []string, _ interface{}) ([]string, interface{}) {
+	res := make([]string, len(tokens))
+	for i, token := range tokens {
+		res[i] = strings.Clone(token)
+	}
+	return res, nil
+}
+
+type jsonTokenizer struct {
+	*punctuationTokenizer
+	varReplace string
+}
+
+func newJSONTokenizer(varReplace string) *jsonTokenizer {
+	return &jsonTokenizer{newPunctuationTokenizer(), varReplace}
+}
+
+func (t *jsonTokenizer) Tokenize(line string, tokens []string, state interface{}) ([]string, interface{}) {
+	var found []byte
+	for _, key := range []string{"log", "message", "msg", "msg_", "_msg", "content"} {
+		msg, ty, _, err := jsonparser.Get(unsafeBytes(line), key)
+		if err == nil && ty == jsonparser.String {
+			found = msg
+			break
+		}
+	}
+
+	if found == nil {
+		return nil, nil
+	}
+
+	return t.punctuationTokenizer.Tokenize(unsafeString(found), tokens, state)
+}
+
+func (t *jsonTokenizer) Join(tokens []string, state interface{}) string {
+	return fmt.Sprintf("%s%s%s", t.varReplace, t.punctuationTokenizer.Join(tokens, state), t.varReplace)
+}
+
+func isVariableField(key []byte) bool {
+	return bytes.EqualFold(key, []byte("ts")) ||
+		bytes.Equal(key, []byte("t")) ||
+		bytes.EqualFold(key, []byte("traceID")) ||
+		bytes.EqualFold(key, []byte("time")) ||
+		bytes.EqualFold(key, []byte("timestamp"))
+}
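Aside on the `unsafeString`/`unsafeBytes` helpers used throughout the tokenizers above: they are zero-copy conversions, so the string and the byte slice alias the same memory, and neither side may be mutated while the other is live. That aliasing is also why the `Clone` implementations call `strings.Clone` before tokens are stored in a long-lived cluster. A small sketch of the behavior and the hazard, assuming Go 1.20+ for `unsafe.String`/`unsafe.SliceData`:

package main

import (
	"fmt"
	"unsafe"
)

// Same shape as the helper added in drain.go: no allocation, shared backing memory.
func unsafeString(b []byte) string { return unsafe.String(unsafe.SliceData(b), len(b)) }

func main() {
	b := []byte("level=info msg=done")
	s := unsafeString(b) // s aliases b; no copy is made
	b[0] = 'L'           // the mutation is visible through s
	fmt.Println(s)       // prints "Level=info msg=done"
	// Retaining s while b is reused as a scratch buffer would corrupt it,
	// hence strings.Clone in the tokenizers' Clone methods.
}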
baz="this is a message"`, + want: []string{"foo", "", "baz", "this is a message"}, + }, + { + line: `foo= baz="this is a message"`, + want: []string{"foo", "", "baz", "this is a message"}, + }, + { + line: `foo baz`, + want: []string{"foo", "", "baz", ""}, + }, + { + line: `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`, + want: []string{"ts", param, "caller", "scheduler_processor.go:143", "level", "warn", "msg", "error contacting scheduler", "err", "rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"", "addr", "10.0.151.101:9095"}, + }, + } + + tokenizer := newLogfmtTokenizer(param) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := tokenizer.Tokenize(tt.line, nil, nil) + require.Equal(t, tt.want, got) + }) + } +} + +func TestLogFmtTokenizerJoin(t *testing.T) { + tests := []struct { + tokens []string + want string + }{ + { + want: ``, + tokens: []string{}, + }, + { + want: `foo=bar baz="this is a message"`, + tokens: []string{"foo", "bar", "baz", "this is a message"}, + }, + { + want: `foo= baz="this is a message"`, + tokens: []string{"foo", "", "baz", "this is a message"}, + }, + { + want: `foo= baz="this is a message"`, + tokens: []string{"foo", "", "baz", "this is a message"}, + }, + { + want: `foo= baz=`, + tokens: []string{"foo", "", "baz", ""}, + }, + { + want: `foo=`, + tokens: []string{"foo"}, + }, + { + want: `foo= bar=`, + tokens: []string{"foo", "", "bar"}, + }, + { + want: `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`, + tokens: []string{"ts", "2024-05-30T12:50:36.648377186Z", "caller", "scheduler_processor.go:143", "level", "warn", "msg", "error contacting scheduler", "err", "rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"", "addr", "10.0.151.101:9095"}, + }, + } + + tokenizer := newLogfmtTokenizer("") + + for _, tt := range tests { + t.Run("", func(t *testing.T) { + got := tokenizer.Join(tt.tokens, nil) + require.Equal(t, tt.want, got) + }) + } +} + +func TestJsonTokenizer(t *testing.T) { + param := DefaultConfig().ParamString + tests := []struct { + name string + line string + pattern string + want []string + }{ + { + line: `{"level":30,"time":1719998371869,"pid":17,"hostname":"otel-demo-ops-paymentservice-7c759bf575-55t4p","trace_id":"1425c6df5a4321cf6a7de254de5b8204","span_id":"2ac7a3fc800b80d4","trace_flags":"01","transactionId":"e501032b-3215-4e43-b1db-f4886a906fc5","cardType":"visa","lastFourDigits":"5647","amount":{"units":{"low":656,"high":0,"unsigned":false},"nanos":549999996,"currencyCode":"USD"},"msg":"Transaction complete."}`, + want: []string{"Transaction", "complete."}, + pattern: "<_>Transaction complete.<_>", + }, + { + line: `{"event":{"actor":{"alternateId":"foo@grafana.com","displayName":"Foo bar","id":"dq23","type":"User"},"authenticationContext":{"authenticationStep":0,"externalSessionId":"123d"},"client":{"device":"Computer","geographicalContext":{"city":"Berlin","country":"DE","state":"Land Berlin"},"ipAddress":"0.0.0.0","userAgent":{"browser":"CHROME","os":"Mac OS X","rawUserAgent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) 
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36"},"zone":"null"},"debugContext":{"debugData":{"authMethodFirstEnrollment":"123","authMethodFirstType":"foo","authMethodFirstVerificationTime":"2024-07-02T11:28:03.219Z","authMethodSecondEnrollment":"var","authMethodSecondType":"ddd","authMethodSecondVerificationTime":"2024-07-03T06:59:09.151Z","authnRequestId":"1","dtHash":"1","logOnlySecurityData":"{\"risk\":{\"level\":\"LOW\"},\"behaviors\":{\"New Geo-Location\":\"NEGATIVE\",\"New Device\":\"NEGATIVE\",\"New IP\":\"NEGATIVE\",\"New State\":\"NEGATIVE\",\"New Country\":\"NEGATIVE\",\"Velocity\":\"NEGATIVE\",\"New City\":\"NEGATIVE\"}}","requestId":"1","threatSuspected":"false","url":"/foo?"}},"displayMessage":"Evaluation of sign-on policy","eventType":"policy.evaluate_sign_on","legacyEventType":"app.oauth2.token.grant.refresh_token_success","outcome":{"reason":"Sign-on policy evaluation resulted in AUTHENTICATED","result":"ALLOW"},"published":"2024-07-03T09:19:59.973Z","request":{"ipChain":[{"geographicalContext":{"city":"Berlin","country":"Germany","geolocation":{"lat":52.5363,"lon":13.4169},"postalCode":"10435","state":"Land Berlin"},"ip":"95.90.234.241","version":"V4"}]},"securityContext":{"asNumber":3209,"asOrg":"kabel deutschland breitband customer 19","domain":"kabel-deutschland.de","isProxy":false,"isp":"vodafone gmbh"},"severity":"INFO","target":[{"alternateId":"Salesforce.com","detailEntry":{"signOnModeEvaluationResult":"AUTHENTICATED","signOnModeType":"SAML_2_0"},"displayName":"Salesforce.com","id":"0oa5sfmj3hz0mTgoW357","type":"AppInstance"},{"alternateId":"unknown","detailEntry":{"policyRuleFactorMode":"2FA"},"displayName":"Catch-all Rule","id":"1","type":"Rule"}],"transaction":{"detail":{},"id":"1","type":"WEB"},"uuid":"1","version":"0"},"level":"info","msg":"received event","time":"2024-07-03T09:19:59Z"}`, + want: []string{"received", "event"}, + pattern: "<_>received event<_>", + }, + { + line: `{"code":200,"message":"OK","data":{"id":"1","name":"foo"}}`, + want: []string{"OK"}, + pattern: "<_>OK<_>", + }, + { + line: `{"time":"2024-07-03T10:48:10.58330448Z","level":"INFO","msg":"successfully discovered 15 agent IP addresses","git_commit":"1","git_time":"2024-06-26T06:59:04Z","git_modified":true,"go_os":"linux","go_arch":"arm64","process_generation":"ea2d9b41-0314-4ddc-a415-f8af2d80a32c","hostname_fqdn":"foobar","hostname_short":foobar","private_ips":["10.0.132.23"],"num_vcpus":8,"kafka_enabled":true,"service_protocol":"VIRTUALENV_ZONE_LOCAL","module":"agent_resolver","ip_addresses":[{"Hostname":"10.0.100.152","Port":8080},{"Hostname":"10.0.41.210","Port":8080},{"Hostname":"10.0.212.83","Port":8080},{"Hostname":"10.0.145.77","Port":8080},{"Hostname":"10.0.59.71","Port":8080},{"Hostname":"10.0.224.219","Port":8080},{"Hostname":"10.0.103.29","Port":8080},{"Hostname":"10.0.86.220","Port":8080},{"Hostname":"10.0.154.82","Port":8080},{"Hostname":"10.0.9.213","Port":8080},{"Hostname":"10.0.240.157","Port":8080},{"Hostname":"10.0.166.11","Port":8080},{"Hostname":"10.0.230.22","Port":8080},{"Hostname":"10.0.123.239","Port":8080},{"Hostname":"10.0.233.210","Port":8080}]}`, + want: []string{"successfully", "discovered", "15", "agent", "IP", "addresses"}, + pattern: "<_>successfully discovered 15 agent IP addresses<_>", + }, + } + + tokenizer := newJSONTokenizer(param) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, state := tokenizer.Tokenize(tt.line, nil, nil) + require.Equal(t, tt.want, got) + pattern := tokenizer.Join(got, state) + 
require.Equal(t, tt.pattern, pattern) + }) + } +} diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go index 5e323f17dc457..1e18852eddaa3 100644 --- a/pkg/pattern/ingester_querier.go +++ b/pkg/pattern/ingester_querier.go @@ -137,7 +137,7 @@ func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, met pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them patternsBefore := len(resp.Series) - d := drain.New(pruneConfig, nil) + d := drain.New(pruneConfig, "", nil) for _, p := range resp.Series { d.TrainPattern(p.GetPattern(), p.Samples) } diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 21520ae7d26df..a4a65f0cb7c5e 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -66,6 +66,9 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { appendErr := multierror.New() for _, reqStream := range req.Streams { + if reqStream.Entries == nil || len(reqStream.Entries) == 0 { + continue + } s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels, func() (*stream, error) { // add stream @@ -211,7 +214,6 @@ func (i *instance) createStream(_ context.Context, pushReqStream logproto.Stream sortedLabels := i.index.Add(logproto.FromLabelsToLabelAdapters(labels), fp) firstEntryLine := pushReqStream.Entries[0].Line s, err := newStream(fp, sortedLabels, i.metrics, i.chunkMetrics, i.aggregationCfg, i.logger, drain.DetectLogFormat(firstEntryLine), i.instanceID) - if err != nil { return nil, fmt.Errorf("failed to create stream: %w", err) } diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go index eae5a2c22eb47..578653580b183 100644 --- a/pkg/pattern/stream.go +++ b/pkg/pattern/stream.go @@ -60,7 +60,7 @@ func newStream( labels: labels, labelsString: labels.String(), labelHash: labels.Hash(), - patterns: drain.New(drain.DefaultConfig(), &drain.Metrics{ + patterns: drain.New(drain.DefaultConfig(), guessedFormat, &drain.Metrics{ PatternsEvictedTotal: metrics.patternsDiscardedTotal.WithLabelValues(instanceID, guessedFormat), PatternsDetectedTotal: metrics.patternsDetectedTotal.WithLabelValues(instanceID, guessedFormat), TokensPerLine: metrics.tokensPerLine.WithLabelValues(instanceID, guessedFormat),