Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ruler: cap the number of remote eval retries #10375

Merged
merged 7 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

## main / unreleased

* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_strong_consistency_requests_total`, `cortex_ingest_storage_strong_consistency_failures_total`, and `cortex_ingest_storage_strong_consistency_wait_duration_seconds` metrics. #10220

### Grafana Mimir

* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_strong_consistency_requests_total`, `cortex_ingest_storage_strong_consistency_failures_total`, and `cortex_ingest_storage_strong_consistency_wait_duration_seconds` metrics. #10220
* [CHANGE] Ruler: cap the rate of retries for remote query evaluation to 170/sec. This is configurable via `-ruler.query-frontend.max-retries-rate`. #10375
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145
Expand Down
10 changes: 10 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -13216,6 +13216,16 @@
"fieldDefaultValue": "protobuf",
"fieldFlag": "ruler.query-frontend.query-result-response-format",
"fieldType": "string"
},
{
"kind": "field",
"name": "max_retries_rate",
"required": false,
"desc": "Maximum number of retries for failed queries per second.",
"fieldValue": null,
"fieldDefaultValue": 170,
"fieldFlag": "ruler.query-frontend.max-retries-rate",
"fieldType": "float"
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2985,6 +2985,8 @@ Usage of ./cmd/mimir/mimir:
Override the default minimum TLS version. Allowed values: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13
-ruler.query-frontend.grpc-client-config.tls-server-name string
Override the expected name on the server certificate.
-ruler.query-frontend.max-retries-rate float
Maximum number of retries for failed queries per second. (default 170)
-ruler.query-frontend.query-result-response-format string
Format to use when retrieving query results from query-frontends. Supported values: json, protobuf (default "protobuf")
-ruler.query-stats-enabled
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of rules per rule group by namespace. Value is a map, where each key is the namespace and value is the number of rules allowed in the namespace (int). On the command line, this map is given in a JSON format. The number of rules specified has the same meaning as -ruler.max-rules-per-rule-group, but only applies for the specific namespace. If specified, it supersedes -ruler.max-rules-per-rule-group. (default {})
-ruler.query-frontend.address string
GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) to enable client side load balancing.
-ruler.query-frontend.max-retries-rate float
Maximum number of retries for failed queries per second. (default 170)
-ruler.query-frontend.query-result-response-format string
Format to use when retrieving query results from query-frontends. Supported values: json, protobuf (default "protobuf")
-ruler.recording-rules-evaluation-enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,10 @@ query_frontend:
# CLI flag: -ruler.query-frontend.query-result-response-format
[query_result_response_format: <string> | default = "protobuf"]

# Maximum number of retries for failed queries per second.
# CLI flag: -ruler.query-frontend.max-retries-rate
[max_retries_rate: <float> | default = 170]

tenant_federation:
# Enable rule groups to query against multiple tenants. The tenant IDs
# involved need to be in the rule group's 'source_tenants' field. If this flag
Expand Down
2 changes: 1 addition & 1 deletion pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
if err != nil {
return nil, err
}
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, 1, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)

embeddedQueryable = prom_remote.NewSampleAndChunkQueryableClient(
remoteQuerier,
Expand Down
18 changes: 16 additions & 2 deletions pkg/ruler/remotequerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

Expand Down Expand Up @@ -67,6 +68,8 @@ type QueryFrontendConfig struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the rulers and query-frontends."`

QueryResultResponseFormat string `yaml:"query_result_response_format"`

MaxRetriesRate float64 `yaml:"max_retries_rate"`
}

func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -80,6 +83,7 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f)

f.StringVar(&c.QueryResultResponseFormat, "ruler.query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from query-frontends. Supported values: %s", strings.Join(allFormats, ", ")))
f.Float64Var(&c.MaxRetriesRate, "ruler.query-frontend.max-retries-rate", 170, "Maximum number of retries for failed queries per second.")
}

