Skip to content

Commit 1b32fc1

Browse files
svennergrshantanualsi
authored andcommitted
feat(detectedFields): add parser to response (#12872)
1 parent 27c9842 commit 1b32fc1

File tree

6 files changed

+251
-178
lines changed

6 files changed

+251
-178
lines changed

pkg/loghttp/detected.go

+1
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ type DetectedField struct {
1111
Label string `json:"label,omitempty"`
1212
Type logproto.DetectedFieldType `json:"type,omitempty"`
1313
Cardinality uint64 `json:"cardinality,omitempty"`
14+
Parser string `json:"parser,omitempty"`
1415
}

pkg/logproto/logproto.pb.go

+226-170
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/logproto/logproto.proto

+2-1
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,8 @@ message DetectedField {
471471
string label = 1;
472472
string type = 2 [(gogoproto.casttype) = "DetectedFieldType"];
473473
uint64 cardinality = 3;
474-
bytes sketch = 4 [(gogoproto.jsontag) = "sketch,omitempty"];
474+
string parser = 4;
475+
bytes sketch = 5 [(gogoproto.jsontag) = "sketch,omitempty"];
475476
}
476477

477478
message DetectedLabelsRequest {

pkg/querier/querier.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -1108,6 +1108,7 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
11081108
Type: v.fieldType,
11091109
Cardinality: v.Estimate(),
11101110
Sketch: sketch,
1111+
Parser: v.parser,
11111112
}
11121113

11131114
fieldCount++
@@ -1124,13 +1125,19 @@ type parsedFields struct {
11241125
sketch *hyperloglog.Sketch
11251126
isTypeDetected bool
11261127
fieldType logproto.DetectedFieldType
1128+
parser string
11271129
}
11281130

1129-
func newParsedFields() *parsedFields {
1131+
func newParsedFields(parser *string) *parsedFields {
1132+
p := ""
1133+
if parser != nil {
1134+
p = *parser
1135+
}
11301136
return &parsedFields{
11311137
sketch: hyperloglog.New(),
11321138
isTypeDetected: false,
11331139
fieldType: logproto.DetectedFieldString,
1140+
parser: p,
11341141
}
11351142
}
11361143

@@ -1185,11 +1192,12 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
11851192
"msg", fmt.Sprintf("looking for detected fields in stream %d with %d lines", stream.Hash, len(stream.Entries)))
11861193

11871194
for _, entry := range stream.Entries {
1188-
detected := parseLine(entry.Line)
1195+
detected, parser := parseLine(entry.Line)
11891196
for k, vals := range detected {
11901197
df, ok := detectedFields[k]
11911198
if !ok && fieldCount < limit {
1192-
df = newParsedFields()
1199+
1200+
df = newParsedFields(parser)
11931201
detectedFields[k] = df
11941202
fieldCount++
11951203
}
@@ -1217,17 +1225,19 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
12171225
return detectedFields
12181226
}
12191227

1220-
func parseLine(line string) map[string][]string {
1228+
func parseLine(line string) (map[string][]string, *string) {
1229+
parser := "logfmt"
12211230
logFmtParser := logql_log.NewLogfmtParser(true, false)
1222-
jsonParser := logql_log.NewJSONParser()
12231231

12241232
lbls := logql_log.NewBaseLabelsBuilder().ForLabels(labels.EmptyLabels(), 0)
12251233
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
12261234
if !logfmtSuccess || lbls.HasErr() {
1235+
parser = "json"
1236+
jsonParser := logql_log.NewJSONParser()
12271237
lbls.Reset()
12281238
_, jsonSuccess := jsonParser.Process(0, []byte(line), lbls)
12291239
if !jsonSuccess || lbls.HasErr() {
1230-
return map[string][]string{}
1240+
return map[string][]string{}, nil
12311241
}
12321242
}
12331243

@@ -1249,7 +1259,7 @@ func parseLine(line string) map[string][]string {
12491259
result[lbl] = vals
12501260
}
12511261

1252-
return result
1262+
return result, &parser
12531263
}
12541264

12551265
// streamsForFieldDetection reads the streams from the iterator and returns them sorted.

pkg/storage/detected/fields.go

+3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
type UnmarshaledDetectedField struct {
1010
Label string
1111
Type logproto.DetectedFieldType
12+
Parser string
1213
Sketch *hyperloglog.Sketch
1314
}
1415

@@ -22,6 +23,7 @@ func UnmarshalDetectedField(f *logproto.DetectedField) (*UnmarshaledDetectedFiel
2223
return &UnmarshaledDetectedField{
2324
Label: f.Label,
2425
Type: f.Type,
26+
Parser: f.Parser,
2527
Sketch: sketch,
2628
}, nil
2729
}
@@ -77,6 +79,7 @@ func MergeFields(
7779
Label: field.Label,
7880
Type: field.Type,
7981
Cardinality: field.Sketch.Estimate(),
82+
Parser: field.Parser,
8083
Sketch: nil,
8184
}
8285
result = append(result, detectedField)

pkg/storage/detected/fields_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ func Test_MergeFields(t *testing.T) {
3434
Type: logproto.DetectedFieldString,
3535
Cardinality: 1,
3636
Sketch: marshalledFooSketch,
37+
Parser: "logfmt",
3738
},
3839
{
3940
Label: "bar",
@@ -65,6 +66,7 @@ func Test_MergeFields(t *testing.T) {
6566

6667
assert.Equal(t, logproto.DetectedFieldString, foo.Type)
6768
assert.Equal(t, uint64(3), foo.Cardinality)
69+
assert.Equal(t, "logfmt", foo.Parser)
6870
})
6971

7072
t.Run("returns up to limit number of fields", func(t *testing.T) {

0 commit comments

Comments
 (0)