Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ruler query failure reporting #4335

Merged
merged 5 commits into from
Jul 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345
* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335

## 1.10.0-rc.0 / 2021-06-28

Expand Down
12 changes: 8 additions & 4 deletions integration/e2e/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@ const (
)

// NewMinio returns minio server, used as a local replacement for S3.
func NewMinio(port int, bktName string) *e2e.HTTPService {
func NewMinio(port int, bktNames ...string) *e2e.HTTPService {
minioKESGithubContent := "https://raw.githubusercontent.com/minio/kes/master"
commands := []string{
"curl -sSL --tlsv1.2 -O '%s/root.key' -O '%s/root.cert'",
"mkdir -p /data/%s && minio server --address :%v --quiet /data",
fmt.Sprintf("curl -sSL --tlsv1.2 -O '%s/root.key' -O '%s/root.cert'", minioKESGithubContent, minioKESGithubContent),
}

for _, bkt := range bktNames {
commands = append(commands, fmt.Sprintf("mkdir -p /data/%s", bkt))
}
commands = append(commands, fmt.Sprintf("minio server --address :%v --quiet /data", port))

m := e2e.NewHTTPService(
fmt.Sprintf("minio-%v", port),
images.Minio,
// Create the "cortex" bucket before starting minio
e2e.NewCommandWithoutEntrypoint("sh", "-c", fmt.Sprintf(strings.Join(commands, " && "), minioKESGithubContent, minioKESGithubContent, bktName, port)),
e2e.NewCommandWithoutEntrypoint("sh", "-c", strings.Join(commands, " && ")),
e2e.NewHTTPReadinessProbe(port, "/minio/health/ready", 200, 200),
port,
)
Expand Down
5 changes: 5 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) err
}

defer res.Body.Close()

if res.StatusCode != 202 {
return fmt.Errorf("unexpected status code: %d", res.StatusCode)
}

return nil
}

Expand Down
160 changes: 160 additions & 0 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,166 @@ func TestRulerAlertmanagerTLS(t *testing.T) {
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_notifications_alertmanagers_discovered"}, e2e.WaitMissingMetrics))
}

func TestRulerMetricsForInvalidQueries(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Configure the ruler.
flags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(false),
map[string]string{
// Since we're not going to run any rule (our only rule is invalid), we don't need the
// store-gateway to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
// Evaluate rules often, so that we don't need to wait for metrics to show up.
"-ruler.evaluation-interval": "2s",
"-ruler.poll-interval": "2s",
// No delay
"-ruler.evaluation-delay-duration": "0",

"-blocks-storage.tsdb.block-ranges-period": "1h",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": "2h",

// We run single ingester only, no replication.
"-distributor.replication-factor": "1",

// Very low limit so that ruler hits it.
"-querier.max-fetched-chunks-per-query": "5",
// We need this to make limit work.
"-ingester.stream-chunks-when-using-blocks": "true",
},
)

const namespace = "test"
const user = "user"

distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler))

// Wait until both the distributor and ruler have updated the ring. The querier will also watch
// the store-gateway ring if blocks sharding is enabled.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)

// Push some series to Cortex -- enough so that we can hit some limits.
for i := 0; i < 10; i++ {
series, _ := generateSeries("metric", time.Now(), prompb.Label{Name: "foo", Value: fmt.Sprintf("%d", i)})

res, err := c.Push(series)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
}

totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
require.NoError(t, err)

// Verify that user-failures don't increase cortex_ruler_queries_failed_total
for groupName, expression := range map[string]string{
// Syntactically correct expression (passes check in ruler), but failing because of invalid regex. This fails in PromQL engine.
"invalid_group": `label_replace(metric, "foo", "$1", "service", "[")`,

// This one fails in querier code, because of limits.
"too_many_chunks_group": `sum(metric)`,
} {
t.Run(groupName, func(t *testing.T) {
require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, "rule", expression), namespace))
m := ruleGroupMatcher(user, namespace, groupName)

// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Verify that evaluation of the rule failed.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// But these failures were not reported as "failed queries"
sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
require.NoError(t, err)
require.Equal(t, float64(0), sum[0])

// Delete rule before checkin "cortex_ruler_queries_total", as we want to reuse value for next test.
require.NoError(t, c.DeleteRuleGroup(namespace, groupName))

// Wait until ruler has unloaded the group. We don't use any matcher, so there should be no groups (in fact, metric disappears).
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics))

// Check that cortex_ruler_queries_total went up since last test.
newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
require.NoError(t, err)
require.Greater(t, newTotalQueries[0], totalQueries[0])

// Remember totalQueries for next test.
totalQueries = newTotalQueries
})
}

// Now let's upload a non-failing rule, and make sure that it works.
t.Run("real_error", func(t *testing.T) {
const groupName = "good_rule"
const expression = `sum(metric{foo=~"1|2"})`

require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, "rule", expression), namespace))
m := ruleGroupMatcher(user, namespace, groupName)

// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Wait until rule group has tried to evaluate the rule, and succeeded.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Still no failures.
sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
require.NoError(t, err)
require.Equal(t, float64(0), sum[0])

// Now let's stop ingester, and recheck metrics. This should increase cortex_ruler_queries_failed_total failures.
require.NoError(t, s.Stop(ingester))

// We should start getting "real" failures now.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}))
})
}

func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
}

func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup {
// Prepare rule group with invalid rule.
var recordNode = yaml.Node{}
var exprNode = yaml.Node{}

recordNode.SetString(ruleName)
exprNode.SetString(expression)

return rulefmt.RuleGroup{
Name: groupName,
Interval: 10,
Rules: []rulefmt.RuleNode{{
Record: recordNode,
Expr: exprNode,
}},
}
}

func createTestRuleGroup(t *testing.T) rulefmt.RuleGroup {
t.Helper()

Expand Down
2 changes: 1 addition & 1 deletion pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func NewQuerierHandler(

api := v1.NewAPI(
engine,
querier.NewErrorTranslateQueryable(queryable), // Translate errors to errors expected by API.
querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API.
nil, // No remote write support.
exemplarQueryable,
func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} },
Expand Down
72 changes: 52 additions & 20 deletions pkg/querier/error_translate_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,72 +69,103 @@ func TranslateToPromqlAPIError(err error) error {
}
}

