From 7033091bc9f60505e1492779c91f0d0c0e42dfc7 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Tue, 7 Jan 2025 15:16:02 +0100 Subject: [PATCH] feat: Detect fields based on per-tenant configuration and put them into structured metadata at ingest time (#15188) This PR introduces a new feature that allows for extraction of "fields" into structured metadata at ingest time. Fields can either be regular labels, structured metadata keys, or keys from `logfmt` or `json` formatted log lines. The fields are defined in a per-tenant configuration as `map[string][]string`, where the key is the target key of the structured metadata, and the value is the list of source fields, which are looked up in the given order and across the source types in the order given above. Example configuration: ```yaml limits_config: discover_generic_fields: fields: trace_id: - "trace_id" - "TRACE_ID" - "traceID" - "TraceID" org_id: - "org_id" - "tenant_id" - "user_id" ``` While parsing of log lines comes with a certain penalty at ingest time (increased latency and CPU usage on distributors), the idea is to extract certain fields once to avoid parsing the log lines every single time at query time. This is mainly useful in combination with bloom filters. 
**JSONpath support** Should the value of the config map support jsonpath expression, such as ``` limits_config: discover_generic_fields: fields: ticket_id: - "message.ticket.id" ``` Where the log line looks like this: ```json {"timestamp": 1733128051000, "message": {"ticket": {"id": "2024-d95f87018cdb1f10"}}} ``` --- Signed-off-by: Christian Haudum --- docs/sources/shared/configuration.md | 6 + pkg/distributor/distributor.go | 26 ++- ...{level_detection.go => field_detection.go} | 131 ++++++++----- ...ection_test.go => field_detection_test.go} | 181 +++++++++++++++++- pkg/distributor/limits.go | 1 + pkg/distributor/validator.go | 2 + pkg/logql/log/parser.go | 12 +- pkg/validation/limits.go | 18 +- pkg/validation/limits_test.go | 23 ++- 9 files changed, 327 insertions(+), 73 deletions(-) rename pkg/distributor/{level_detection.go => field_detection.go} (58%) rename pkg/distributor/{level_detection_test.go => field_detection_test.go} (73%) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 1917c3fbda41d..91b560e106c87 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3400,6 +3400,12 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v # CLI flag: -validation.increment-duplicate-timestamps [increment_duplicate_timestamp: | default = false] +# Experimental: Detect fields from stream labels, structured metadata, or +# json/logfmt formatted log line and put them into structured metadata of the +# log entry. +discover_generic_fields: + [fields: ] + # If no service_name label exists, Loki maps a single label from the configured # list to service_name. If none of the configured labels exist in the stream, # label is set to unknown_service. Empty list disables setting the label. 
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index ac0dc19476208..729ad8953d3a0 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "net/http" + "runtime/pprof" "slices" "sort" "strconv" @@ -460,8 +461,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log now := time.Now() validationContext := d.validator.getValidationContextForTime(now, tenantID) - levelDetector := newLevelDetector(validationContext) - shouldDiscoverLevels := levelDetector.shouldDiscoverLogLevels() + fieldDetector := newFieldDetector(validationContext) + shouldDiscoverLevels := fieldDetector.shouldDiscoverLogLevels() + shouldDiscoverGenericFields := fieldDetector.shouldDiscoverGenericFields() shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID) maybeShardByRate := func(stream logproto.Stream, pushSize int) { @@ -547,10 +549,22 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } } if shouldDiscoverLevels { - logLevel, ok := levelDetector.extractLogLevel(lbs, structuredMetadata, entry) - if ok { - entry.StructuredMetadata = append(entry.StructuredMetadata, logLevel) - } + pprof.Do(ctx, pprof.Labels("action", "discover_log_level"), func(_ context.Context) { + logLevel, ok := fieldDetector.extractLogLevel(lbs, structuredMetadata, entry) + if ok { + entry.StructuredMetadata = append(entry.StructuredMetadata, logLevel) + } + }) + } + if shouldDiscoverGenericFields { + pprof.Do(ctx, pprof.Labels("action", "discover_generic_fields"), func(_ context.Context) { + for field, hints := range fieldDetector.validationContext.discoverGenericFields { + extracted, ok := fieldDetector.extractGenericField(field, hints, lbs, structuredMetadata, entry) + if ok { + entry.StructuredMetadata = append(entry.StructuredMetadata, extracted) + } + } + }) } stream.Entries[n] = entry diff --git a/pkg/distributor/level_detection.go 
b/pkg/distributor/field_detection.go similarity index 58% rename from pkg/distributor/level_detection.go rename to pkg/distributor/field_detection.go index 0a80e67e38b14..0735a9695ec76 100644 --- a/pkg/distributor/level_detection.go +++ b/pkg/distributor/field_detection.go @@ -13,6 +13,8 @@ import ( "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log" + "github.com/grafana/loki/v3/pkg/logql/log/jsonexpr" "github.com/grafana/loki/v3/pkg/logql/log/logfmt" "github.com/grafana/loki/v3/pkg/util/constants" ) @@ -31,46 +33,43 @@ var ( errorAbbrv = []byte("err") critical = []byte("critical") fatal = []byte("fatal") + + defaultAllowedLevelFields = []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"} ) -func allowedLabelsForLevel(allowedFields []string) map[string]struct{} { +func allowedLabelsForLevel(allowedFields []string) []string { if len(allowedFields) == 0 { - return map[string]struct{}{ - "level": {}, "LEVEL": {}, "Level": {}, - "severity": {}, "SEVERITY": {}, "Severity": {}, - "lvl": {}, "LVL": {}, "Lvl": {}, - } - } - allowedFieldsMap := make(map[string]struct{}, len(allowedFields)) - for _, field := range allowedFields { - allowedFieldsMap[field] = struct{}{} + return defaultAllowedLevelFields } - return allowedFieldsMap + return allowedFields } -type LevelDetector struct { - validationContext validationContext - allowedLabels map[string]struct{} +type FieldDetector struct { + validationContext validationContext + allowedLevelLabels []string } -func newLevelDetector(validationContext validationContext) *LevelDetector { - logLevelFields := validationContext.logLevelFields - return &LevelDetector{ - validationContext: validationContext, - allowedLabels: allowedLabelsForLevel(logLevelFields), +func newFieldDetector(validationContext validationContext) *FieldDetector { + return &FieldDetector{ + validationContext: validationContext, + 
allowedLevelLabels: allowedLabelsForLevel(validationContext.logLevelFields), } } -func (l *LevelDetector) shouldDiscoverLogLevels() bool { +func (l *FieldDetector) shouldDiscoverLogLevels() bool { return l.validationContext.allowStructuredMetadata && l.validationContext.discoverLogLevels } -func (l *LevelDetector) extractLogLevel(labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) { - levelFromLabel, hasLevelLabel := l.hasAnyLevelLabels(labels) +func (l *FieldDetector) shouldDiscoverGenericFields() bool { + return l.validationContext.allowStructuredMetadata && len(l.validationContext.discoverGenericFields) > 0 +} + +func (l *FieldDetector) extractLogLevel(labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) { + levelFromLabel, hasLevelLabel := labelsContainAny(labels, l.allowedLevelLabels) var logLevel string if hasLevelLabel { logLevel = levelFromLabel - } else if levelFromMetadata, ok := l.hasAnyLevelLabels(structuredMetadata); ok { + } else if levelFromMetadata, ok := labelsContainAny(structuredMetadata, l.allowedLevelLabels); ok { logLevel = levelFromMetadata } else { logLevel = l.detectLogLevelFromLogEntry(entry, structuredMetadata) @@ -85,16 +84,33 @@ func (l *LevelDetector) extractLogLevel(labels labels.Labels, structuredMetadata }, true } -func (l *LevelDetector) hasAnyLevelLabels(labels labels.Labels) (string, bool) { - for lbl := range l.allowedLabels { - if labels.Has(lbl) { - return labels.Get(lbl), true +func (l *FieldDetector) extractGenericField(name string, hints []string, labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) { + + var value string + if v, ok := labelsContainAny(labels, hints); ok { + value = v + } else if v, ok := labelsContainAny(structuredMetadata, hints); ok { + value = v + } else { + value = l.detectGenericFieldFromLogEntry(entry, hints) + } + + if value == "" { + 
return logproto.LabelAdapter{}, false + } + return logproto.LabelAdapter{Name: name, Value: value}, true +} + +func labelsContainAny(labels labels.Labels, names []string) (string, bool) { + for _, name := range names { + if labels.Has(name) { + return labels.Get(name), true } } return "", false } -func (l *LevelDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string { +func (l *FieldDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string { // otlp logs have a severity number, using which we are defining the log levels. // Significance of severity number is explained in otel docs here https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" { @@ -123,13 +139,24 @@ func (l *LevelDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structu return l.extractLogLevelFromLogLine(entry.Line) } -func (l *LevelDetector) extractLogLevelFromLogLine(log string) string { - logSlice := unsafe.Slice(unsafe.StringData(log), len(log)) +func (l *FieldDetector) detectGenericFieldFromLogEntry(entry logproto.Entry, hints []string) string { + lineBytes := unsafe.Slice(unsafe.StringData(entry.Line), len(entry.Line)) + var v []byte + if isJSON(entry.Line) { + v = getValueUsingJSONParser(lineBytes, hints) + } else if isLogFmt(lineBytes) { + v = getValueUsingLogfmtParser(lineBytes, hints) + } + return string(v) +} + +func (l *FieldDetector) extractLogLevelFromLogLine(log string) string { + lineBytes := unsafe.Slice(unsafe.StringData(log), len(log)) var v []byte if isJSON(log) { - v = l.getValueUsingJSONParser(logSlice) - } else if isLogFmt(logSlice) { - v = l.getValueUsingLogfmtParser(logSlice) + v = getValueUsingJSONParser(lineBytes, l.allowedLevelLabels) + } else if isLogFmt(lineBytes) { + v = getValueUsingLogfmtParser(lineBytes, l.allowedLevelLabels) } else { return 
detectLevelFromLogLine(log) } @@ -154,24 +181,42 @@ func (l *LevelDetector) extractLogLevelFromLogLine(log string) string { } } -func (l *LevelDetector) getValueUsingLogfmtParser(line []byte) []byte { +func getValueUsingLogfmtParser(line []byte, hints []string) []byte { d := logfmt.NewDecoder(line) + // In order to have the same behaviour as the JSON field extraction, + // the full line needs to be parsed to extract all possible matching fields. + pos := len(hints) // the index of the hint that matches + var res []byte for !d.EOL() && d.ScanKeyval() { - if _, ok := l.allowedLabels[string(d.Key())]; ok { - return d.Value() + k := unsafe.String(unsafe.SliceData(d.Key()), len(d.Key())) + for x, hint := range hints { + if strings.EqualFold(k, hint) && x < pos { + res, pos = d.Value(), x + // If there is only a single hint, or the matching hint is the first one, + // we can stop parsing the rest of the line and return early. + if x == 0 { + return res + } + } } } - return nil + return res } -func (l *LevelDetector) getValueUsingJSONParser(log []byte) []byte { - for allowedLabel := range l.allowedLabels { - l, _, _, err := jsonparser.Get(log, allowedLabel) - if err == nil { - return l +func getValueUsingJSONParser(line []byte, hints []string) []byte { + var res []byte + for _, allowedLabel := range hints { + parsed, err := jsonexpr.Parse(allowedLabel, false) + if err != nil { + continue + } + l, _, _, err := jsonparser.Get(line, log.JSONPathToStrings(parsed)...) 
+ if err != nil { + continue } + return l } - return nil + return res } func isLogFmt(line []byte) bool { diff --git a/pkg/distributor/level_detection_test.go b/pkg/distributor/field_detection_test.go similarity index 73% rename from pkg/distributor/level_detection_test.go rename to pkg/distributor/field_detection_test.go index ec674e8bf871b..b053ab0a76071 100644 --- a/pkg/distributor/level_detection_test.go +++ b/pkg/distributor/field_detection_test.go @@ -6,6 +6,7 @@ import ( "github.com/grafana/dskit/flagext" ring_client "github.com/grafana/dskit/ring/client" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/plog" @@ -120,7 +121,7 @@ func Test_DetectLogLevels(t *testing.T) { } func Test_detectLogLevelFromLogEntry(t *testing.T) { - ld := newLevelDetector( + ld := newFieldDetector( validationContext{ discoverLogLevels: true, allowStructuredMetadata: true, @@ -285,7 +286,7 @@ func Test_detectLogLevelFromLogEntry(t *testing.T) { } func Test_detectLogLevelFromLogEntryWithCustomLabels(t *testing.T) { - ld := newLevelDetector( + ld := newFieldDetector( validationContext{ discoverLogLevels: true, allowStructuredMetadata: true, @@ -391,7 +392,7 @@ func Benchmark_extractLogLevelFromLogLine(b *testing.B) { "Wm3 S7if5qCXPzvuMZ2 gNHdst Z39s9uNc58QBDeYRW umyIF BDqEdqhE tAs2gidkqee3aux8b NLDb7 ZZLekc0cQZ GUKQuBg2pL2y1S " + "RJtBuW ABOqQHLSlNuUw ZlM2nGS2 jwA7cXEOJhY 3oPv4gGAz Uqdre16MF92C06jOH dayqTCK8XmIilT uvgywFSfNadYvRDQa " + "iUbswJNcwqcr6huw LAGrZS8NGlqqzcD2wFU rm Uqcrh3TKLUCkfkwLm 5CIQbxMCUz boBrEHxvCBrUo YJoF2iyif4xq3q yk " - ld := &LevelDetector{ + ld := &FieldDetector{ validationContext: validationContext{ discoverLogLevels: true, allowStructuredMetadata: true, @@ -407,7 +408,7 @@ func Benchmark_extractLogLevelFromLogLine(b *testing.B) { func Benchmark_optParseExtractLogLevelFromLogLineJson(b *testing.B) { logLine := `{"msg": "something" , "level": "error", "id": "1"}` - ld := newLevelDetector( 
+ ld := newFieldDetector( validationContext{ discoverLogLevels: true, allowStructuredMetadata: true, @@ -422,7 +423,7 @@ func Benchmark_optParseExtractLogLevelFromLogLineJson(b *testing.B) { func Benchmark_optParseExtractLogLevelFromLogLineLogfmt(b *testing.B) { logLine := `FOO=bar MSG="message with keyword error but it should not get picked up" LEVEL=inFO` - ld := newLevelDetector( + ld := newFieldDetector( validationContext{ discoverLogLevels: true, allowStructuredMetadata: true, @@ -434,3 +435,173 @@ func Benchmark_optParseExtractLogLevelFromLogLineLogfmt(b *testing.B) { require.Equal(b, constants.LogLevelInfo, level) } } + +func Test_DetectGenericFields_Enabled(t *testing.T) { + t.Run("disabled if map is empty", func(t *testing.T) { + detector := newFieldDetector( + validationContext{ + discoverGenericFields: make(map[string][]string, 0), + allowStructuredMetadata: true, + }) + require.False(t, detector.shouldDiscoverGenericFields()) + }) + t.Run("disabled if structured metadata is not allowed", func(t *testing.T) { + detector := newFieldDetector( + validationContext{ + discoverGenericFields: map[string][]string{"trace_id": {"trace_id", "TRACE_ID"}}, + allowStructuredMetadata: false, + }) + require.False(t, detector.shouldDiscoverGenericFields()) + }) + t.Run("enabled if structured metadata is allowed and map is not empty", func(t *testing.T) { + detector := newFieldDetector( + validationContext{ + discoverGenericFields: map[string][]string{"trace_id": {"trace_id", "TRACE_ID"}}, + allowStructuredMetadata: true, + }) + require.True(t, detector.shouldDiscoverGenericFields()) + }) +} + +func Test_DetectGenericFields(t *testing.T) { + + detector := newFieldDetector( + validationContext{ + discoverGenericFields: map[string][]string{ + "trace_id": {"trace_id"}, + "org_id": {"org_id", "user_id", "tenant_id"}, + "product_id": {"product.id"}, // jsonpath + }, + allowStructuredMetadata: true, + }) + + for _, tc := range []struct { + name string + labels labels.Labels + 
entry logproto.Entry + expected push.LabelsAdapter + }{ + { + name: "no match", + labels: labels.Labels{ + {Name: "env", Value: "prod"}, + }, + entry: push.Entry{ + Line: "log line does not match", + StructuredMetadata: push.LabelsAdapter{}, + }, + expected: push.LabelsAdapter{}, + }, + { + name: "stream label matches", + labels: labels.Labels{ + {Name: "trace_id", Value: "8c5f2ecbade6f01d"}, + {Name: "tenant_id", Value: "fake"}, + }, + entry: push.Entry{ + Line: "log line does not match", + StructuredMetadata: push.LabelsAdapter{}, + }, + expected: push.LabelsAdapter{ + {Name: "trace_id", Value: "8c5f2ecbade6f01d"}, + {Name: "org_id", Value: "fake"}, + }, + }, + { + name: "metadata matches", + labels: labels.Labels{ + {Name: "env", Value: "prod"}, + }, + entry: push.Entry{ + Line: "log line does not match", + StructuredMetadata: push.LabelsAdapter{ + {Name: "trace_id", Value: "8c5f2ecbade6f01d"}, + {Name: "user_id", Value: "fake"}, + }, + }, + expected: push.LabelsAdapter{ + {Name: "trace_id", Value: "8c5f2ecbade6f01d"}, + {Name: "org_id", Value: "fake"}, + }, + }, + { + name: "logline (logfmt) matches", + labels: labels.Labels{ + {Name: "env", Value: "prod"}, + }, + entry: push.Entry{ + Line: `msg="this log line matches" trace_id="8c5f2ecbade6f01d" org_id=fake duration=1h`, + StructuredMetadata: push.LabelsAdapter{}, + }, + expected: push.LabelsAdapter{ + {Name: "trace_id", Value: "8c5f2ecbade6f01d"}, + {Name: "org_id", Value: "fake"}, + }, + }, + { + name: "logline (logfmt) matches multiple fields", + labels: labels.Labels{ + {Name: "env", Value: "prod"}, + }, + entry: push.Entry{ + Line: `msg="this log line matches" tenant_id="fake_a" org_id=fake_b duration=1h`, + StructuredMetadata: push.LabelsAdapter{}, + }, + expected: push.LabelsAdapter{ + {Name: "org_id", Value: "fake_b"}, // first field from configuration that matches takes precedence + }, + }, + { + name: "logline (json) matches", + labels: labels.Labels{ + {Name: "env", Value: "prod"}, + }, + entry: 
push.Entry{ + Line: `{"msg": "this log line matches", "trace_id": "8c5f2ecbade6f01d", "org_id": "fake", "duration": "1s"}`, + StructuredMetadata: push.LabelsAdapter{}, + }, + expected: push.LabelsAdapter{ + {Name: "trace_id", Value: "8c5f2ecbade6f01d"}, + {Name: "org_id", Value: "fake"}, + }, + }, + { + name: "logline (json) matches multiple fields", + labels: labels.Labels{ + {Name: "env", Value: "prod"}, + }, + entry: push.Entry{ + Line: `{"msg": "this log line matches", "tenant_id": "fake_a", "org_id": "fake_b", "duration": "1s"}`, + StructuredMetadata: push.LabelsAdapter{}, + }, + expected: push.LabelsAdapter{ + {Name: "org_id", Value: "fake_b"}, // first field from configuration that matches takes precedence + }, + }, + { + name: "logline matches jsonpath", + labels: labels.Labels{ + {Name: "env", Value: "prod"}, + }, + entry: push.Entry{ + Line: `{"product": {"details": "product details", "id": "P2024/01"}}`, + StructuredMetadata: push.LabelsAdapter{}, + }, + expected: push.LabelsAdapter{ + {Name: "product_id", Value: "P2024/01"}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + extracted := push.LabelsAdapter{} + metadata := logproto.FromLabelAdaptersToLabels(tc.entry.StructuredMetadata) + for name, hints := range detector.validationContext.discoverGenericFields { + field, ok := detector.extractGenericField(name, hints, tc.labels, metadata, tc.entry) + if ok { + extracted = append(extracted, field) + } + } + require.ElementsMatch(t, tc.expected, extracted) + }) + } +} diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index 22ae540beaf7e..23c99367d1afa 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -23,6 +23,7 @@ type Limits interface { IncrementDuplicateTimestamps(userID string) bool DiscoverServiceName(userID string) []string + DiscoverGenericFields(userID string) map[string][]string DiscoverLogLevels(userID string) bool LogLevelFields(userID string) []string diff --git a/pkg/distributor/validator.go 
b/pkg/distributor/validator.go index 67c16f14b0420..878255d661669 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -45,6 +45,7 @@ type validationContext struct { incrementDuplicateTimestamps bool discoverServiceName []string + discoverGenericFields map[string][]string discoverLogLevels bool logLevelFields []string @@ -73,6 +74,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val discoverServiceName: v.DiscoverServiceName(userID), discoverLogLevels: v.DiscoverLogLevels(userID), logLevelFields: v.LogLevelFields(userID), + discoverGenericFields: v.DiscoverGenericFields(userID), allowStructuredMetadata: v.AllowStructuredMetadata(userID), maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID), maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID), diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index 50d973eb8b7db..66f7e68bdae6a 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -571,7 +571,7 @@ func NewJSONExpressionParser(expressions []LabelExtractionExpr) (*JSONExpression } ids = append(ids, exp.Identifier) - paths = append(paths, pathsToString(path)) + paths = append(paths, JSONPathToStrings(path)) } return &JSONExpressionParser{ @@ -581,17 +581,17 @@ func NewJSONExpressionParser(expressions []LabelExtractionExpr) (*JSONExpression }, nil } -func pathsToString(paths []interface{}) []string { - stingPaths := make([]string, 0, len(paths)) +func JSONPathToStrings(paths []interface{}) []string { + stringPaths := make([]string, 0, len(paths)) for _, p := range paths { switch v := p.(type) { case int: - stingPaths = append(stingPaths, fmt.Sprintf("[%d]", v)) + stringPaths = append(stringPaths, fmt.Sprintf("[%d]", v)) case string: - stingPaths = append(stingPaths, v) + stringPaths = append(stringPaths, v) } } - return stingPaths + return stringPaths } func (j *JSONExpressionParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte, bool) { diff --git 
a/pkg/validation/limits.go b/pkg/validation/limits.go index 23c3b4939f84d..ad84bbc0af72e 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -84,9 +84,12 @@ type Limits struct { MaxLineSize flagext.ByteSize `yaml:"max_line_size" json:"max_line_size"` MaxLineSizeTruncate bool `yaml:"max_line_size_truncate" json:"max_line_size_truncate"` IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"` - DiscoverServiceName []string `yaml:"discover_service_name" json:"discover_service_name"` - DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"` - LogLevelFields []string `yaml:"log_level_fields" json:"log_level_fields"` + + // Metadata field extraction + DiscoverGenericFields FieldDetectorConfig `yaml:"discover_generic_fields" json:"discover_generic_fields" doc:"description=Experimental: Detect fields from stream labels, structured metadata, or json/logfmt formatted log line and put them into structured metadata of the log entry."` + DiscoverServiceName []string `yaml:"discover_service_name" json:"discover_service_name"` + DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"` + LogLevelFields []string `yaml:"log_level_fields" json:"log_level_fields"` // Ingester enforced limits. UseOwnedStreamCount bool `yaml:"use_owned_stream_count" json:"use_owned_stream_count"` @@ -244,6 +247,10 @@ type Limits struct { S3SSEKMSEncryptionContext string `yaml:"s3_sse_kms_encryption_context" json:"s3_sse_kms_encryption_context" doc:"nocli|description=S3 server-side encryption KMS encryption context. If unset and the key ID override is set, the encryption context will not be provided to S3. 
Ignored if the SSE type override is not set."` } +type FieldDetectorConfig struct { + Fields map[string][]string `yaml:"fields,omitempty" json:"fields,omitempty"` +} + type StreamRetention struct { Period model.Duration `yaml:"period" json:"period" doc:"description:Retention period applied to the log lines matching the selector."` Priority int `yaml:"priority" json:"priority" doc:"description:The larger the value, the higher the priority."` @@ -428,7 +435,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { ) l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) - f.IntVar(&l.VolumeMaxSeries, "limits.volume-max-series", 1000, "The default number of aggregated series or labels that can be returned from a log-volume endpoint") f.BoolVar(&l.AllowStructuredMetadata, "validation.allow-structured-metadata", true, "Allow user to send structured metadata (non-indexed labels) in push payload.") @@ -996,6 +1002,10 @@ func (o *Overrides) IncrementDuplicateTimestamps(userID string) bool { return o.getOverridesForUser(userID).IncrementDuplicateTimestamp } +func (o *Overrides) DiscoverGenericFields(userID string) map[string][]string { + return o.getOverridesForUser(userID).DiscoverGenericFields.Fields +} + func (o *Overrides) DiscoverServiceName(userID string) []string { return o.getOverridesForUser(userID).DiscoverServiceName } diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index ce412305ceb79..bb955d8a87679 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -214,6 +214,7 @@ ruler_remote_write_headers: foo: "bar" `, exp: Limits{ + DiscoverGenericFields: FieldDetectorConfig{}, RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"foo": "bar"}}, DiscoverServiceName: []string{}, LogLevelFields: []string{}, @@ -234,8 +235,9 @@ ruler_remote_write_headers: ruler_remote_write_headers: `, exp: Limits{ - DiscoverServiceName: []string{}, - LogLevelFields: []string{}, + DiscoverGenericFields: 
FieldDetectorConfig{}, + DiscoverServiceName: []string{}, + LogLevelFields: []string{}, // Rest from new defaults StreamRetention: []StreamRetention{ { @@ -254,8 +256,9 @@ retention_stream: selector: '{foo="bar"}' `, exp: Limits{ - DiscoverServiceName: []string{}, - LogLevelFields: []string{}, + DiscoverGenericFields: FieldDetectorConfig{}, + DiscoverServiceName: []string{}, + LogLevelFields: []string{}, StreamRetention: []StreamRetention{ { Period: model.Duration(24 * time.Hour), @@ -274,9 +277,10 @@ retention_stream: reject_old_samples: true `, exp: Limits{ - RejectOldSamples: true, - DiscoverServiceName: []string{}, - LogLevelFields: []string{}, + RejectOldSamples: true, + DiscoverGenericFields: FieldDetectorConfig{}, + DiscoverServiceName: []string{}, + LogLevelFields: []string{}, // Rest from new defaults RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, @@ -295,8 +299,9 @@ reject_old_samples: true query_timeout: 5m `, exp: Limits{ - DiscoverServiceName: []string{}, - LogLevelFields: []string{}, + DiscoverGenericFields: FieldDetectorConfig{}, + DiscoverServiceName: []string{}, + LogLevelFields: []string{}, QueryTimeout: model.Duration(5 * time.Minute),