r319: ruler: cap the number of remote eval retries #10388

Closed
wants to merge 1 commit into from
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@
* [CHANGE] Querier: The `.` pattern in regular expressions in PromQL matches newline characters. With this change regular expressions like `.*` match strings that include `\n`. To maintain the old behaviour, you will have to change regular expressions by replacing all `.` patterns with `[^\n]`, e.g. `foo[^\n]*`. This upgrades PromQL compatibility from Prometheus 2.0 to 3.0. #9844
* [CHANGE] Querier: Lookback and range selectors are left open and right closed (previously left closed and right closed). This change affects queries when the evaluation time perfectly aligns with the sample timestamps. For example assume querying a timeseries with evenly spaced samples exactly 1 minute apart. Previously, a range query with `5m` would usually return 5 samples, or 6 samples if the query evaluation aligns perfectly with a scrape. Now, queries like this will always return 5 samples. This upgrades PromQL compatibility from Prometheus 2.0 to 3.0. #9844
* [CHANGE] Querier: promql(native histograms): Introduce exponential interpolation. #9844
* [CHANGE] Ruler: cap the rate of retries for remote query evaluation at 170/sec. This is configurable via `-ruler.query-frontend.max-retries-rate`. #10375
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #9367 #9368 #9398 #9399 #9403 #9417 #9418 #9419 #9420 #9482 #9504 #9505 #9507 #9518 #9531 #9532 #9533 #9553 #9558 #9588 #9589 #9639 #9641 #9642 #9651 #9664 #9681 #9717 #9719 #9724 #9874 #9929 #9998 #10007 #10010 #10046 #10047 #10048 #10050
* [FEATURE] Distributor: Add support for `lz4` OTLP compression. #9763
* [FEATURE] Query-frontend: added experimental configuration options `query-frontend.cache-errors` and `query-frontend.results-cache-ttl-for-errors` to allow non-transient responses to be cached. When set to `true` error responses from hitting limits or bad data are cached for a short TTL. #9028
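The [CHANGE] Ruler entry above introduces a shared, token-bucket style cap: all remote-evaluation retries issued by a ruler draw from a budget of 170 retries per second by default. The following Go sketch is only an illustration of those semantics (it is not Mimir code; the limit and the burst of 1 mirror the defaults used later in this PR):

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// Illustrative only: 170 retry tokens per second with a burst of 1,
	// mirroring the default of -ruler.query-frontend.max-retries-rate.
	limiter := rate.NewLimiter(rate.Limit(170), 1)

	for i := 0; i < 3; i++ {
		r := limiter.Reserve()
		// Delay reports how long this caller must wait before its retry may
		// proceed; at 170/sec, successive reservations are ~5.9ms apart.
		fmt.Printf("retry %d may proceed after %v\n", i, r.Delay())
	}
}
```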
10 changes: 10 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -13226,6 +13226,16 @@
"fieldDefaultValue": "protobuf",
"fieldFlag": "ruler.query-frontend.query-result-response-format",
"fieldType": "string"
},
{
"kind": "field",
"name": "max_retries_rate",
"required": false,
"desc": "Maximum number of retries for failed queries per second.",
"fieldValue": null,
"fieldDefaultValue": 170,
"fieldFlag": "ruler.query-frontend.max-retries-rate",
"fieldType": "float"
}
],
"fieldValue": null,
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2987,6 +2987,8 @@ Usage of ./cmd/mimir/mimir:
Override the default minimum TLS version. Allowed values: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13
-ruler.query-frontend.grpc-client-config.tls-server-name string
Override the expected name on the server certificate.
-ruler.query-frontend.max-retries-rate float
Maximum number of retries for failed queries per second. (default 170)
-ruler.query-frontend.query-result-response-format string
Format to use when retrieving query results from query-frontends. Supported values: json, protobuf (default "protobuf")
-ruler.query-stats-enabled
2 changes: 2 additions & 0 deletions cmd/mimir/help.txt.tmpl
@@ -737,6 +737,8 @@ Usage of ./cmd/mimir/mimir:
Maximum number of rules per rule group by namespace. Value is a map, where each key is the namespace and value is the number of rules allowed in the namespace (int). On the command line, this map is given in a JSON format. The number of rules specified has the same meaning as -ruler.max-rules-per-rule-group, but only applies for the specific namespace. If specified, it supersedes -ruler.max-rules-per-rule-group. (default {})
-ruler.query-frontend.address string
GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) to enable client side load balancing.
-ruler.query-frontend.max-retries-rate float
Maximum number of retries for failed queries per second. (default 170)
-ruler.query-frontend.query-result-response-format string
Format to use when retrieving query results from query-frontends. Supported values: json, protobuf (default "protobuf")
-ruler.recording-rules-evaluation-enabled
@@ -2110,6 +2110,10 @@ query_frontend:
# CLI flag: -ruler.query-frontend.query-result-response-format
[query_result_response_format: <string> | default = "protobuf"]

# Maximum number of retries for failed queries per second.
# CLI flag: -ruler.query-frontend.max-retries-rate
[max_retries_rate: <float> | default = 170]

tenant_federation:
# Enable rule groups to query against multiple tenants. The tenant IDs
# involved need to be in the rule group's 'source_tenants' field. If this flag
2 changes: 1 addition & 1 deletion pkg/mimir/modules.go
@@ -852,7 +852,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
if err != nil {
return nil, err
}
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.MaxRetriesRate, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)

embeddedQueryable = prom_remote.NewSampleAndChunkQueryableClient(
remoteQuerier,
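With this change, the ruler's retry budget comes from its query-frontend configuration: the value travels from the `-ruler.query-frontend.max-retries-rate` flag through `QueryFrontendConfig.MaxRetriesRate` into `NewRemoteQuerier`, which builds the limiter. A minimal, self-contained sketch of that flow with illustrative names (not Mimir code):

```go
package main

import (
	"flag"
	"fmt"

	"golang.org/x/time/rate"
)

// miniQuerier stands in for RemoteQuerier and keeps only the retry limiter.
type miniQuerier struct {
	retryLimiter *rate.Limiter
}

// newMiniQuerier mirrors the new NewRemoteQuerier parameter: the caller passes
// the configured maximum retry rate and the constructor builds the limiter.
func newMiniQuerier(maxRetryRate float64) *miniQuerier {
	return &miniQuerier{retryLimiter: rate.NewLimiter(rate.Limit(maxRetryRate), 1)}
}

func main() {
	maxRetriesRate := flag.Float64("ruler.query-frontend.max-retries-rate", 170,
		"Maximum number of retries for failed queries per second.")
	flag.Parse()

	q := newMiniQuerier(*maxRetriesRate)
	fmt.Println("retry limit per second:", q.retryLimiter.Limit())
}
```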
27 changes: 22 additions & 5 deletions pkg/ruler/remotequerier.go
@@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"golang.org/x/exp/slices"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

@@ -67,6 +68,8 @@ type QueryFrontendConfig struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the rulers and query-frontends."`

QueryResultResponseFormat string `yaml:"query_result_response_format"`

MaxRetriesRate float64 `yaml:"max_retries_rate"`
}

func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
@@ -80,6 +83,7 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f)

f.StringVar(&c.QueryResultResponseFormat, "ruler.query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from query-frontends. Supported values: %s", strings.Join(allFormats, ", ")))
f.Float64Var(&c.MaxRetriesRate, "ruler.query-frontend.max-retries-rate", 170, "Maximum number of retries for failed queries per second.")
}

func (c *QueryFrontendConfig) Validate() error {
@@ -115,6 +119,7 @@ type Middleware func(ctx context.Context, req *httpgrpc.HTTPRequest) error
// RemoteQuerier executes read operations against a httpgrpc.HTTPClient.
type RemoteQuerier struct {
client httpgrpc.HTTPClient
retryLimiter *rate.Limiter
timeout time.Duration
middlewares []Middleware
promHTTPPrefix string
@@ -130,6 +135,7 @@ var protobufDecoderInstance = protobufDecoder{}
func NewRemoteQuerier(
client httpgrpc.HTTPClient,
timeout time.Duration,
maxRetryRate float64, // maxRetryRate is the maximum number of retries for failed queries per second.
preferredQueryResultResponseFormat string,
prometheusHTTPPrefix string,
logger log.Logger,
@@ -138,6 +144,7 @@ func NewRemoteQuerier(
return &RemoteQuerier{
client: client,
timeout: timeout,
retryLimiter: rate.NewLimiter(rate.Limit(maxRetryRate), 1),
middlewares: middlewares,
promHTTPPrefix: prometheusHTTPPrefix,
logger: logger,
@@ -349,12 +356,22 @@ func (q *RemoteQuerier) sendRequest(ctx context.Context, req *httpgrpc.HTTPReque
if !retry.Ongoing() {
return nil, err
}
level.Warn(logger).Log("msg", "failed to remotely evaluate query expression, will retry", "err", err)
retry.Wait()

// Avoid masking last known error if context was cancelled while waiting.
if ctx.Err() != nil {
return nil, fmt.Errorf("%s while retrying request, last err was: %w", ctx.Err(), err)
retryReservation := q.retryLimiter.Reserve()
if !retryReservation.OK() {
// This should only happen if we've misconfigured the limiter.
return nil, fmt.Errorf("couldn't reserve a retry token")
}
// We want to wait at least the time for the backoff, but also don't want to exceed the rate limit.
// All of this is capped to the max backoff, so that we are less likely to overrun into the next evaluation.
retryDelay := max(retry.NextDelay(), min(retryConfig.MaxBackoff, retryReservation.Delay()))
level.Warn(logger).Log("msg", "failed to remotely evaluate query expression, will retry", "err", err, "retry_delay", retryDelay)
select {
case <-time.After(retryDelay):
case <-ctx.Done():
retryReservation.Cancel()
// Avoid masking last known error if context was cancelled while waiting.
return nil, fmt.Errorf("%s while retrying request, last error was: %w", ctx.Err(), err)
}
}
}
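The sendRequest changes above combine the existing exponential backoff with the new shared limiter: each retry waits for the longer of the backoff delay and the limiter's reservation delay, the reservation delay is clamped to the maximum backoff so a heavily contended limiter cannot stall a single evaluation indefinitely, and the reservation is cancelled when the context ends first so the unused token returns to the bucket. Below is a self-contained sketch of that pacing decision (illustrative values and names; not the Mimir implementation itself):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// waitForRetry sketches the pacing used above: wait at least the backoff
// delay, cap the limiter-imposed wait at maxBackoff, and give the token back
// if the context ends before the wait completes.
func waitForRetry(ctx context.Context, limiter *rate.Limiter, backoffDelay, maxBackoff time.Duration) error {
	reservation := limiter.Reserve()
	if !reservation.OK() {
		return fmt.Errorf("couldn't reserve a retry token")
	}

	// Honour the backoff, but never wait longer than maxBackoff for the limiter.
	retryDelay := max(backoffDelay, min(maxBackoff, reservation.Delay()))

	select {
	case <-time.After(retryDelay):
		return nil
	case <-ctx.Done():
		reservation.Cancel() // return the unused token to the limiter
		return ctx.Err()
	}
}

func main() {
	limiter := rate.NewLimiter(rate.Limit(170), 1) // default: 170 retries/sec
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	if err := waitForRetry(ctx, limiter, 100*time.Millisecond, 2*time.Second); err != nil {
		fmt.Println("retry aborted:", err)
		return
	}
	fmt.Println("retry may proceed")
}
```

Because the burst is 1, retries that go through the same RemoteQuerier come out roughly evenly spaced (about one every 6ms at the default 170/sec) rather than in bursts, which keeps retry pressure on the query-frontends predictable.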
30 changes: 15 additions & 15 deletions pkg/ruler/remotequerier_test.go
@@ -64,7 +64,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should issue a remote read request", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

@@ -76,7 +76,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.NoError(t, err)

@@ -86,7 +86,7 @@ func TestRemoteQuerier_Read(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Read(ctx, &prompb.Query{}, false)
@@ -101,7 +101,7 @@ func TestRemoteQuerier_ReadReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, 1, formatJSON, "/prometheus", log.NewNopLogger())

_, err := q.Read(context.Background(), &prompb.Query{}, false)
require.Error(t, err)
@@ -139,7 +139,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run(fmt.Sprintf("format = %s", format), func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, format, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, format, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

@@ -165,7 +165,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should not inject the read consistency header if none is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
_, err := q.Query(context.Background(), "qs", tm)
require.NoError(t, err)

@@ -175,7 +175,7 @@ func TestRemoteQuerier_Query(t *testing.T) {
t.Run("should inject the read consistency header if it is defined in the context", func(t *testing.T) {
client, inReq := setup()

q := NewRemoteQuerier(client, time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(client, time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

ctx := api.ContextWithReadConsistencyLevel(context.Background(), api.ReadConsistencyStrong)
_, err := q.Query(ctx, "qs", tm)
@@ -276,20 +276,20 @@ func TestRemoteQuerier_QueryRetryOnFailure(t *testing.T) {
}
return testCase.response, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())
require.Equal(t, int64(0), count.Load())
_, err := q.Query(ctx, "qs", time.Now())
if testCase.err == nil {
if testCase.expectedError == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
require.EqualError(t, err, testCase.expectedError.Error())
require.ErrorContains(t, err, testCase.expectedError.Error())
}
require.Equal(t, int64(1), count.Load())
} else {
require.Error(t, err)
require.EqualError(t, err, testCase.expectedError.Error())
require.ErrorContains(t, err, testCase.expectedError.Error())
if testCase.expectedRetries {
require.Greater(t, count.Load(), int64(1))
} else {
@@ -405,7 +405,7 @@ func TestRemoteQuerier_QueryJSONDecoding(t *testing.T) {
Body: []byte(scenario.body),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
@@ -678,7 +678,7 @@ func TestRemoteQuerier_QueryProtobufDecoding(t *testing.T) {
Body: b,
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatProtobuf, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatProtobuf, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
actual, err := q.Query(context.Background(), "qs", tm)
@@ -701,7 +701,7 @@ func TestRemoteQuerier_QueryUnknownResponseContentType(t *testing.T) {
Body: []byte("some body content"),
}, nil
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
@@ -713,7 +713,7 @@ func TestRemoteQuerier_QueryReqTimeout(t *testing.T) {
<-ctx.Done()
return nil, ctx.Err()
}
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, formatJSON, "/prometheus", log.NewNopLogger())
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Second, 1, formatJSON, "/prometheus", log.NewNopLogger())

tm := time.Unix(1649092025, 515834)
_, err := q.Query(context.Background(), "qs", tm)
@@ -771,7 +771,7 @@ func TestRemoteQuerier_StatusErrorResponses(t *testing.T) {
return testCase.resp, testCase.err
}
logger := newLoggerWithCounter()
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, formatJSON, "/prometheus", logger)
q := NewRemoteQuerier(mockHTTPGRPCClient(mockClientFn), time.Minute, 1, formatJSON, "/prometheus", logger)

tm := time.Unix(1649092025, 515834)
