Skip to content

Commit 6c6410c

Browse files
fix: only allow aggregated metric queries from logs drilldown app (#16670)
1 parent cd11367 commit 6c6410c

File tree

3 files changed

+181
-0
lines changed

3 files changed

+181
-0
lines changed

pkg/querier/limits/validation.go

+52
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,27 @@ package limits
22

33
import (
44
"context"
5+
"fmt"
56
"net/http"
7+
"strings"
68
"time"
79

810
"github.com/go-kit/log/level"
911
"github.com/grafana/dskit/httpgrpc"
1012
"github.com/grafana/dskit/tenant"
1113
"github.com/prometheus/common/model"
1214

15+
"github.com/grafana/loki/v3/pkg/loghttp/push"
1316
"github.com/grafana/loki/v3/pkg/logql"
17+
"github.com/grafana/loki/v3/pkg/util/httpreq"
1418
"github.com/grafana/loki/v3/pkg/util/spanlogger"
1519
util_validation "github.com/grafana/loki/v3/pkg/util/validation"
1620
)
1721

22+
const logsDrilldownAppName = "grafana-lokiexplore-app"
23+
1824
var nowFunc = func() time.Time { return time.Now() }
25+
var ErrAggMetricsDrilldownOnly = fmt.Errorf("aggregated metric queries can only be accessed from Logs Drilldown")
1926

2027
func ValidateQueryRequest(ctx context.Context, req logql.QueryParams, limits Limits) (time.Time, time.Time, error) {
2128
userID, err := tenant.TenantID(ctx)
@@ -38,6 +45,51 @@ func ValidateQueryRequest(ctx context.Context, req logql.QueryParams, limits Lim
3845
return ValidateQueryTimeRangeLimits(ctx, userID, limits, req.GetStart(), req.GetEnd())
3946
}
4047

48+
// ValidateAggregatedMetricQuery checks if the query is accessing __aggregated_metric__ streams
49+
// and ensures that only queries from Grafana Explore Logs can access them.
50+
func ValidateAggregatedMetricQuery(ctx context.Context, req logql.QueryParams) error {
51+
selector, err := req.LogSelector()
52+
if err != nil {
53+
return err
54+
}
55+
56+
// Check if the query targets aggregated metrics
57+
isAggregatedMetricQuery := false
58+
matchers := selector.Matchers()
59+
60+
for _, matcher := range matchers {
61+
if matcher.Name == push.AggregatedMetricLabel {
62+
isAggregatedMetricQuery = true
63+
break
64+
}
65+
}
66+
67+
if !isAggregatedMetricQuery {
68+
return nil
69+
}
70+
71+
tags := httpreq.ExtractQueryTagsFromContext(ctx)
72+
kvs := httpreq.TagsToKeyValues(tags)
73+
74+
// KVs is an []interface{} of key value pairs, so iterate by keys
75+
for i := 0; i < len(kvs); i += 2 {
76+
current, ok := kvs[i].(string)
77+
if !ok {
78+
continue
79+
}
80+
81+
next, ok := kvs[i+1].(string)
82+
if !ok {
83+
continue
84+
}
85+
86+
if current == "source" && strings.EqualFold(next, logsDrilldownAppName) {
87+
return nil
88+
}
89+
}
90+
return ErrAggMetricsDrilldownOnly
91+
}
92+
4193
func ValidateQueryTimeRangeLimits(ctx context.Context, userID string, limits TimeRangeLimits, from, through time.Time) (time.Time, time.Time, error) {
4294
now := nowFunc()
4395
// Clamp the time range based on the max query lookback.

pkg/querier/limits/validation_test.go

+121
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ import (
66
"time"
77

88
"github.com/stretchr/testify/require"
9+
10+
"github.com/grafana/loki/v3/pkg/logproto"
11+
"github.com/grafana/loki/v3/pkg/logql"
12+
"github.com/grafana/loki/v3/pkg/logql/syntax"
13+
"github.com/grafana/loki/v3/pkg/querier/plan"
14+
"github.com/grafana/loki/v3/pkg/util/httpreq"
915
)
1016

1117
type fakeTimeLimits struct {
@@ -51,3 +57,118 @@ func Test_validateQueryTimeRangeLimits(t *testing.T) {
5157
})
5258
}
5359
}
60+
61+
func TestValidateAggregatedMetricQuery(t *testing.T) {
62+
makeReqAndAST := func(queryStr string) logql.QueryParams {
63+
now := time.Now()
64+
expr, err := syntax.ParseExpr(queryStr)
65+
if err != nil {
66+
panic(err)
67+
}
68+
switch expr.(type) {
69+
case syntax.SampleExpr:
70+
return logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{
71+
Selector: queryStr,
72+
Start: now.Add(-time.Hour),
73+
End: now,
74+
Plan: &plan.QueryPlan{AST: expr},
75+
},
76+
}
77+
default:
78+
return logql.SelectLogParams{QueryRequest: &logproto.QueryRequest{
79+
Selector: queryStr,
80+
Start: now.Add(-time.Hour),
81+
End: now,
82+
Direction: logproto.BACKWARD,
83+
Plan: &plan.QueryPlan{
84+
AST: expr,
85+
},
86+
},
87+
}
88+
}
89+
}
90+
91+
tcs := []struct {
92+
desc string
93+
req logql.QueryParams
94+
queryTags string
95+
expectedError error
96+
}{
97+
{
98+
desc: "normal query, no error",
99+
req: makeReqAndAST(`{foo="bar"}`),
100+
queryTags: "",
101+
expectedError: nil,
102+
},
103+
{
104+
desc: "aggregated metric query from explore, no error",
105+
req: makeReqAndAST(`{__aggregated_metric__="service-name"}`),
106+
queryTags: "source=" + logsDrilldownAppName,
107+
expectedError: nil,
108+
},
109+
{
110+
desc: "query tags are case insensitive",
111+
req: makeReqAndAST(`{__aggregated_metric__="service-name"}`),
112+
queryTags: "Source=" + logsDrilldownAppName,
113+
expectedError: nil,
114+
},
115+
{
116+
desc: "aggregated metric query from explore, multiple selectors, no error",
117+
req: makeReqAndAST(`{app="service-name", __aggregated_metric__="true"}`),
118+
queryTags: "source=" + logsDrilldownAppName,
119+
expectedError: nil,
120+
},
121+
{
122+
desc: "aggregated metric query from explore, multiple selectors, filter, no error",
123+
req: makeReqAndAST(`{app="service-name", __aggregated_metric__="true"} |= "test"`),
124+
queryTags: "source=" + logsDrilldownAppName,
125+
expectedError: nil,
126+
},
127+
{
128+
desc: "aggregated metrics metric query from explore, multiple selectors, filter, no error",
129+
req: makeReqAndAST(`sum by (service_name)(count_over_time({app="service-name", __aggregated_metric__="true"} |= "test" [5m]))`),
130+
queryTags: "source=" + logsDrilldownAppName,
131+
expectedError: nil,
132+
},
133+
{
134+
desc: "aggregated metric query from other source, blocked",
135+
req: makeReqAndAST(`{__aggregated_metric__="service-name"}`),
136+
queryTags: "source=other-app",
137+
expectedError: ErrAggMetricsDrilldownOnly,
138+
},
139+
{
140+
desc: "aggregated metric query with no source, blocked",
141+
req: makeReqAndAST(`{__aggregated_metric__="service-name"}`),
142+
queryTags: "",
143+
expectedError: ErrAggMetricsDrilldownOnly,
144+
},
145+
{
146+
desc: "aggregated metric query with no source, multiple selectors, blocked",
147+
req: makeReqAndAST(`{app="service-name", __aggregated_metric__="true"}`),
148+
queryTags: "",
149+
expectedError: ErrAggMetricsDrilldownOnly,
150+
},
151+
{
152+
desc: "aggregated metrics metric query with no source, multiple selectors, filter, blocked",
153+
req: makeReqAndAST(`sum by (service_name)(count_over_time({app="service-name", __aggregated_metric__="true"} |= "test" [5m]))`),
154+
queryTags: "",
155+
expectedError: ErrAggMetricsDrilldownOnly,
156+
},
157+
}
158+
159+
for _, tc := range tcs {
160+
t.Run(tc.desc, func(t *testing.T) {
161+
ctx := context.Background()
162+
if tc.queryTags != "" {
163+
ctx = httpreq.InjectQueryTags(ctx, tc.queryTags)
164+
}
165+
166+
err := ValidateAggregatedMetricQuery(ctx, tc.req)
167+
if tc.expectedError != nil {
168+
require.ErrorIs(t, err, tc.expectedError)
169+
} else {
170+
require.NoError(t, err)
171+
}
172+
})
173+
}
174+
}

pkg/querier/querier.go

+8
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,14 @@ func (q *SingleTenantQuerier) SelectLogs(ctx context.Context, params logql.Selec
154154
return nil, err
155155
}
156156

157+
err = querier_limits.ValidateAggregatedMetricQuery(ctx, params)
158+
if err != nil {
159+
if errors.Is(err, querier_limits.ErrAggMetricsDrilldownOnly) {
160+
return iter.NoopEntryIterator, nil
161+
}
162+
return nil, err
163+
}
164+
157165
params.QueryRequest.Deletes, err = deletion.DeletesForUserQuery(ctx, params.Start, params.End, q.deleteGetter)
158166
if err != nil {
159167
level.Error(spanlogger.FromContext(ctx)).Log("msg", "failed loading deletes for user", "err", err)

0 commit comments

Comments
 (0)