Skip to content

Commit

Permalink
Use more of the cortex frontend module to deduplicate some code and a…
Browse files Browse the repository at this point in the history
…dd the memcached backend (#3114)

* Use more of the cortex frontend module to deduplicate a lot code

Signed-off-by: Krasi Georgiev <[email protected]>

* tidy up the configs

Signed-off-by: Krasi Georgiev <[email protected]>

* review nits

Signed-off-by: Krasi Georgiev <[email protected]>

* use own structs

Signed-off-by: Krasi Georgiev <[email protected]>

* use deducated struct for query range and add validation

Signed-off-by: Krasi Georgiev <[email protected]>

* remove the memcache tests as these are already tested in cortex

Signed-off-by: Krasi Georgiev <[email protected]>

* fixed the tests

Signed-off-by: Krasi Georgiev <[email protected]>

* nit

Signed-off-by: Krasi Georgiev <[email protected]>

* go mod tidy

Signed-off-by: Krasi Georgiev <[email protected]>

* generate docs

Signed-off-by: Krasi Georgiev <[email protected]>

* nits

Signed-off-by: Krasi Georgiev <[email protected]>

* rearange imports, link to cortex docs

Signed-off-by: Krasi Georgiev <[email protected]>

* updated the changelog

Signed-off-by: Krasi Georgiev <[email protected]>

* added missing license header

Signed-off-by: Krasi Georgiev <[email protected]>

* fix tests

Signed-off-by: Krasi Georgiev <[email protected]>

* nits

Signed-off-by: Krasi Georgiev <[email protected]>

* Bartek review changes

Signed-off-by: Krasi Georgiev <[email protected]>

* gen docs after changes

Signed-off-by: Krasi Georgiev <[email protected]>

* attempt to use the thanos memcache configs

Signed-off-by: Krasi Georgiev <[email protected]>

* refactored

Signed-off-by: Krasi Georgiev <[email protected]>

* fix the tests

Signed-off-by: Krasi Georgiev <[email protected]>

* fix invalid pointers

Signed-off-by: Krasi Georgiev <[email protected]>

* nits

Signed-off-by: Krasi Georgiev <[email protected]>

* fix e2e tests

Signed-off-by: Krasi Georgiev <[email protected]>

* add warning for the MaxItemSize

Signed-off-by: Krasi Georgiev <[email protected]>

* review comments

Signed-off-by: Krasi Georgiev <[email protected]>

* gen docs

Signed-off-by: Krasi Georgiev <[email protected]>

* links

Signed-off-by: Krasi Georgiev <[email protected]>
  • Loading branch information
krasi-georgiev authored Sep 18, 2020
1 parent 78c122f commit 17343c1
Show file tree
Hide file tree
Showing 16 changed files with 494 additions and 313 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased

- [#3032](https://github.com/thanos-io/thanos/pull/3032) Query Frontend: Added support for Memacahce cache. Replaced underscores with hyphens in `log_queries_longer_than - > log-queries-longer-than`.

### Added

- [#3184](https://github.com/thanos-io/thanos/pull/3184) Compact: Fix web.prefix-header to use &wc.prefixHeaderName
Expand Down
151 changes: 56 additions & 95 deletions cmd/thanos/query-frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,101 +8,80 @@ import (
"time"

"github.com/NYTimes/gziphandler"
"github.com/cortexproject/cortex/pkg/querier/frontend"
cortexfrontend "github.com/cortexproject/cortex/pkg/querier/frontend"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
cortexvalidation "github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/weaveworks/common/user"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/queryfrontend"
"github.com/thanos-io/thanos/pkg/queryfrontend/cache"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
"github.com/thanos-io/thanos/pkg/tracing"
"github.com/weaveworks/common/user"
)

type queryFrontendConfig struct {
http httpConfig
queryRangeConfig queryRangeConfig

downstreamURL string
compressResponses bool
LogQueriesLongerThan time.Duration

requestLoggingDecision string
http httpConfig
queryfrontend.Config
}

type queryRangeConfig struct {
respCacheConfig extflag.PathOrContent
cacheMaxFreshness time.Duration
splitInterval model.Duration
maxRetries int
maxQueryParallelism int
maxQueryLength model.Duration

// partialResponseStrategy is the default strategy used
// when parsing thanos query request.
partialResponseStrategy bool
}
func registerQueryFrontend(app *extkingpin.App) {
comp := component.QueryFrontend
cmd := app.Command(comp.String(), "query frontend")
cfg := &queryFrontendConfig{
Config: queryfrontend.Config{
CortexFrontendConfig: &cortexfrontend.Config{},
CortexLimits: &cortexvalidation.Limits{},
CortexResultsCacheConfig: &queryrange.ResultsCacheConfig{},
},
}

func (c *queryRangeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("query-range.split-interval", "Split queries by an interval and execute in parallel, 0 disables it.").
Default("24h").SetValue(&c.splitInterval)
cfg.http.registerFlag(cmd)

cmd.Flag("query-range.split-interval", "Split queries by an interval and execute in parallel, it should be greater than 0 when response-cache-config is configured.").
Default("24h").DurationVar(&cfg.SplitQueriesByInterval)

cmd.Flag("query-range.max-retries-per-request", "Maximum number of retries for a single request; beyond this, the downstream error is returned.").
Default("5").IntVar(&c.maxRetries)
Default("5").IntVar(&cfg.MaxRetries)

cmd.Flag("query-range.max-query-length", "Limit the query time range (end - start time) in the query-frontend, 0 disables it.").
Default("0").SetValue(&c.maxQueryLength)
Default("0").DurationVar(&cfg.CortexLimits.MaxQueryLength)

cmd.Flag("query-range.max-query-parallelism", "Maximum number of queries will be scheduled in parallel by the frontend.").
Default("14").IntVar(&c.maxQueryParallelism)
cmd.Flag("query-range.max-query-parallelism", "Maximum number of queries will be scheduled in parallel by the Frontend.").
Default("14").IntVar(&cfg.CortexLimits.MaxQueryParallelism)

cmd.Flag("query-range.response-cache-max-freshness", "Most recent allowed cacheable result, to prevent caching very recent results that might still be in flux.").
Default("1m").DurationVar(&c.cacheMaxFreshness)
Default("1m").DurationVar(&cfg.CortexLimits.MaxCacheFreshness)

cmd.Flag("query-range.partial-response", "Enable partial response for queries if no partial_response param is specified. --no-query-range.partial-response for disabling.").
Default("true").BoolVar(&c.partialResponseStrategy)
Default("true").BoolVar(&cfg.PartialResponseStrategy)

c.respCacheConfig = *extflag.RegisterPathOrContent(cmd, "query-range.response-cache-config", "YAML file that contains response cache configuration.", false)
}

func (c *queryFrontendConfig) registerFlag(cmd extkingpin.FlagClause) {
c.queryRangeConfig.registerFlag(cmd)
c.http.registerFlag(cmd)
cfg.CachePathOrContent = *extflag.RegisterPathOrContent(cmd, "query-range.response-cache-config", "YAML file that contains response cache configuration.", false)

cmd.Flag("query-frontend.downstream-url", "URL of downstream Prometheus Query compatible API.").
Default("http://localhost:9090").StringVar(&c.downstreamURL)
Default("http://localhost:9090").StringVar(&cfg.CortexFrontendConfig.DownstreamURL)

cmd.Flag("query-frontend.compress-responses", "Compress HTTP responses.").
Default("false").BoolVar(&c.compressResponses)

cmd.Flag("query-frontend.log_queries_longer_than", "Log queries that are slower than the specified duration. "+
"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&c.LogQueriesLongerThan)
Default("false").BoolVar(&cfg.CortexFrontendConfig.CompressResponses)

cmd.Flag("log.request.decision", "Request Logging for logging the start and end of requests. LogFinishCall is enabled by default. LogFinishCall : Logs the finish call of the requests. LogStartAndFinishCall : Logs the start and finish call of the requests. NoLogCall : Disable request logging.").Default("LogFinishCall").EnumVar(&c.requestLoggingDecision, "NoLogCall", "LogFinishCall", "LogStartAndFinishCall")
}
cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+
"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexFrontendConfig.LogQueriesLongerThan)

func registerQueryFrontend(app *extkingpin.App) {
comp := component.QueryFrontend
cmd := app.Command(comp.String(), "query frontend")
conf := &queryFrontendConfig{}
conf.registerFlag(cmd)
cmd.Flag("log.request.decision", "Request Logging for logging the start and end of requests. LogFinishCall is enabled by default. LogFinishCall : Logs the finish call of the requests. LogStartAndFinishCall : Logs the start and finish call of the requests. NoLogCall : Disable request logging.").Default("LogFinishCall").EnumVar(&cfg.RequestLoggingDecision, "NoLogCall", "LogFinishCall", "LogStartAndFinishCall")

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return runQueryFrontend(g, logger, reg, tracer, conf, comp)
return runQueryFrontend(g, logger, reg, tracer, cfg, comp)
})
}

Expand All @@ -111,54 +90,36 @@ func runQueryFrontend(
logger log.Logger,
reg *prometheus.Registry,
tracer opentracing.Tracer,
conf *queryFrontendConfig,
cfg *queryFrontendConfig,
comp component.Component,
) error {
cacheConfContentYaml, err := cfg.CachePathOrContent.Content()
if err != nil {
return err
}
if len(cacheConfContentYaml) > 0 {
cacheConfig, err := queryfrontend.NewCacheConfig(logger, cacheConfContentYaml)
if err != nil {
return errors.Wrap(err, "initializing the query frontend config")
}
if cfg.CortexResultsCacheConfig.CacheConfig.Memcache.Expiration == 0 {
level.Warn(logger).Log("msg", "memcached cache valid time set to 0, so using a default of 24 hours expiration time")
cfg.CortexResultsCacheConfig.CacheConfig.Memcache.Expiration = 24 * time.Hour
}
cfg.CortexResultsCacheConfig = cacheConfig
}

if len(conf.downstreamURL) == 0 {
return errors.New("downstream URL should be configured")
if err := cfg.Validate(); err != nil {
return errors.Wrap(err, "error validating the config")
}

fe, err := frontend.New(frontend.Config{
DownstreamURL: conf.downstreamURL,
CompressResponses: conf.compressResponses,
LogQueriesLongerThan: conf.LogQueriesLongerThan,
}, logger, reg)
fe, err := cortexfrontend.New(*cfg.CortexFrontendConfig, logger, reg)
if err != nil {
return errors.Wrap(err, "setup query frontend")
}
defer fe.Close()

limits := queryfrontend.NewLimits(
conf.queryRangeConfig.maxQueryParallelism,
time.Duration(conf.queryRangeConfig.maxQueryLength),
conf.queryRangeConfig.cacheMaxFreshness,
)

respCacheContentYaml, err := conf.queryRangeConfig.respCacheConfig.Content()
if err != nil {
return errors.Wrap(err, "get content of response cache configuration")
}

var cacheConfig *queryrange.ResultsCacheConfig
if len(respCacheContentYaml) > 0 {
cacheConfig, err = cache.NewResponseCacheConfig(respCacheContentYaml)
if err != nil {
return errors.Wrap(err, "create response cache")
}
}

codec := queryfrontend.NewThanosCodec(conf.queryRangeConfig.partialResponseStrategy)
tripperWare, err := queryfrontend.NewTripperWare(
limits,
cacheConfig,
codec,
queryrange.PrometheusResponseExtractor{},
time.Duration(conf.queryRangeConfig.splitInterval),
conf.queryRangeConfig.maxRetries,
reg,
logger,
)
tripperWare, err := queryfrontend.NewTripperware(cfg.Config, reg, logger)
if err != nil {
return errors.Wrap(err, "setup query range middlewares")
}
Expand All @@ -173,16 +134,16 @@ func runQueryFrontend(

// Configure Request Logging for HTTP calls.
opts := []logging.Option{logging.WithDecider(func() logging.Decision {
return logging.LogDecision[conf.requestLoggingDecision]
return logging.LogDecision[cfg.RequestLoggingDecision]
})}
logMiddleware := logging.NewHTTPServerMiddleware(logger, opts...)
ins := extpromhttp.NewInstrumentationMiddleware(reg)

// Start metrics HTTP server.
{
srv := httpserver.New(logger, reg, comp, httpProbe,
httpserver.WithListen(conf.http.bindAddress),
httpserver.WithGracePeriod(time.Duration(conf.http.gracePeriod)),
httpserver.WithListen(cfg.http.bindAddress),
httpserver.WithGracePeriod(time.Duration(cfg.http.gracePeriod)),
)

instr := func(f http.HandlerFunc) http.HandlerFunc {
Expand Down
43 changes: 32 additions & 11 deletions docs/components/query-frontend.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ For more information please check out [initial design proposal](https://thanos.i
### Splitting

Query Frontend splits a long query into multiple short queries based on the configured `--query-range.split-interval` flag. The default value of `--query-range.split-interval`
is `24h`. Set it to `0` disables query splitting, but please note that caching is also disabled in this case.
is `24h`. When caching is enabled it should be greater than `0`.

There are some benefits from query splitting:

Expand All @@ -44,9 +44,11 @@ Query Frontend supports a retry mechanism to retry query when HTTP requests are
### Caching

Query Frontend supports caching query results and reuses them on subsequent queries. If the cached results are incomplete,
Query Frontend calculates the required subqueries and executes them in parallel on downstream queriers. Query Frontend can optionally align queries with their step parameter to improve the cacheability of the query results.
Query Frontend calculates the required subqueries and executes them in parallel on downstream queriers.
Query Frontend can optionally align queries with their step parameter to improve the cacheability of the query results.
Currently, in-memory cache (fifo cache) and memcached are supported.

Currently, only in-memory cache (fifo cache) is supported. An example config:
#### In-memory

[embedmd]:# (../flags/config_response_cache_in_memory.txt yaml)
```yaml
Expand All @@ -57,9 +59,27 @@ config:
validity: 0s
```
#### Memcached
[embedmd]:# (../flags/config_response_cache_memcached.txt yaml)
```yaml
type: MEMCACHED
config:
addresses: []
timeout: 0s
max_idle_connections: 0
max_async_concurrency: 0
max_async_buffer_size: 0
max_get_multi_concurrency: 0
max_item_size: 0
max_get_multi_batch_size: 0
dns_provider_update_interval: 0s
expiration: 0s
```
### Slow Query Log
Query Frontend supports `--query-frontend.log_queries_longer_than` flag to log queries running longer some duration.
Query Frontend supports `--query-frontend.log-queries-longer-than` flag to log queries running longer than some duration.

## Naming

Expand Down Expand Up @@ -89,9 +109,14 @@ Flags:
priority). Content of YAML file with tracing
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--http-grace-period=2m Time to wait after an interrupt received for HTTP
Server.
--query-range.split-interval=24h
Split queries by an interval and execute in
parallel, 0 disables it.
parallel, it should be greater than 0 when
response-cache-config is configured.
--query-range.max-retries-per-request=5
Maximum number of retries for a single request;
beyond this, the downstream error is returned.
Expand All @@ -100,7 +125,7 @@ Flags:
the query-frontend, 0 disables it.
--query-range.max-query-parallelism=14
Maximum number of queries will be scheduled in
parallel by the frontend.
parallel by the Frontend.
--query-range.response-cache-max-freshness=1m
Most recent allowed cacheable result, to prevent
caching very recent results that might still be in
Expand All @@ -117,15 +142,11 @@ Flags:
'query-range.response-cache-config-file' flag
(lower priority). Content of YAML file that
contains response cache configuration.
--http-address="0.0.0.0:10902"
Listen host:port for HTTP endpoints.
--http-grace-period=2m Time to wait after an interrupt received for HTTP
Server.
--query-frontend.downstream-url="http://localhost:9090"
URL of downstream Prometheus Query compatible API.
--query-frontend.compress-responses
Compress HTTP responses.
--query-frontend.log_queries_longer_than=0
--query-frontend.log-queries-longer-than=0
Log queries that are slower than the specified
duration. Set to 0 to disable. Set to < 0 to
enable on all queries.
Expand Down
Loading

0 comments on commit 17343c1

Please sign in to comment.