feat: Drain uses different tokenizer based on log format #13384

Merged: 13 commits, Jul 4, 2024

Commit: feat: Drain uses different tokenizer based on log format
cyriltovena committed Jul 2, 2024
commit 2efa5712d52169d17c54e064f83f1174ed310b0f
11 changes: 8 additions & 3 deletions pkg/pattern/drain/drain.go
@@ -147,7 +147,7 @@ func DefaultConfig() *Config {
}
}

- func New(config *Config, metrics *Metrics) *Drain {
+ func New(config *Config, format string, metrics *Metrics) *Drain {
if config.LogClusterDepth < 3 {
panic("depth argument must be at least 3")
}
@@ -156,13 +156,18 @@ func New(config *Config, metrics *Metrics) *Drain {
if metrics != nil {
evictFn = func(int, *LogCluster) { metrics.PatternsEvictedTotal.Inc() }
}

+ var tokenizer LineTokenizer
+ if format == FormatLogfmt {
+ 	tokenizer = newLogfmtTokenizer(config.ParamString)
+ } else {
+ 	tokenizer = newPunctuationTokenizer()
+ }
d := &Drain{
config: config,
rootNode: createNode(),
idToCluster: createLogClusterCache(config.MaxClusters, evictFn),
metrics: metrics,
- tokenizer: newPunctuationTokenizer(),
+ tokenizer: tokenizer,
maxAllowedLineLength: 3000,
}
return d
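New now takes a format argument and selects the tokenizer from it. Callers below derive that argument from the data itself; the benchmark, for example, calls DetectLogFormat(lines[0]). The detector's body is not shown in this diff, so the following is only a sketch of such a first-line heuristic (it assumes the FormatLogfmt/FormatJSON/FormatUnknown constants referenced in the tests, plus regexp and strings imports; the actual implementation in the PR may differ):

// Sketch only: guess a stream's format from one sample line.
var logfmtKeyval = regexp.MustCompile(`^[A-Za-z_][A-Za-z0-9_.-]*=`) // hypothetical helper

func DetectLogFormat(line string) string {
	trimmed := strings.TrimSpace(line)
	switch {
	case strings.HasPrefix(trimmed, "{") && strings.HasSuffix(trimmed, "}"):
		return FormatJSON // e.g. testdata/drone-json.txt
	case logfmtKeyval.MatchString(trimmed):
		return FormatLogfmt // e.g. testdata/agent-logfmt.txt
	default:
		return FormatUnknown // e.g. testdata/journald.txt
	}
}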
2 changes: 1 addition & 1 deletion pkg/pattern/drain/drain_benchmark_test.go
@@ -35,7 +35,7 @@ func BenchmarkDrain_TrainExtractsPatterns(b *testing.B) {
line := scanner.Text()
lines = append(lines, line)
}
- drain := New(DefaultConfig(), nil)
+ drain := New(DefaultConfig(), DetectLogFormat(lines[0]), nil)

b.ReportAllocs()
b.ResetTimer()
36 changes: 18 additions & 18 deletions pkg/pattern/drain/drain_test.go
@@ -27,7 +27,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
format string
}{
{
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputFile: `testdata/agent-logfmt.txt`,
format: FormatLogfmt,
patterns: []string{
@@ -62,7 +62,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputFile: `testdata/ingester-logfmt.txt`,
format: FormatLogfmt,
patterns: []string{
@@ -72,7 +72,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputFile: `testdata/drone-json.txt`,
format: FormatJSON,
patterns: []string{
@@ -85,7 +85,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/distributor-logfmt.txt",
format: FormatLogfmt,
patterns: []string{
@@ -97,7 +97,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/journald.txt",
format: FormatUnknown,
patterns: []string{
@@ -204,7 +204,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/kafka.txt",
format: FormatUnknown,
patterns: []string{
@@ -225,7 +225,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/kubernetes.txt",
format: FormatUnknown,
patterns: []string{
@@ -275,7 +275,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/vault.txt",
format: FormatUnknown,
patterns: []string{
@@ -284,7 +284,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/calico.txt",
format: FormatUnknown,
patterns: []string{
@@ -366,7 +366,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
},
},
{
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputFile: "testdata/grafana-ruler.txt",
format: FormatLogfmt,
patterns: []string{
@@ -462,7 +462,7 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
}{
{
name: "should match each line against a pattern",
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputLines: []string{
"test test test test",
"test test test test",
@@ -472,7 +472,7 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
},
{
name: "should also match newlines",
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputLines: []string{
`test test test test
`,
@@ -509,7 +509,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
}{
{
name: "should extract patterns that all lines match",
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputLines: []string{
"test 1 test test",
"test 2 test test",
@@ -519,7 +519,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "should extract patterns that match if line ends with newlines",
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputLines: []string{
`test 1 test test
`,
@@ -533,7 +533,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "should extract patterns that match if line ends with empty space",
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputLines: []string{
`test 1 test test `,
`test 2 test test `,
@@ -543,7 +543,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "should extract patterns that match if line starts with empty space",
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputLines: []string{
` test 1 test test`,
` test 2 test test`,
@@ -553,7 +553,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
},
{
name: "Scheduler patterns are matchable",
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputLines: []string{
`ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
`ts=2024-05-30T12:50:36.350575929Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
@@ -651,7 +651,7 @@ func TestDrain_PruneTreeClearsOldBranches(t *testing.T) {
}{
{
name: "should prune old branches",
- drain: New(DefaultConfig(), nil),
+ drain: New(DefaultConfig(), "", nil),
inputLines: []string{
"test test test A",
"test test test B",
56 changes: 56 additions & 0 deletions pkg/pattern/drain/line_tokenizer.go
@@ -1,8 +1,12 @@
package drain

import (
"bytes"
"strings"
"unicode"

+ gologfmt "github.com/go-logfmt/logfmt"
+ "github.com/grafana/loki/v3/pkg/logql/log/logfmt"
)

type LineTokenizer interface {
@@ -131,3 +135,55 @@
}
return strBuilder.String()
}

type logfmtTokenizer struct {
dec *logfmt.Decoder
varReplace string
}

func newLogfmtTokenizer(varReplace string) *logfmtTokenizer {
return &logfmtTokenizer{
dec: logfmt.NewDecoder(nil),
varReplace: varReplace,
}
}

func (t *logfmtTokenizer) Tokenize(line string) ([]string, interface{}) {
var tokens []string
t.dec.Reset([]byte(line))
for !t.dec.EOL() && t.dec.ScanKeyval() {
key := t.dec.Key()
if isTimeStampField(key) {
tokens = append(tokens, string(t.dec.Key()), t.varReplace)

continue
}
tokens = append(tokens, string(t.dec.Key()), string(t.dec.Value()))
}
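// A decode error means the line is not valid logfmt; give up on the whole
// line rather than returning a partial token stream.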
if t.dec.Err() != nil {
return nil, nil
}
return tokens, nil
}

func (t *logfmtTokenizer) Join(tokens []string, state interface{}) string {

[golangci-lint (revive) warning on line 169 in pkg/pattern/drain/line_tokenizer.go: unused-parameter: parameter 'state' seems to be unused, consider removing or renaming it as _]
if len(tokens) == 0 {
return ""
}
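// An odd token count means a trailing key with no value; pad with an empty
// string so it encodes as `key=` (see the Join tests below).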
if len(tokens)%2 == 1 {
tokens = append(tokens, "")
}
buf := bytes.NewBuffer(make([]byte, 0, 1024))
enc := gologfmt.NewEncoder(buf)
for i := 0; i < len(tokens); i += 2 {
k, v := tokens[i], tokens[i+1]
[Review thread]
Contributor: How does this handle multi-word values? Are they a single token?
Author: Correct, they are.
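// Per the review note above: a multi-word value stays a single token, and
// gologfmt's EncodeKeyval re-quotes it, e.g. ["msg", "error contacting scheduler"]
// encodes back to msg="error contacting scheduler".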

if err := enc.EncodeKeyval(k, v); err != nil {
return ""
}
}
return buf.String()
}

func isTimeStampField(key []byte) bool {
return bytes.EqualFold(key, []byte("ts")) || bytes.EqualFold(key, []byte("time")) || bytes.EqualFold(key, []byte("timestamp"))
}
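Taken together, Tokenize and Join round-trip a logfmt line. A small usage sketch (it assumes config.ParamString is the "<_>" placeholder; the tests below only reference DefaultConfig().ParamString, so the exact value is an assumption):

tok := newLogfmtTokenizer("<_>")
tokens, _ := tok.Tokenize(`ts=2024-07-02T10:00:00Z level=info msg="hello world"`)
// tokens: ["ts", "<_>", "level", "info", "msg", "hello world"]
// The ts value is replaced by the placeholder and the quoted value stays one token.
line := tok.Join(tokens, nil)
// line: ts=<_> level=info msg="hello world"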
94 changes: 92 additions & 2 deletions pkg/pattern/drain/line_tokenizer_test.go
@@ -12,8 +12,10 @@ type TestCase struct {
want map[string][]string
}

- const typePunctuation = "punctuation"
- const typeSplitting = "splitting"
+ const (
+ 	typePunctuation = "punctuation"
+ 	typeSplitting   = "splitting"
+ )

var testCases = []TestCase{
{
@@ -162,3 +164,91 @@ func BenchmarkSplittingTokenizer(b *testing.B) {
})
}
}

func TestLogFmtTokenizer(t *testing.T) {
param := DefaultConfig().ParamString
tests := []struct {
name string
line string
want []string
}{
{
line: `foo=bar baz="this is a message"`,
want: []string{"foo", "bar", "baz", "this is a message"},
},
{
line: `foo baz="this is a message"`,
want: []string{"foo", "", "baz", "this is a message"},
},
{
line: `foo= baz="this is a message"`,
want: []string{"foo", "", "baz", "this is a message"},
},
{
line: `foo baz`,
want: []string{"foo", "", "baz", ""},
},
{
line: `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
want: []string{"ts", param, "caller", "scheduler_processor.go:143", "level", "warn", "msg", "error contacting scheduler", "err", "rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"", "addr", "10.0.151.101:9095"},
},
}

tokenizer := newLogfmtTokenizer(param)

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := tokenizer.Tokenize(tt.line)
require.Equal(t, tt.want, got)
})
}
}

func TestLogFmtTokenizerJoin(t *testing.T) {
tests := []struct {
tokens []string
want string
}{
{
want: ``,
tokens: []string{},
},
{
want: `foo=bar baz="this is a message"`,
tokens: []string{"foo", "bar", "baz", "this is a message"},
},
{
want: `foo= baz="this is a message"`,
tokens: []string{"foo", "", "baz", "this is a message"},
},
{
want: `foo= baz="this is a message"`,
tokens: []string{"foo", "", "baz", "this is a message"},
},
{
want: `foo= baz=`,
tokens: []string{"foo", "", "baz", ""},
},
{
want: `foo=`,
tokens: []string{"foo"},
},
{
want: `foo= bar=`,
tokens: []string{"foo", "", "bar"},
},
{
want: `ts=2024-05-30T12:50:36.648377186Z caller=scheduler_processor.go:143 level=warn msg="error contacting scheduler" err="rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"" addr=10.0.151.101:9095`,
tokens: []string{"ts", "2024-05-30T12:50:36.648377186Z", "caller", "scheduler_processor.go:143", "level", "warn", "msg", "error contacting scheduler", "err", "rpc error: code = Unavailable desc = connection error: desc = \"error reading server preface: EOF\"", "addr", "10.0.151.101:9095"},
},
}

tokenizer := newLogfmtTokenizer("")

for _, tt := range tests {
t.Run("", func(t *testing.T) {
got := tokenizer.Join(tt.tokens, nil)
require.Equal(t, tt.want, got)
})
}
}
2 changes: 1 addition & 1 deletion pkg/pattern/ingester_querier.go
@@ -137,7 +137,7 @@ func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int, met
pruneConfig.SimTh = 1.0 // Merge & de-dup patterns but don't modify them

patternsBefore := len(resp.Series)
- d := drain.New(pruneConfig, nil)
+ d := drain.New(pruneConfig, "", nil)
for _, p := range resp.Series {
d.TrainPattern(p.GetPattern(), p.Samples)
}
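Note: the empty format argument means New takes its else branch, so prunePatterns keeps tokenizing with the punctuation tokenizer, unchanged from before this commit:

// effective selection inside New when format == "":
// tokenizer = newPunctuationTokenizer()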