Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enforce max series for metrics queries #4525

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [CHANGE] **BREAKING CHANGE** Enforce max attribute size at event, link, and instrumentation scope. Make config per-tenant.
Renamed max_span_attr_byte to max_attribute_bytes
[#4633](https://github.com/grafana/tempo/pull/4633) (@ie-pham)
* [CHANGE] Enforce max series in response for metrics queries [#4525](https://github.com/grafana/tempo/pull/4525) (@ie-pham)
ie-pham marked this conversation as resolved.
Show resolved Hide resolved
* [ENHANCEMENT] Update minio to version [#4341](https://github.com/grafana/tempo/pull/4568) (@javiermolinar)
* [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott)
* [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)
Expand Down
7 changes: 7 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,9 @@ query_frontend:
# Maximun number of exemplars per range query. Limited to 100.
[max_exemplars: <int> | default = 100 ]

# Maximum number of time series returned for a metrics query.
[max_response_series: <int> | default = 1000]

# query_backend_after controls where the query-frontend searches for traces.
# Time ranges older than query_backend_after will be searched in the backend/object storage only.
# Time ranges between query_backend_after and now will be queried from the metrics-generators.
Expand Down Expand Up @@ -838,6 +841,10 @@ querier:
# the address of the query frontend to connect to, and process queries
# Example: "frontend_address: query-frontend-discovery.default.svc.cluster.local:9095"
[frontend_address: <string>]

metrics:
# Maximum number of time series returned for a metrics query.
[max_response_series: <int> | default = 1000]
```

It also queries compacted blocks that fall within the (2 * BlocklistPoll) range where the value of Blocklist poll duration
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/tempo/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ querier:
metrics:
concurrent_blocks: 2
time_overlap_cutoff: 0.2
max_response_series: 1000
max_concurrent_queries: 20
frontend_worker:
frontend_address: ""
Expand Down Expand Up @@ -332,6 +333,7 @@ query_frontend:
query_backend_after: 30m0s
interval: 5m0s
max_exemplars: 100
max_response_series: 1000
multi_tenant_queries_enabled: true
response_consumers: 10
weights:
Expand Down
81 changes: 81 additions & 0 deletions integration/e2e/config-query-range-max-series.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
target: all
stream_over_http_enabled: true

server:
http_listen_port: 3200
log_level: debug

query_frontend:
search:
query_backend_after: 0 # setting these both to 0 will force all range searches to hit the backend
query_ingesters_until: 0
metrics:
max_response_series: 3


distributor:
receivers:
jaeger:
protocols:
grpc:
endpoint: "tempo:14250"
otlp:
protocols:
grpc:
endpoint: "tempo:4317"
zipkin:
endpoint: "tempo:9411"
log_received_spans:
enabled: true

metrics_generator:
processor:
local_blocks:
flush_check_period: 1s
max_block_duration: 10s
flush_to_storage: true
storage:
path: /var/tempo
remote_write:
- url: http://tempo_e2e-prometheus:9090/api/v1/write
send_exemplars: true
traces_storage:
path: /var/tempo/generator/traces

ingester:
lifecycler:
address: 127.0.0.1
ring:
kvstore:
store: inmemory
replication_factor: 1
final_sleep: 0s
trace_idle_period: 1s
max_block_bytes: 1
max_block_duration: 2s
complete_block_timeout: 20s
flush_check_period: 1s

storage:
trace:
backend: local
local:
path: /var/tempo
pool:
max_workers: 10
queue_depth: 100
block:
version: vParquet4
blocklist_poll: 1s

overrides:
defaults:
metrics_generator:
processors: [ local-blocks ]
user_configurable_overrides:
enabled: true
poll_interval: 10s
client:
backend: local
local:
path: /var/tempo/overrides
71 changes: 70 additions & 1 deletion integration/e2e/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@ import (
"io"
"net/http"
"net/url"
"strings"
"testing"
"time"

"github.com/gogo/protobuf/jsonpb"

"github.com/grafana/e2e"
"github.com/grafana/tempo/integration/util"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/require"
)

const configQueryRange = "config-query-range.yaml"
const (
configQueryRange = "config-query-range.yaml"
configQueryRangeMaxSeries = "config-query-range-max-series.yaml"
)

// Set debugMode to true to print the response body
var debugMode = false
Expand Down Expand Up @@ -65,6 +71,69 @@ sendLoop:
require.Equal(t, 400, res.StatusCode)
}

func TestQueryRangeMaxSeries(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

require.NoError(t, util.CopyFileToSharedDir(s, configQueryRangeMaxSeries, "config.yaml"))
tempo := util.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))

jaegerClient, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, jaegerClient)

ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
sendLoop:
for {
select {
case <-ticker.C:
require.NoError(t, jaegerClient.EmitBatch(context.Background(), util.MakeThriftBatch()))
case <-timer.C:
break sendLoop
}
}

// Wait for traces to be flushed to blocks
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"tempo_metrics_generator_processor_local_blocks_spans_total"}, e2e.WaitMissingMetrics))
require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"tempo_metrics_generator_processor_local_blocks_cut_blocks"}, e2e.WaitMissingMetrics))

query := "{} | rate() by (span:id)"
url := fmt.Sprintf(
"http://%s/api/metrics/query_range?q=%s&start=%d&end=%d&step=%s",
tempo.Endpoint(3200),
url.QueryEscape(query),
time.Now().Add(-5*time.Minute).UnixNano(),
time.Now().Add(time.Minute).UnixNano(),
"5s",
)

req, err := http.NewRequest(http.MethodGet, url, nil)
require.NoError(t, err)

res, err := http.DefaultClient.Do(req)
require.NoError(t, err)

// Read body and print it
body, err := io.ReadAll(res.Body)
require.NoError(t, err)
fmt.Println(string(body))

queryRangeRes := &tempopb.QueryRangeResponse{}
readBody := strings.NewReader(string(body))
err = new(jsonpb.Unmarshaler).Unmarshal(readBody, queryRangeRes)
require.NotNil(t, queryRangeRes)

// max series is 3 so we should get a partial response with 3 series
require.Equal(t, tempopb.PartialStatus_PARTIAL, queryRangeRes.GetStatus())
require.Equal(t, "Response exceeds maximum series of 3, a partial response is returned", queryRangeRes.GetMessage())
require.Equal(t, 3, len(queryRangeRes.GetSeries()))
}

func callQueryRange(t *testing.T, endpoint, query string, printBody bool) {
res := doRequest(t, endpoint, query)
require.Equal(t, http.StatusOK, res.StatusCode)
Expand Down
19 changes: 15 additions & 4 deletions modules/frontend/combiner/metrics_query_range.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package combiner

import (
"fmt"
"math"
"slices"
"sort"
Expand All @@ -14,8 +15,8 @@ import (
var _ GRPCCombiner[*tempopb.QueryRangeResponse] = (*genericCombiner[*tempopb.QueryRangeResponse])(nil)

// NewQueryRange returns a query range combiner.
func NewQueryRange(req *tempopb.QueryRangeRequest) (Combiner, error) {
combiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal)
func NewQueryRange(req *tempopb.QueryRangeRequest, maxSeries int) (Combiner, error) {
combiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal, maxSeries)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -45,6 +46,13 @@ func NewQueryRange(req *tempopb.QueryRangeRequest) (Combiner, error) {
if resp == nil {
resp = &tempopb.QueryRangeResponse{}
}
if maxSeries > 0 && len(resp.Series) >= maxSeries {
// Truncating the final response because even if we bail as soon as len(resp.Series) >= maxSeries
// it's possible that the last response pushed us over the max series limit.
resp.Series = resp.Series[:maxSeries]
ie-pham marked this conversation as resolved.
Show resolved Hide resolved
resp.Status = tempopb.PartialStatus_PARTIAL
resp.Message = fmt.Sprintf("Response exceeds maximum series of %d, a partial response is returned", maxSeries)
}
sortResponse(resp)
attachExemplars(req, resp)

Expand All @@ -65,15 +73,18 @@ func NewQueryRange(req *tempopb.QueryRangeRequest) (Combiner, error) {

return diff, nil
},
quit: func(resp *tempopb.QueryRangeResponse) bool {
Copy link
Member

@electron0zero electron0zero Feb 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test for early exit from the combiner?

return combiner.MaxSeriesReached()
},
}

initHTTPCombiner(c, api.HeaderAcceptJSON)

return c, nil
}

func NewTypedQueryRange(req *tempopb.QueryRangeRequest) (GRPCCombiner[*tempopb.QueryRangeResponse], error) {
c, err := NewQueryRange(req)
func NewTypedQueryRange(req *tempopb.QueryRangeRequest, maxSeries int) (GRPCCombiner[*tempopb.QueryRangeResponse], error) {
c, err := NewQueryRange(req, maxSeries)
if err != nil {
return nil, err
}
Expand Down
104 changes: 104 additions & 0 deletions modules/frontend/combiner/metrics_query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,110 @@ func TestDiffExemplars(t *testing.T) {
}
}

func TestQueryRangemaxSeriesShouldQuit(t *testing.T) {
start := uint64(1100 * time.Second)
end := uint64(1300 * time.Second)
step := traceql.DefaultQueryRangeStep(start, end)

req := &tempopb.QueryRangeRequest{
Query: "{} | rate()",
Start: start,
End: end,
Step: step,
}

maxSeries := 4
queryRangeCombiner, err := NewQueryRange(req, maxSeries)
require.NoError(t, err)

// add 3 series, should not quit
resp := &tempopb.QueryRangeResponse{
Metrics: &tempopb.SearchMetrics{
InspectedTraces: 1,
InspectedBytes: 1,
},
Series: []*tempopb.TimeSeries{
{
PromLabels: "foo",
Labels: []v1.KeyValue{
{Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
},
Samples: []tempopb.Sample{
{
TimestampMs: 1200_000,
Value: 2,
},
},
},
{
PromLabels: "boo",
Labels: []v1.KeyValue{
{Key: "boo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
},
Samples: []tempopb.Sample{
{
TimestampMs: 1200_000,
Value: 2,
},
},
},
{
PromLabels: "moo",
Labels: []v1.KeyValue{
{Key: "moo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
},
Samples: []tempopb.Sample{
{
TimestampMs: 1200_000,
Value: 2,
},
},
},
},
}

queryRangeCombiner.AddResponse(toHTTPResponse(t, resp, 200))
require.False(t, queryRangeCombiner.ShouldQuit())

// add 4th & 5th series, should quit
secondResp := &tempopb.QueryRangeResponse{
Metrics: &tempopb.SearchMetrics{
InspectedTraces: 1,
InspectedBytes: 1,
},
Series: []*tempopb.TimeSeries{
{
PromLabels: "goo",
Labels: []v1.KeyValue{
{Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
},
Samples: []tempopb.Sample{
{
TimestampMs: 1200_000,
Value: 2,
},
},
},
{
PromLabels: "poo",
Labels: []v1.KeyValue{
{Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
},
Samples: []tempopb.Sample{
{
TimestampMs: 1200_000,
Value: 2,
},
},
},
},
}

queryRangeCombiner.AddResponse(toHTTPResponse(t, secondResp, 200))
require.True(t, queryRangeCombiner.ShouldQuit())

}

func BenchmarkDiffSeriesAndMarshal(b *testing.B) {
prev, curr := seriesWithTenPercentDiff()

Expand Down
Loading
Loading