Skip to content

Commit

Permalink
add support for http header to filter down queryables (#10552)
Browse files Browse the repository at this point in the history
* add support for http header to filter down queryables

Signed-off-by: Mauro Stettler <[email protected]>

* changelog

Signed-off-by: Mauro Stettler <[email protected]>

* add spdx license header to new files

Signed-off-by: Mauro Stettler <[email protected]>

---------

Signed-off-by: Mauro Stettler <[email protected]>
  • Loading branch information
replay authored Feb 3, 2025
1 parent 33311cc commit 0dbefd6
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* [ENHANCEMENT] Ingester: add `active_series_additional_custom_trackers` configuration, in addition to the already existing `active_series_custom_trackers`. The `active_series_additional_custom_trackers` configuration allows you to configure additional custom trackers that get merged with `active_series_custom_trackers` at runtime. #10428
* [ENHANCEMENT] Query-frontend: Allow blocking raw http requests with the `blocked_requests` configuration. Requests can be blocked based on their path, method or query parameters #10484
* [ENHANCEMENT] Ingester: Added the following metrics exported by `PostingsForMatchers` cache: #10500 #10525
* [ENHANCEMENT] Add support for the HTTP header `X-Filter-Queryables` which allows callers to decide which queryables should be used by the querier, useful for debugging and testing queryables in isolation. #10552
* `cortex_ingester_tsdb_head_postings_for_matchers_cache_hits_total`
* `cortex_ingester_tsdb_head_postings_for_matchers_cache_misses_total`
* `cortex_ingester_tsdb_head_postings_for_matchers_cache_requests_total`
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,17 @@
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "filter_queryables_enabled",
"required": false,
"desc": "If set to true, the header 'X-Filter-Queryables' can be used to filter down the list of queryables that shall be used. This is useful to test and monitor single queryables in isolation.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "querier.filter-queryables-enabled",
"fieldType": "boolean",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "max_concurrent",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2031,6 +2031,8 @@ Usage of ./cmd/mimir/mimir:
How often to query DNS for query-frontend or query-scheduler address. (default 10s)
-querier.enable-query-engine-fallback
[experimental] If set to true and the Mimir query engine is in use, fall back to using the Prometheus query engine for any queries not supported by the Mimir query engine. (default true)
-querier.filter-queryables-enabled
If set to true, the header 'X-Filter-Queryables' can be used to filter down the list of queryables that shall be used. This is useful to test and monitor single queryables in isolation.
-querier.frontend-address string
Address of the query-frontend component, in host:port format. If multiple query-frontends are running, the host should be a DNS resolving to all query-frontend instances. This option should be set only when query-scheduler component is not in use.
-querier.frontend-client.backoff-max-period duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1474,6 +1474,12 @@ store_gateway_client:
# CLI flag: -querier.enable-query-engine-fallback
[enable_query_engine_fallback: <boolean> | default = true]
# (advanced) If set to true, the header 'X-Filter-Queryables' can be used to
# filter down the list of queryables that shall be used. This is useful to test
# and monitor single queryables in isolation.
# CLI flag: -querier.filter-queryables-enabled
[filter_queryables_enabled: <boolean> | default = false]
# The number of workers running in each querier process. This setting limits the
# maximum number of concurrent queries in each querier. The minimum value is
# four; lower values are ignored and set to the minimum
Expand Down
4 changes: 4 additions & 0 deletions pkg/mimir/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,10 @@ func (t *Mimir) initDistributor() (serv services.Service, err error) {
func (t *Mimir) initQueryable() (serv services.Service, err error) {
registerer := prometheus.WrapRegistererWith(querierEngine, t.Registerer)

if t.Cfg.Querier.FilterQueryablesEnabled {
t.Server.HTTP.Use(querier.FilterQueryablesMiddleware().Wrap)
}

// Create a querier queryable and PromQL engine
t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine, err = querier.New(
t.Cfg.Querier, t.Overrides, t.Distributor, t.AdditionalStorageQueryables, registerer, util_log.Logger, t.ActivityTracker,
Expand Down
59 changes: 59 additions & 0 deletions pkg/querier/filter_queryables.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querier

import (
"context"
"net/http"
"strings"

"github.com/grafana/dskit/middleware"
)

type (
filterQueryablesCtxKeyT int
filterQueryables map[string]struct{}
)

const (
FilterQueryablesHeader = "X-Filter-Queryables"
filterQueryablesCtxKey filterQueryablesCtxKeyT = 0
)

func newFilterQueryables(asString string) filterQueryables {
f := make(filterQueryables)
for _, name := range strings.Split(asString, ",") {
f[strings.Trim(name, " ")] = struct{}{}
}
return f
}

func (f filterQueryables) use(name string) bool {
_, ok := f[name]
return ok
}

func FilterQueryablesMiddleware() middleware.Interface {
return middleware.Func(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if filterQueryables := req.Header.Get(FilterQueryablesHeader); len(filterQueryables) > 0 {
req = req.WithContext(addFilterQueryablesToContext(req.Context(), filterQueryables))
}

next.ServeHTTP(w, req)
})
})
}

func addFilterQueryablesToContext(ctx context.Context, value string) context.Context {
return context.WithValue(ctx, filterQueryablesCtxKey, newFilterQueryables(value))
}

func getFilterQueryablesFromContext(ctx context.Context) (filterQueryables, bool) {
value, ok := ctx.Value(filterQueryablesCtxKey).(filterQueryables)
if !ok {
return nil, false
}

return value, true
}
144 changes: 144 additions & 0 deletions pkg/querier/filter_queryables_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querier

import (
"context"
"testing"
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util/validation"
)

func TestFilteringQueryablesViaHttpHeader(t *testing.T) {
type testCase struct {
transformContext func(context.Context) context.Context
expectedQuerier1Calls int
expectedQuerier2Calls int
}

runTestCase := func(t *testing.T, tc testCase) {
ctx := context.Background()
logger := log.NewNopLogger()
reg := prometheus.NewPedanticRegistry()
metrics := stats.NewQueryMetrics(reg)
overrides, err := validation.NewOverrides(defaultLimitsConfig(), nil)
require.NoError(t, err)

cfg := Config{}
flagext.DefaultValues(&cfg)

matcher := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "metric")
expectedMatchers := []*labels.Matcher{matcher}
querier1 := &mockBlocksStorageQuerier{}
querier1.On("Select", mock.Anything, true, mock.Anything, expectedMatchers).Return(storage.EmptySeriesSet())
querier2 := &mockBlocksStorageQuerier{}
querier2.On("Select", mock.Anything, true, mock.Anything, expectedMatchers).Return(storage.EmptySeriesSet())

alwaysApplicable := func(_ context.Context, _ string, _ time.Time, _, _ int64, _ log.Logger, _ ...*labels.Matcher) bool {
return true
}
querierQueryables := []TimeRangeQueryable{{
Queryable: newMockBlocksStorageQueryable(querier1),
IsApplicable: alwaysApplicable,
StorageName: "querier1",
}, {
Queryable: newMockBlocksStorageQueryable(querier2),
IsApplicable: alwaysApplicable,
StorageName: "querier2",
}}

queryable := newQueryable(querierQueryables, cfg, overrides, metrics, logger)
querier, err := queryable.Querier(0, 10)
require.NoError(t, err)

ctx = user.InjectOrgID(ctx, "0")
ctx = tc.transformContext(ctx)
series := querier.Select(ctx, false, nil, matcher)
require.NoError(t, series.Err())

assert.Equal(t, tc.expectedQuerier1Calls, len(querier1.Calls))
assert.Equal(t, tc.expectedQuerier2Calls, len(querier2.Calls))
}

t.Run("do not set header", func(t *testing.T) {
runTestCase(t, testCase{
transformContext: func(ctx context.Context) context.Context {
return ctx
},
expectedQuerier1Calls: 1,
expectedQuerier2Calls: 1,
})
})

t.Run("set header to unknown queryable", func(t *testing.T) {
runTestCase(t, testCase{
transformContext: func(ctx context.Context) context.Context {
return addFilterQueryablesToContext(ctx, "querier3")
},
expectedQuerier1Calls: 0,
expectedQuerier2Calls: 0,
})
})

t.Run("set header to only querier1", func(t *testing.T) {
runTestCase(t, testCase{
transformContext: func(ctx context.Context) context.Context {
return addFilterQueryablesToContext(ctx, "querier1")
},
expectedQuerier1Calls: 1,
expectedQuerier2Calls: 0,
})
})

t.Run("set header to only querier2", func(t *testing.T) {
runTestCase(t, testCase{
transformContext: func(ctx context.Context) context.Context {
return addFilterQueryablesToContext(ctx, "querier2")
},
expectedQuerier1Calls: 0,
expectedQuerier2Calls: 1,
})
})

t.Run("set header to both querier1 and querier2", func(t *testing.T) {
runTestCase(t, testCase{
transformContext: func(ctx context.Context) context.Context {
return addFilterQueryablesToContext(ctx, "querier2,querier1")
},
expectedQuerier1Calls: 1,
expectedQuerier2Calls: 1,
})
})

t.Run("set header to querier1 and querier2 and unknown querier", func(t *testing.T) {
runTestCase(t, testCase{
transformContext: func(ctx context.Context) context.Context {
return addFilterQueryablesToContext(ctx, "querier2,querier1,querier3")
},
expectedQuerier1Calls: 1,
expectedQuerier2Calls: 1,
})
})

t.Run("set header to querier1 and unknown querier, with spaces", func(t *testing.T) {
runTestCase(t, testCase{
transformContext: func(ctx context.Context) context.Context {
return addFilterQueryablesToContext(ctx, "querier1 , querier3")
},
expectedQuerier1Calls: 1,
expectedQuerier2Calls: 0,
})
})
}
26 changes: 22 additions & 4 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type Config struct {
QueryEngine string `yaml:"query_engine" category:"experimental"`
EnableQueryEngineFallback bool `yaml:"enable_query_engine_fallback" category:"experimental"`

FilterQueryablesEnabled bool `yaml:"filter_queryables_enabled" category:"advanced"`

// PromQL engine config.
EngineConfig engine.Config `yaml:",inline"`
}
Expand Down Expand Up @@ -87,6 +89,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.QueryEngine, "querier.query-engine", prometheusEngine, fmt.Sprintf("Query engine to use, either '%v' or '%v'", prometheusEngine, mimirEngine))
f.BoolVar(&cfg.EnableQueryEngineFallback, "querier.enable-query-engine-fallback", true, "If set to true and the Mimir query engine is in use, fall back to using the Prometheus query engine for any queries not supported by the Mimir query engine.")

