Skip to content

Commit f68d1f7

Browse files
fix: make detected fields work for both json and proto (#12682)
1 parent ee0020c commit f68d1f7

File tree

9 files changed

+468
-196
lines changed

9 files changed

+468
-196
lines changed

integration/client/client.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func (c *Client) PushOTLPLogLine(line string, timestamp time.Time, logAttributes
111111
return c.pushOTLPLogLine(line, timestamp, logAttributes)
112112
}
113113

114-
func formatTS(ts time.Time) string {
114+
func FormatTS(ts time.Time) string {
115115
return strconv.FormatInt(ts.UnixNano(), 10)
116116
}
117117

@@ -130,7 +130,7 @@ func (c *Client) pushLogLine(line string, timestamp time.Time, structuredMetadat
130130
},
131131
Values: [][]any{
132132
{
133-
formatTS(timestamp),
133+
FormatTS(timestamp),
134134
line,
135135
structuredMetadata,
136136
},
@@ -509,7 +509,7 @@ func (c *Client) RunQuery(ctx context.Context, query string, extraHeaders ...Hea
509509

510510
v := url.Values{}
511511
v.Set("query", query)
512-
v.Set("time", formatTS(c.Now.Add(time.Second)))
512+
v.Set("time", FormatTS(c.Now.Add(time.Second)))
513513

514514
u, err := url.Parse(c.baseURL)
515515
if err != nil {
@@ -568,8 +568,8 @@ func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) {
568568
func (c *Client) rangeQueryURL(query string, start, end time.Time) string {
569569
v := url.Values{}
570570
v.Set("query", query)
571-
v.Set("start", formatTS(start))
572-
v.Set("end", formatTS(end))
571+
v.Set("start", FormatTS(start))
572+
v.Set("end", FormatTS(end))
573573

574574
u, err := url.Parse(c.baseURL)
575575
if err != nil {

integration/explore_logs_test.go

+223
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
//go:build integration
2+
3+
package integration
4+
5+
import (
6+
"context"
7+
"encoding/json"
8+
"io"
9+
"net/url"
10+
"testing"
11+
"time"
12+
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
16+
"github.com/grafana/loki/v3/integration/client"
17+
"github.com/grafana/loki/v3/integration/cluster"
18+
)
19+
20+
type DetectedField struct {
21+
Label string `json:"label"`
22+
Type string `json:"type"`
23+
Cardinality uint64 `json:"cardinality"`
24+
}
25+
26+
type DetectedFields []DetectedField
27+
type DetectedFieldResponse struct {
28+
Fields DetectedFields `json:"fields"`
29+
}
30+
31+
func Test_ExploreLogsApis(t *testing.T) {
32+
clu := cluster.New(nil, cluster.SchemaWithTSDBAndTSDB, func(c *cluster.Cluster) {
33+
c.SetSchemaVer("v13")
34+
})
35+
defer func() {
36+
assert.NoError(t, clu.Cleanup())
37+
}()
38+
39+
// run initially the compactor, indexgateway, and distributor.
40+
var (
41+
tCompactor = clu.AddComponent(
42+
"compactor",
43+
"-target=compactor",
44+
"-compactor.compaction-interval=1s",
45+
"-compactor.retention-delete-delay=1s",
46+
// By default, a minute is added to the delete request start time. This compensates for that.
47+
"-compactor.delete-request-cancel-period=-60s",
48+
"-compactor.deletion-mode=filter-and-delete",
49+
)
50+
tIndexGateway = clu.AddComponent(
51+
"index-gateway",
52+
"-target=index-gateway",
53+
)
54+
tDistributor = clu.AddComponent(
55+
"distributor",
56+
"-target=distributor",
57+
)
58+
)
59+
require.NoError(t, clu.Run())
60+
61+
// then, run only the ingester and query scheduler.
62+
var (
63+
tIngester = clu.AddComponent(
64+
"ingester",
65+
"-target=ingester",
66+
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
67+
)
68+
tQueryScheduler = clu.AddComponent(
69+
"query-scheduler",
70+
"-target=query-scheduler",
71+
"-query-scheduler.use-scheduler-ring=false",
72+
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
73+
)
74+
)
75+
require.NoError(t, clu.Run())
76+
77+
// the run querier.
78+
var (
79+
tQuerier = clu.AddComponent(
80+
"querier",
81+
"-target=querier",
82+
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
83+
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
84+
"-common.compactor-address="+tCompactor.HTTPURL(),
85+
)
86+
)
87+
require.NoError(t, clu.Run())
88+
89+
// finally, run the query-frontend.
90+
var (
91+
tQueryFrontend = clu.AddComponent(
92+
"query-frontend",
93+
"-target=query-frontend",
94+
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
95+
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
96+
"-common.compactor-address="+tCompactor.HTTPURL(),
97+
"-querier.per-request-limits-enabled=true",
98+
"-frontend.encoding=protobuf",
99+
"-querier.shard-aggregations=quantile_over_time",
100+
"-frontend.tail-proxy-url="+tQuerier.HTTPURL(),
101+
)
102+
)
103+
require.NoError(t, clu.Run())
104+
105+
tenantID := randStringRunes()
106+
107+
now := time.Now()
108+
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
109+
cliDistributor.Now = now
110+
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
111+
cliIngester.Now = now
112+
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
113+
cliQueryFrontend.Now = now
114+
115+
t.Run("/detected_fields", func(t *testing.T) {
116+
// ingest some log lines
117+
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=red", now.Add(-45*time.Minute), nil, map[string]string{"job": "fake"}))
118+
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=blue", now.Add(-45*time.Minute), nil, map[string]string{"job": "fake"}))
119+
120+
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=red", now.Add(-5*time.Second), nil, map[string]string{"job": "fake"}))
121+
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=purple", now.Add(-5*time.Second), nil, map[string]string{"job": "fake"}))
122+
123+
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=green", now, nil, map[string]string{"job": "fake"}))
124+
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=red", now, nil, map[string]string{"job": "fake"}))
125+
126+
// validate logs are there
127+
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
128+
require.NoError(t, err)
129+
assert.Equal(t, "streams", resp.Data.ResultType)
130+
131+
var lines []string
132+
for _, stream := range resp.Data.Stream {
133+
for _, val := range stream.Values {
134+
lines = append(lines, val[1])
135+
}
136+
}
137+
assert.ElementsMatch(t, []string{"foo=bar color=red", "foo=bar color=blue", "foo=bar color=red", "foo=bar color=purple", "foo=bar color=green", "foo=bar color=red"}, lines)
138+
139+
t.Run("non-split queries", func(t *testing.T) {
140+
start := cliQueryFrontend.Now.Add(-1 * time.Minute)
141+
end := cliQueryFrontend.Now.Add(time.Minute)
142+
143+
v := url.Values{}
144+
v.Set("query", `{job="fake"}`)
145+
v.Set("start", client.FormatTS(start))
146+
v.Set("end", client.FormatTS(end))
147+
148+
u := url.URL{}
149+
u.Path = "/loki/api/v1/detected_fields"
150+
u.RawQuery = v.Encode()
151+
dfResp, err := cliQueryFrontend.Get(u.String())
152+
require.NoError(t, err)
153+
defer dfResp.Body.Close()
154+
155+
buf, err := io.ReadAll(dfResp.Body)
156+
require.NoError(t, err)
157+
158+
var detectedFieldResponse DetectedFieldResponse
159+
err = json.Unmarshal(buf, &detectedFieldResponse)
160+
require.NoError(t, err)
161+
162+
require.Equal(t, 2, len(detectedFieldResponse.Fields))
163+
164+
var fooField, colorField DetectedField
165+
for _, field := range detectedFieldResponse.Fields {
166+
if field.Label == "foo" {
167+
fooField = field
168+
}
169+
170+
if field.Label == "color" {
171+
colorField = field
172+
}
173+
}
174+
175+
require.Equal(t, "string", fooField.Type)
176+
require.Equal(t, "string", colorField.Type)
177+
require.Equal(t, uint64(1), fooField.Cardinality)
178+
require.Equal(t, uint64(3), colorField.Cardinality)
179+
})
180+
181+
t.Run("split queries", func(t *testing.T) {
182+
start := cliQueryFrontend.Now.Add(-24 * time.Hour)
183+
end := cliQueryFrontend.Now.Add(time.Minute)
184+
185+
v := url.Values{}
186+
v.Set("query", `{job="fake"}`)
187+
v.Set("start", client.FormatTS(start))
188+
v.Set("end", client.FormatTS(end))
189+
190+
u := url.URL{}
191+
u.Path = "/loki/api/v1/detected_fields"
192+
u.RawQuery = v.Encode()
193+
dfResp, err := cliQueryFrontend.Get(u.String())
194+
require.NoError(t, err)
195+
defer dfResp.Body.Close()
196+
197+
buf, err := io.ReadAll(dfResp.Body)
198+
require.NoError(t, err)
199+
200+
var detectedFieldResponse DetectedFieldResponse
201+
err = json.Unmarshal(buf, &detectedFieldResponse)
202+
require.NoError(t, err)
203+
204+
require.Equal(t, 2, len(detectedFieldResponse.Fields))
205+
206+
var fooField, colorField DetectedField
207+
for _, field := range detectedFieldResponse.Fields {
208+
if field.Label == "foo" {
209+
fooField = field
210+
}
211+
212+
if field.Label == "color" {
213+
colorField = field
214+
}
215+
}
216+
217+
require.Equal(t, "string", fooField.Type)
218+
require.Equal(t, "string", colorField.Type)
219+
require.Equal(t, uint64(1), fooField.Cardinality)
220+
require.Equal(t, uint64(4), colorField.Cardinality)
221+
})
222+
})
223+
}

0 commit comments

Comments
 (0)