func NewErrorTranslateQueryable(q storage.SampleAndChunkQueryable) storage.SampleAndChunkQueryable {
return errorTranslateQueryable{q}
// ErrTranslateFn is used to translate or wrap error before returning it by functions in
// storage.SampleAndChunkQueryable interface.
// Input error may be nil.
type ErrTranslateFn func(err error) error

func NewErrorTranslateQueryable(q storage.Queryable) storage.Queryable {
return NewErrorTranslateQueryableWithFn(q, TranslateToPromqlAPIError)
}

func NewErrorTranslateQueryableWithFn(q storage.Queryable, fn ErrTranslateFn) storage.Queryable {
return errorTranslateQueryable{q: q, fn: fn}
}

func NewErrorTranslateSampleAndChunkQueryable(q storage.SampleAndChunkQueryable) storage.SampleAndChunkQueryable {
return NewErrorTranslateSampleAndChunkQueryableWithFn(q, TranslateToPromqlAPIError)
}

func NewErrorTranslateSampleAndChunkQueryableWithFn(q storage.SampleAndChunkQueryable, fn ErrTranslateFn) storage.SampleAndChunkQueryable {
return errorTranslateSampleAndChunkQueryable{q: q, fn: fn}
}

type errorTranslateQueryable struct {
q storage.SampleAndChunkQueryable
q storage.Queryable
fn ErrTranslateFn
}

func (e errorTranslateQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
q, err := e.q.Querier(ctx, mint, maxt)
return errorTranslateQuerier{q: q}, TranslateToPromqlAPIError(err)
return errorTranslateQuerier{q: q, fn: e.fn}, e.fn(err)
}

type errorTranslateSampleAndChunkQueryable struct {
q storage.SampleAndChunkQueryable
fn ErrTranslateFn
}

func (e errorTranslateSampleAndChunkQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
q, err := e.q.Querier(ctx, mint, maxt)
return errorTranslateQuerier{q: q, fn: e.fn}, e.fn(err)
}

func (e errorTranslateQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
func (e errorTranslateSampleAndChunkQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) {
q, err := e.q.ChunkQuerier(ctx, mint, maxt)
return errorTranslateChunkQuerier{q: q}, TranslateToPromqlAPIError(err)
return errorTranslateChunkQuerier{q: q, fn: e.fn}, e.fn(err)
}

type errorTranslateQuerier struct {
q storage.Querier
q storage.Querier
fn ErrTranslateFn
}

func (e errorTranslateQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelValues(name, matchers...)
return values, warnings, TranslateToPromqlAPIError(err)
return values, warnings, e.fn(err)
}

func (e errorTranslateQuerier) LabelNames() ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames()
return values, warnings, TranslateToPromqlAPIError(err)
return values, warnings, e.fn(err)
}

func (e errorTranslateQuerier) Close() error {
return TranslateToPromqlAPIError(e.q.Close())
return e.fn(e.q.Close())
}

func (e errorTranslateQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
s := e.q.Select(sortSeries, hints, matchers...)
return errorTranslateSeriesSet{s}
return errorTranslateSeriesSet{s: s, fn: e.fn}
}

type errorTranslateChunkQuerier struct {
q storage.ChunkQuerier
q storage.ChunkQuerier
fn ErrTranslateFn
}

func (e errorTranslateChunkQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelValues(name, matchers...)
return values, warnings, TranslateToPromqlAPIError(err)
return values, warnings, e.fn(err)
}

func (e errorTranslateChunkQuerier) LabelNames() ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames()
return values, warnings, TranslateToPromqlAPIError(err)
return values, warnings, e.fn(err)
}

func (e errorTranslateChunkQuerier) Close() error {
return TranslateToPromqlAPIError(e.q.Close())
return e.fn(e.q.Close())
}

func (e errorTranslateChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet {
s := e.q.Select(sortSeries, hints, matchers...)
return errorTranslateChunkSeriesSet{s}
return errorTranslateChunkSeriesSet{s: s, fn: e.fn}
}

type errorTranslateSeriesSet struct {
s storage.SeriesSet
s storage.SeriesSet
fn ErrTranslateFn
}

func (e errorTranslateSeriesSet) Next() bool {
Expand All @@ -146,15 +177,16 @@ func (e errorTranslateSeriesSet) At() storage.Series {
}

func (e errorTranslateSeriesSet) Err() error {
return TranslateToPromqlAPIError(e.s.Err())
return e.fn(e.s.Err())
}

func (e errorTranslateSeriesSet) Warnings() storage.Warnings {
return e.s.Warnings()
}

type errorTranslateChunkSeriesSet struct {
s storage.ChunkSeriesSet
s storage.ChunkSeriesSet
fn ErrTranslateFn
}

func (e errorTranslateChunkSeriesSet) Next() bool {
Expand All @@ -166,7 +198,7 @@ func (e errorTranslateChunkSeriesSet) At() storage.ChunkSeries {
}

func (e errorTranslateChunkSeriesSet) Err() error {
return TranslateToPromqlAPIError(e.s.Err())
return e.fn(e.s.Err())
}

func (e errorTranslateChunkSeriesSet) Warnings() storage.Warnings {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/error_translate_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestApiStatusCodes(t *testing.T) {
"error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}},
} {
t.Run(fmt.Sprintf("%s/%d", k, ix), func(t *testing.T) {
r := createPrometheusAPI(errorTranslateQueryable{q: q})
r := createPrometheusAPI(NewErrorTranslateSampleAndChunkQueryable(q))
rec := httptest.NewRecorder()

req := httptest.NewRequest("GET", "/api/v1/query?query=up", nil)
Expand Down
Loading