f.BoolVar(&cfg.FilterQueryablesEnabled, "querier.filter-queryables-enabled", false, "If set to true, the header 'X-Filter-Queryables' can be used to filter down the list of queryables that shall be used. This is useful to test and monitor single queryables in isolation.")

cfg.EngineConfig.RegisterFlags(f)
}

Expand Down Expand Up @@ -138,7 +142,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, quer
queryables = append(queryables, TimeRangeQueryable{
Queryable: NewDistributorQueryable(distributor, limits, queryMetrics, logger),
StorageName: "ingester",
IsApplicable: func(_ context.Context, tenantID string, now time.Time, _, queryMaxT int64, _ ...*labels.Matcher) bool {
IsApplicable: func(_ context.Context, tenantID string, now time.Time, _, queryMaxT int64, _ log.Logger, _ ...*labels.Matcher) bool {
return ShouldQueryIngesters(limits.QueryIngestersWithin(tenantID), now, queryMaxT)
},
})
Expand Down Expand Up @@ -234,15 +238,15 @@ func newQueryable(
// TimeRangeQueryable is a Queryable that is aware of when it is applicable.
type TimeRangeQueryable struct {
storage.Queryable
IsApplicable func(ctx context.Context, tenantID string, now time.Time, queryMinT, queryMaxT int64, matchers ...*labels.Matcher) bool
IsApplicable func(ctx context.Context, tenantID string, now time.Time, queryMinT, queryMaxT int64, logger log.Logger, matchers ...*labels.Matcher) bool
StorageName string
}

func NewStoreGatewayTimeRangeQueryable(q storage.Queryable, querierConfig Config) TimeRangeQueryable {
return TimeRangeQueryable{
Queryable: q,
StorageName: "store-gateway",
IsApplicable: func(_ context.Context, _ string, now time.Time, queryMinT, _ int64, _ ...*labels.Matcher) bool {
IsApplicable: func(_ context.Context, _ string, now time.Time, queryMinT, _ int64, _ log.Logger, _ ...*labels.Matcher) bool {
return ShouldQueryBlockStore(querierConfig.QueryStoreAfter, now, queryMinT)
},
}
Expand All @@ -261,6 +265,9 @@ type multiQuerier struct {
}

func (mq multiQuerier) getQueriers(ctx context.Context, matchers ...*labels.Matcher) (context.Context, []storage.Querier, error) {
spanLog, ctx := spanlogger.NewWithLogger(ctx, mq.logger, "multiQuerier.getQueriers")
defer spanLog.Span.Finish()

now := time.Now()

tenantID, err := tenant.TenantID(ctx)
Expand All @@ -282,8 +289,19 @@ func (mq multiQuerier) getQueriers(ctx context.Context, matchers ...*labels.Matc
}

var queriers []storage.Querier
useQueryables, filterUsedQueryables := getFilterQueryablesFromContext(ctx)
for _, queryable := range mq.queryables {
if queryable.IsApplicable(ctx, tenantID, now, mq.minT, mq.maxT, matchers...) {
if filterUsedQueryables {
if !useQueryables.use(queryable.StorageName) {
level.Debug(spanLog).Log("queryable_name", queryable.StorageName, "use_queryable", false)
// Skip this queryable if it's not in the list of queryables to use.
continue
}
}

isApplicable := queryable.IsApplicable(ctx, tenantID, now, mq.minT, mq.maxT, mq.logger, matchers...)
level.Debug(spanLog).Log("queryable_name", queryable.StorageName, "use_queryable", true, "is_applicable", isApplicable)
if isApplicable {
q, err := queryable.Querier(mq.minT, mq.maxT)
if err != nil {
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func TestQuerier(t *testing.T) {
db, through := mockTSDB(t, model.Time(0), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk), q.valueType)
dbQueryable := TimeRangeQueryable{
Queryable: db,
IsApplicable: func(_ context.Context, _ string, _ time.Time, _, _ int64, _ ...*labels.Matcher) bool {
IsApplicable: func(_ context.Context, _ string, _ time.Time, _, _ int64, _ log.Logger, _ ...*labels.Matcher) bool {
return true
},
}
Expand Down

0 comments on commit 0dbefd6

Please sign in to comment.