func (c *QueryFrontendConfig) Validate() error {
Expand Down Expand Up @@ -115,6 +119,7 @@ type Middleware func(ctx context.Context, req *httpgrpc.HTTPRequest) error
// RemoteQuerier executes read operations against a httpgrpc.HTTPClient.
type RemoteQuerier struct {
client httpgrpc.HTTPClient
retryLimiter *rate.Limiter
timeout time.Duration
middlewares []Middleware
promHTTPPrefix string
Expand All @@ -130,6 +135,7 @@ var protobufDecoderInstance = protobufDecoder{}
func NewRemoteQuerier(
client httpgrpc.HTTPClient,
timeout time.Duration,
maxRetryRate float64, // maxRetryRate is the maximum number of retries for failed queries per second.
preferredQueryResultResponseFormat string,
prometheusHTTPPrefix string,
logger log.Logger,
Expand All @@ -138,6 +144,7 @@ func NewRemoteQuerier(
return &RemoteQuerier{
client: client,
timeout: timeout,
retryLimiter: rate.NewLimiter(rate.Limit(maxRetryRate), 1),
middlewares: middlewares,
promHTTPPrefix: prometheusHTTPPrefix,
logger: logger,
Expand Down Expand Up @@ -349,12 +356,19 @@ func (q *RemoteQuerier) sendRequest(ctx context.Context, req *httpgrpc.HTTPReque
if !retry.Ongoing() {
return nil, err
}
level.Warn(logger).Log("msg", "failed to remotely evaluate query expression, will retry", "err", err)
retry.Wait()

// The backoff wait spreads the retries. We check the rate limiter only after that spread.
// That should give queries a bigger chance at passing through the rate limiter
// in cases of short error bursts and the rate of failed queries being larger than the rate limit.
if !q.retryLimiter.Allow() {
return nil, fmt.Errorf("exhausted global retry budget for remote ruler execution (ruler.query-frontend.max-retries-rate); last error was: %w", err)
}
level.Warn(logger).Log("msg", "failed to remotely evaluate query expression, will retry", "err", err)

// Avoid masking last known error if context was cancelled while waiting.
if ctx.Err() != nil {
return nil, fmt.Errorf("%s while retrying request, last err was: %w", ctx.Err(), err)
return nil, fmt.Errorf("%s while retrying request, last error was: %w", ctx.Err(), err)
}
}
}
Expand Down
58 changes: 43 additions & 15 deletions pkg/ruler/remotequerier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should issue a remote read request", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

Expand All @@ -76,7 +76,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

Expand All @@ -86,7 +86,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Read(ctx, &prompb.Query{}, false)
Expand All @@ -101,7 +101,7 @@ func TestRemoteQuerier_ReadReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, 1, formatJSON, "/prometheus", log.NewNopLogger())

_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.Error(t, err)
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run(fmt.Sprintf("format = %s", format), func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, format, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, format, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

Expand All @@ -165,7 +165,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

Expand All @@ -175,7 +175,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Query(ctx, "qs", tm)
Expand Down Expand Up @@ -276,20 +276,20 @@ func TestRemoteQuerier_QueryRetryOnFailure(t *testing.T) {
}
return testCase.response, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
require.Equal(t, int64(0), count.Load())
_, err := q.Query(ctx, "qs", time.Now())
if testCase.err == nil {
if testCase.expectedError == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
require.EqualError(t, err, testCase.expectedError.Error())
require.ErrorContains(t, err, testCase.expectedError.Error())
}
require.Equal(t, int64(1), count.Load())
} else {
require.Error(t, err)
require.EqualError(t, err, testCase.expectedError.Error())
require.ErrorContains(t, err, testCase.expectedError.Error())
if testCase.expectedRetries {
require.Greater(t, count.Load(), int64(1))
} else {
Expand All @@ -299,6 +299,34 @@ func TestRemoteQuerier_QueryRetryOnFailure(t *testing.T) {
})
}
}
func TestRemoteQuerier_QueryRetryBudgetExhaustion(t *testing.T) {
ctx := context.Background()
attempts := &atomic.Int64{}

// Mock client that always returns a 500 error to trigger retries
mockClientFn := func(context.Context, *httpgrpc.HTTPRequest, ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) {
attempts.Add(1)
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "some error")
}

// Create querier with very low retry rate to trigger budget exhaustion quickly
q := NewRemoteQuerier(
mockHTTPGRPCClient(mockClientFn),
time.Minute, // timeout
0.0001, // retry rate
formatJSON,
"/prometheus",
log.NewNopLogger(),
)

_, err := q.Query(ctx, "test_query", time.Now())

require.Error(t, err)
require.ErrorContains(t, err, "exhausted global retry budget for remote ruler execution (ruler.query-frontend.max-retries-rate)")

// Should have attempted at least one retry before exhausting budget
require.GreaterOrEqual(t, attempts.Load(), int64(2))
}

func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) {
scenarios := map[string]struct {
Expand Down Expand Up @@ -405,7 +433,7 @@ func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) {
Body: []byte(scenario.body),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
Expand Down Expand Up @@ -678,7 +706,7 @@ func TestRemoteQuerier_QueryProtobufDecoding(t *testing.T) {
Body: b,
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatProtobuf, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatProtobuf, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
Expand All @@ -701,7 +729,7 @@ func TestRemoteQuerier_QueryUnknownResponseContentType(t *testing.T) {
Body: []byte("some body content"),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
Expand All @@ -713,7 +741,7 @@ func TestRemoteQuerier_QueryReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, 1, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
Expand Down Expand Up @@ -771,7 +799,7 @@ func TestRemoteQuerier_StatusErrorResponses(t *testing.T) {
return testCase.resp, testCase.err
}
logger := newLoggerWithCounter()
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", logger)
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", logger)

tm := time.Unix(1649092025, 515834)

Expand Down
Loading