Commit

Use a CB instead of a rate limiter
Signed-off-by: Dimitar Dimitrov <[email protected]>
dimitarvdimitrov committed Jan 9, 2025
1 parent b020a8f commit b07366f
Showing 8 changed files with 90 additions and 77 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -7,7 +7,7 @@
* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
* [CHANGE] Query-frontend: Add `topic` label to `cortex_ingest_storage_strong_consistency_requests_total`, `cortex_ingest_storage_strong_consistency_failures_total`, and `cortex_ingest_storage_strong_consistency_wait_duration_seconds` metrics. #10220
* [CHANGE] Ruler: cap the rate of retries for remote query evaluation to 170/sec. This is configurable via `-ruler.query-frontend.max-retries-rate`. #10375
* [CHANGE] Ruler: do not retry remote ruler evaluations when more than 50% of evaluations have failed in the last 5 seconds. #10375
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
* [ENHANCEMENT] Distributor: discard float and histogram samples with duplicated timestamps from each timeseries in a request before the request is forwarded to ingesters. Discarded samples are tracked by the `cortex_discarded_samples_total` metrics with reason `sample_duplicate_timestamp`. #10145
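For context on this changelog entry, here is a minimal sketch contrasting the two gating strategies: the fixed-rate limiter this commit removes and the outcome-based circuit breaker it adds. It only uses APIs that appear in this diff (golang.org/x/time/rate and failsafe-go's circuitbreaker); the simulated failure burst is illustrative, not taken from the real retry path.

```go
package main

import (
	"fmt"
	"time"

	"github.com/failsafe-go/failsafe-go/circuitbreaker"
	"golang.org/x/time/rate"
)

func main() {
	// Old behavior: a global token bucket caps retries at a fixed rate
	// (170/sec via the removed flag), regardless of backend health.
	limiter := rate.NewLimiter(rate.Limit(170), 1)
	if limiter.Allow() {
		fmt.Println("rate limiter: retry permitted")
	}

	// New behavior: retries are gated by recent outcomes instead of a fixed
	// rate. The breaker opens when >=50% of at least 10 recorded outcomes in
	// a 5-second window are failures, and stays open for 5 seconds.
	cb := circuitbreaker.Builder[struct{}]().
		WithDelay(5 * time.Second).
		WithFailureRateThreshold(50, 10, 5*time.Second).
		Build()

	for i := 0; i < 10; i++ {
		cb.RecordFailure() // simulate a burst of failed evaluations
	}
	if !cb.TryAcquirePermit() {
		fmt.Println("circuit breaker: open, retries disabled")
	}
}
```

The practical difference: the limiter sheds retries even when the backend is healthy but briefly busy, while the breaker only sheds them after observing a high failure rate, then re-admits them automatically once the 5-second delay elapses.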
10 changes: 0 additions & 10 deletions cmd/mimir/config-descriptor.json
@@ -13216,16 +13216,6 @@
"fieldDefaultValue": "protobuf",
"fieldFlag": "ruler.query-frontend.query-result-response-format",
"fieldType": "string"
},
{
"kind": "field",
"name": "max_retries_rate",
"required": false,
"desc": "Maximum number of retries for failed queries per second.",
"fieldValue": null,
"fieldDefaultValue": 170,
"fieldFlag": "ruler.query-frontend.max-retries-rate",
"fieldType": "float"
}
],
"fieldValue": null,
2 changes: 0 additions & 2 deletions cmd/mimir/help-all.txt.tmpl
@@ -2985,8 +2985,6 @@ Usage of ./cmd/mimir/mimir:
Override the default minimum TLS version. Allowed values: VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13
-ruler.query-frontend.grpc-client-config.tls-server-name string
Override the expected name on the server certificate.
-ruler.query-frontend.max-retries-rate float
Maximum number of retries for failed queries per second. (default 170)
-ruler.query-frontend.query-result-response-format string
Format to use when retrieving query results from query-frontends. Supported values: json, protobuf (default "protobuf")
-ruler.query-stats-enabled
2 changes: 0 additions & 2 deletions cmd/mimir/help.txt.tmpl
@@ -737,8 +737,6 @@ Usage of ./cmd/mimir/mimir:
Maximum number of rules per rule group by namespace. Value is a map, where each key is the namespace and value is the number of rules allowed in the namespace (int). On the command line, this map is given in a JSON format. The number of rules specified has the same meaning as -ruler.max-rules-per-rule-group, but only applies for the specific namespace. If specified, it supersedes -ruler.max-rules-per-rule-group. (default {})
-ruler.query-frontend.address string
GRPC listen address of the query-frontend(s). Must be a DNS address (prefixed with dns:///) to enable client side load balancing.
-ruler.query-frontend.max-retries-rate float
Maximum number of retries for failed queries per second. (default 170)
-ruler.query-frontend.query-result-response-format string
Format to use when retrieving query results from query-frontends. Supported values: json, protobuf (default "protobuf")
-ruler.recording-rules-evaluation-enabled
@@ -2103,10 +2103,6 @@ query_frontend:
# CLI flag: -ruler.query-frontend.query-result-response-format
[query_result_response_format: <string> | default = "protobuf"]
# Maximum number of retries for failed queries per second.
# CLI flag: -ruler.query-frontend.max-retries-rate
[max_retries_rate: <float> | default = 170]
tenant_federation:
# Enable rule groups to query against multiple tenants. The tenant IDs
# involved need to be in the rule group's 'source_tenants' field. If this flag
2 changes: 1 addition & 1 deletion pkg/mimir/modules.go
@@ -859,7 +859,7 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
if err != nil {
return nil, err
}
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, 1, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)
remoteQuerier := ruler.NewRemoteQuerier(queryFrontendClient, t.Cfg.Querier.EngineConfig.Timeout, t.Cfg.Ruler.QueryFrontend.QueryResultResponseFormat, t.Cfg.API.PrometheusHTTPPrefix, util_log.Logger, ruler.WithOrgIDMiddleware)

embeddedQueryable = prom_remote.NewSampleAndChunkQueryableClient(
remoteQuerier,
101 changes: 63 additions & 38 deletions pkg/ruler/remotequerier.go
@@ -14,6 +14,7 @@ import (
"strings"
"time"

"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
@@ -31,7 +32,6 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"

@@ -55,6 +55,10 @@

formatJSON = "json"
formatProtobuf = "protobuf"

cbRetryFailurePercentage = 50 // 50%
cbRetryFailureThreshold = 10 // 10 failed queries
cbPeriod = 5 * time.Second
)

var allFormats = []string{formatJSON, formatProtobuf}
@@ -68,8 +72,6 @@ type QueryFrontendConfig struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the rulers and query-frontends."`

QueryResultResponseFormat string `yaml:"query_result_response_format"`

MaxRetriesRate float64 `yaml:"max_retries_rate"`
}

func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
@@ -83,7 +85,6 @@ func (c *QueryFrontendConfig) RegisterFlags(f *flag.FlagSet) {
c.GRPCClientConfig.RegisterFlagsWithPrefix("ruler.query-frontend.grpc-client-config", f)

f.StringVar(&c.QueryResultResponseFormat, "ruler.query-frontend.query-result-response-format", formatProtobuf, fmt.Sprintf("Format to use when retrieving query results from query-frontends. Supported values: %s", strings.Join(allFormats, ", ")))
f.Float64Var(&c.MaxRetriesRate, "ruler.query-frontend.max-retries-rate", 170, "Maximum number of retries for failed queries per second.")
}

func (c *QueryFrontendConfig) Validate() error {
@@ -119,7 +120,7 @@ type Middleware func(ctx context.Context, req *httpgrpc.HTTPRequest) error
// RemoteQuerier executes read operations against a httpgrpc.HTTPClient.
type RemoteQuerier struct {
client httpgrpc.HTTPClient
retryLimiter *rate.Limiter
retryCB circuitbreaker.CircuitBreaker[struct{}]
timeout time.Duration
middlewares []Middleware
promHTTPPrefix string
@@ -135,16 +136,19 @@
func NewRemoteQuerier(
client httpgrpc.HTTPClient,
timeout time.Duration,
maxRetryRate float64, // maxRetryRate is the maximum number of retries for failed queries per second.
preferredQueryResultResponseFormat string,
prometheusHTTPPrefix string,
logger log.Logger,
middlewares ...Middleware,
) *RemoteQuerier {
retryCB := circuitbreaker.Builder[struct{}]().
WithDelay(cbPeriod). // disable retries for 5s when the CB opens
WithFailureRateThreshold(cbRetryFailurePercentage, cbRetryFailureThreshold, cbPeriod).
Build()
return &RemoteQuerier{
client: client,
timeout: timeout,
retryLimiter: rate.NewLimiter(rate.Limit(maxRetryRate), 1),
retryCB: retryCB,
middlewares: middlewares,
promHTTPPrefix: prometheusHTTPPrefix,
logger: logger,
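Before the next hunk, a condensed sketch of the control flow that the rewritten sendRequest (further below) implements with this breaker: every outcome is recorded, but only retries need a permit. This is a simplification under stated assumptions: handle is a hypothetical stand-in for q.client.Handle, the 3-attempt cap stands in for the backoff configuration, and unlike the real code it does not count non-retriable errors as breaker successes.

```go
package ruler

import (
	"fmt"

	"github.com/failsafe-go/failsafe-go/circuitbreaker"
)

// evalSketch mirrors the shape of sendRequest: every outcome feeds the
// breaker, but only retries ask it for a permit, so the first attempt of
// every evaluation always goes through.
func evalSketch(cb circuitbreaker.CircuitBreaker[struct{}], handle func() error) error {
	for attempt := 1; ; attempt++ {
		err := handle()
		if err == nil {
			cb.RecordSuccess()
			return nil
		}
		cb.RecordFailure()
		if attempt == 3 { // cap attempts, like the backoff in sendRequest
			return err
		}
		if !cb.TryAcquirePermit() { // retries are shed while the breaker is open
			return fmt.Errorf("retries disabled by circuit breaker: %w", err)
		}
	}
}
```

Because the first attempt never consults the breaker, every rule evaluation is still tried at least once even while retries are being shed.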
@@ -313,6 +317,13 @@ func (q *RemoteQuerier) createRequest(ctx context.Context, query string, ts time
return req, nil
}

// sendRequest sends the request and retries if possible. It attempts each query up to 3 times. Only server errors are retried.
// It also respects a circuit breaker for retries. All evaluations are attempted at least once, but retries aren't guaranteed.
// Retries are disabled when the CB is open. The CB opens and closes based on the outcomes of all requests, not only retries.
// The goal is to
// - have a more predictable load on the remote query-frontend
// - not shed query evaluations unnecessarily
// - limit the added load of retries when most queries are failing already.
func (q *RemoteQuerier) sendRequest(ctx context.Context, req *httpgrpc.HTTPRequest, logger log.Logger) (*httpgrpc.HTTPResponse, error) {
// Ongoing request may be cancelled during evaluation due to some transient error or server shutdown,
// so we'll keep retrying until we get a successful response or backoff is terminated.
@@ -325,44 +336,25 @@

for {
resp, err := q.client.Handle(ctx, req)
if err == nil {
// Responses with status codes 4xx should always be considered erroneous.
// These errors shouldn't be retried because it is expected that
// running the same query gives rise to the same 4xx error.
if resp.Code/100 == 4 {
return nil, httpgrpc.ErrorFromHTTPResponse(resp)
}
return resp, nil
}

// Bail out if the error is known to be not retriable.
switch code := grpcutil.ErrorToStatusCode(err); code {
case codes.ResourceExhausted:
// In case the server is configured with "grpc-max-send-msg-size-bytes",
// and the response exceeds this limit, there is no point retrying the request.
// This is a special case, refer to grafana/mimir#7216.
if strings.Contains(err.Error(), "message larger than max") {
return nil, err
}
default:
// In case the error was a wrapped HTTPResponse, its code represents HTTP status;
// 4xx errors shouldn't be retried because it is expected that
// running the same query gives rise to the same 4xx error.
if code/100 == 4 {
return nil, err
}
canRetry, err := parseRemoteEvalError(err, resp)
if err == nil || !canRetry {
// An error we can't retry is treated the same as a successful request. For example, invalid PromQL queries.
q.retryCB.RecordSuccess()
return resp, err
}
q.retryCB.RecordFailure()

if !retry.Ongoing() {
return nil, err
}
retry.Wait()

// The backoff wait spreads the retries. We check the rate limiter only after that spread.
// That should give queries a bigger chance at passing through the rate limiter
// in cases of short error bursts and the rate of failed queries being larger than the rate limit.
if !q.retryLimiter.Allow() {
return nil, fmt.Errorf("exhausted global retry budget for remote ruler execution (ruler.query-frontend.max-retries-rate); last error was: %w", err)
// The backoff wait spreads the retries. We check the circuit breaker (CB) only after that spread.
// That should give queries a bigger chance at passing through the CB
// in cases when the CB has closed or half-opened while we were waiting.
if !q.retryCB.TryAcquirePermit() {
return nil, fmt.Errorf("remote evaluation retries have been disabled because there were too many failed evaluations recently (more than %d%% in the last %s); last error was: %w",
cbRetryFailurePercentage, cbPeriod, err)
}
level.Warn(logger).Log("msg", "failed to remotely evaluate query expression, will retry", "err", err)

@@ -373,6 +365,39 @@ func (q *RemoteQuerier) sendRequest(ctx context.Context, req *httpgrpc.HTTPReque
}
}

// parseRemoteEvalError returns whether the error is retriable and the error to be propagated.
// A nil error is never considered retriable.
func parseRemoteEvalError(err error, resp *httpgrpc.HTTPResponse) (bool, error) {
if err == nil {
// Responses with status codes 4xx should always be considered erroneous.
// These errors shouldn't be retried because it is expected that
// running the same query gives rise to the same 4xx error.
if resp.Code/100 == 4 {
return false, httpgrpc.ErrorFromHTTPResponse(resp)
}
return false, nil
}

// Bail out if the error is known to be not retriable.
switch code := grpcutil.ErrorToStatusCode(err); code {
case codes.ResourceExhausted:
// In case the server is configured with "grpc-max-send-msg-size-bytes",
// and the response exceeds this limit, there is no point retrying the request.
// This is a special case, refer to grafana/mimir#7216.
if strings.Contains(err.Error(), "message larger than max") {
return false, err
}
default:
// In case the error was a wrapped HTTPResponse, its code represents HTTP status;
// 4xx errors shouldn't be retried because it is expected that
// running the same query gives rise to the same 4xx error.
if code/100 == 4 {
return false, err
}
}
return true, err
}
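A test-style sketch of the classification this function performs; such a test is not part of this commit, and it assumes dskit's httpgrpc package, grpc-go's status/codes packages, and testify, consistent with the APIs used above.

```go
package ruler

import (
	"testing"

	"github.com/grafana/dskit/httpgrpc"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func TestParseRemoteEvalErrorSketch(t *testing.T) {
	// Successful response: no error, nothing to retry.
	canRetry, err := parseRemoteEvalError(nil, &httpgrpc.HTTPResponse{Code: 200})
	require.False(t, canRetry)
	require.NoError(t, err)

	// 4xx response: rerunning the query would fail the same way.
	canRetry, err = parseRemoteEvalError(nil, &httpgrpc.HTTPResponse{Code: 422})
	require.False(t, canRetry)
	require.Error(t, err)

	// Response exceeding the gRPC message size limit: retrying cannot help
	// (see grafana/mimir#7216).
	tooLarge := status.Error(codes.ResourceExhausted, "message larger than max")
	canRetry, err = parseRemoteEvalError(tooLarge, nil)
	require.False(t, canRetry)
	require.Error(t, err)

	// Transient server-side error: retriable.
	canRetry, err = parseRemoteEvalError(status.Error(codes.Unavailable, "shutting down"), nil)
	require.True(t, canRetry)
	require.Error(t, err)
}
```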

// WithOrgIDMiddleware attaches 'X-Scope-OrgID' header value to the outgoing request by inspecting the passed context.
// In case the expression to evaluate corresponds to a federated rule, the ExtractTenantIDs function will take care
// of normalizing and concatenating source tenants by separating them with a '|' character.