Skip to content

Commit 29bc778

Browse files
trevorwhitneypascal-sochacki
authored andcommitted
feat: add _extracted suffix to detected fields conflicts (grafana#13993)
1 parent 0b0ca64 commit 29bc778

File tree

3 files changed

+86
-26
lines changed

3 files changed

+86
-26
lines changed

pkg/querier/querier.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -1199,6 +1199,11 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p
11991199
emtpyparser := ""
12001200

12011201
for _, stream := range streams {
1202+
streamLbls, err := syntax.ParseLabels(stream.Labels)
1203+
if err != nil {
1204+
streamLbls = labels.EmptyLabels()
1205+
}
1206+
12021207
for _, entry := range stream.Entries {
12031208
structuredMetadata := getStructuredMetadata(entry)
12041209
for k, vals := range structuredMetadata {
@@ -1226,7 +1231,7 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p
12261231
}
12271232
}
12281233

1229-
detected, parser := parseLine(entry.Line)
1234+
detected, parser := parseLine(entry.Line, streamLbls)
12301235
for k, vals := range detected {
12311236
df, ok := detectedFields[k]
12321237
if !ok && fieldCount < limit {
@@ -1283,11 +1288,11 @@ func getStructuredMetadata(entry push.Entry) map[string][]string {
12831288
return result
12841289
}
12851290

1286-
func parseLine(line string) (map[string][]string, *string) {
1291+
func parseLine(line string, streamLbls labels.Labels) (map[string][]string, *string) {
12871292
parser := "logfmt"
12881293
logFmtParser := logql_log.NewLogfmtParser(true, false)
12891294

1290-
lbls := logql_log.NewBaseLabelsBuilder().ForLabels(labels.EmptyLabels(), 0)
1295+
lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0)
12911296
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
12921297
if !logfmtSuccess || lbls.HasErr() {
12931298
parser = "json"
@@ -1301,6 +1306,10 @@ func parseLine(line string) (map[string][]string, *string) {
13011306

13021307
parsedLabels := map[string]map[string]struct{}{}
13031308
for _, lbl := range lbls.LabelsResult().Labels() {
1309+
// skip indexed labels, as we only want detected fields
1310+
if streamLbls.Has(lbl.Name) {
1311+
continue
1312+
}
13041313
if values, ok := parsedLabels[lbl.Name]; ok {
13051314
values[lbl.Value] = struct{}{}
13061315
} else {

pkg/querier/querier_mock_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream
575575
}
576576

577577
func mockLogfmtStream(from int, quantity int) logproto.Stream {
578-
return mockLogfmtStreamWithLabels(from, quantity, `{type="test"}`)
578+
return mockLogfmtStreamWithLabels(from, quantity, `{type="test", name="foo"}`)
579579
}
580580

581581
func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream {
@@ -586,7 +586,7 @@ func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Str
586586
entries = append(entries, logproto.Entry{
587587
Timestamp: time.Unix(int64(i), 0),
588588
Line: fmt.Sprintf(
589-
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t`,
589+
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`,
590590
i,
591591
i,
592592
(i * 10),

pkg/querier/querier_test.go

+72-21
Original file line numberDiff line numberDiff line change
@@ -1777,15 +1777,16 @@ func TestQuerier_DetectedFields(t *testing.T) {
17771777
detectedFields := resp.Fields
17781778
// log lines come from querier_mock_test.go
17791779
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
1780-
assert.Len(t, detectedFields, 7)
1780+
assert.Len(t, detectedFields, 8)
17811781
expectedCardinality := map[string]uint64{
1782-
"message": 5,
1783-
"count": 5,
1784-
"fake": 1,
1785-
"bytes": 5,
1786-
"duration": 5,
1787-
"percent": 5,
1788-
"even": 2,
1782+
"message": 5,
1783+
"count": 5,
1784+
"fake": 1,
1785+
"bytes": 5,
1786+
"duration": 5,
1787+
"percent": 5,
1788+
"even": 2,
1789+
"name_extracted": 1,
17891790
}
17901791
for _, d := range detectedFields {
17911792
card := expectedCardinality[d.Label]
@@ -1821,17 +1822,18 @@ func TestQuerier_DetectedFields(t *testing.T) {
18211822
detectedFields := resp.Fields
18221823
// log lines come from querier_mock_test.go
18231824
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
1824-
assert.Len(t, detectedFields, 9)
1825+
assert.Len(t, detectedFields, 10)
18251826
expectedCardinality := map[string]uint64{
1826-
"variable": 5,
1827-
"constant": 1,
1828-
"message": 5,
1829-
"count": 5,
1830-
"fake": 1,
1831-
"bytes": 5,
1832-
"duration": 5,
1833-
"percent": 5,
1834-
"even": 2,
1827+
"variable": 5,
1828+
"constant": 1,
1829+
"message": 5,
1830+
"count": 5,
1831+
"fake": 1,
1832+
"bytes": 5,
1833+
"duration": 5,
1834+
"percent": 5,
1835+
"even": 2,
1836+
"name_extracted": 1,
18351837
}
18361838
for _, d := range detectedFields {
18371839
card := expectedCardinality[d.Label]
@@ -1867,7 +1869,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
18671869
detectedFields := resp.Fields
18681870
// log lines come from querier_mock_test.go
18691871
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
1870-
assert.Len(t, detectedFields, 7)
1872+
assert.Len(t, detectedFields, 8)
18711873

18721874
var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField
18731875
for _, field := range detectedFields {
@@ -1923,7 +1925,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
19231925
detectedFields := resp.Fields
19241926
// log lines come from querier_mock_test.go
19251927
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
1926-
assert.Len(t, detectedFields, 9)
1928+
assert.Len(t, detectedFields, 10)
19271929

19281930
var messageField, countField, bytesField, durationField, floatField, evenField, constantField, variableField *logproto.DetectedField
19291931
for _, field := range detectedFields {
@@ -1955,7 +1957,56 @@ func TestQuerier_DetectedFields(t *testing.T) {
19551957
assert.Equal(t, []string{"logfmt"}, evenField.Parsers)
19561958
assert.Equal(t, []string{""}, constantField.Parsers)
19571959
assert.Equal(t, []string{""}, variableField.Parsers)
1958-
})
1960+
},
1961+
)
1962+
1963+
t.Run(
1964+
"adds _extracted suffix to detected fields that conflict with indexed labels",
1965+
func(t *testing.T) {
1966+
store := newStoreMock()
1967+
store.On("SelectLogs", mock.Anything, mock.Anything).
1968+
Return(mockLogfmtStreamIterator(1, 2), nil)
1969+
1970+
queryClient := newQueryClientMock()
1971+
queryClient.On("Recv").
1972+
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStreamWithStructuredMetadata(1, 2)}), nil)
1973+
1974+
ingesterClient := newQuerierClientMock()
1975+
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
1976+
Return(queryClient, nil)
1977+
1978+
querier, err := newQuerier(
1979+
conf,
1980+
mockIngesterClientConfig(),
1981+
newIngesterClientMockFactory(ingesterClient),
1982+
mockReadRingWithOneActiveIngester(),
1983+
&mockDeleteGettter{},
1984+
store, limits)
1985+
require.NoError(t, err)
1986+
1987+
resp, err := querier.DetectedFields(ctx, &request)
1988+
require.NoError(t, err)
1989+
1990+
detectedFields := resp.Fields
1991+
// log lines come from querier_mock_test.go
1992+
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
1993+
assert.Len(t, detectedFields, 10)
1994+
1995+
var nameField *logproto.DetectedField
1996+
for _, field := range detectedFields {
1997+
switch field.Label {
1998+
case "name_extracted":
1999+
nameField = field
2000+
}
2001+
}
2002+
2003+
assert.NotNil(t, nameField)
2004+
assert.Equal(t, "name_extracted", nameField.Label)
2005+
assert.Equal(t, logproto.DetectedFieldString, nameField.Type)
2006+
assert.Equal(t, []string{"logfmt"}, nameField.Parsers)
2007+
assert.Equal(t, uint64(1), nameField.Cardinality)
2008+
},
2009+
)
19592010
}
19602011

19612012
func BenchmarkQuerierDetectedFields(b *testing.B) {

0 commit comments

Comments
 (0)