Rework SLO metrics #2840

Merged: 14 commits, Aug 25, 2023
10 changes: 9 additions & 1 deletion CHANGELOG.md
@@ -13,6 +13,14 @@
* [BUGFIX] Fix panic in metrics summary api [#2738](https://github.com/grafana/tempo/pull/2738) (@mdisibio)
* [BUGFIX] Fix node role auth IDMSv1 [#2760](https://github.com/grafana/tempo/pull/2760) (@coufalja)
* [BUGFIX] Only search ingester blocks that fall within the request time range. [#2783](https://github.com/grafana/tempo/pull/2783) (@joe-elliott)
* [BUGFIX] Align tempo_query_frontend_queries_total and tempo_query_frontend_queries_within_slo_total. [#2840](https://github.com/grafana/tempo/pull/2840) (@joe-elliott)
This query now correctly reports the percentage of requests that are within SLO:
```
sum(rate(tempo_query_frontend_queries_within_slo_total{}[1m])) by (op)
/
sum(rate(tempo_query_frontend_queries_total{}[1m])) by (op)
```
**BREAKING CHANGE** Removed: tempo_query_frontend_queries_total{op="searchtags|metrics"}.
* [CHANGE] Overrides module refactor [#2688](https://github.com/grafana/tempo/pull/2688) (@mapno)
Added a new `defaults` block to the overrides module. Overrides now use indented syntax.
Old config:
@@ -38,7 +46,7 @@ defaults:
forwarders: ['foo']
metrics_generator:
processors: [service-graphs, span-metrics]
```
```

## v2.2.1 / 2023-08-??

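The aligned counters from the CHANGELOG entry above can also back an alert on the SLO ratio. The expression below is illustrative only; the 0.99 target and the 5m window are arbitrary values chosen here, not anything defined by this PR:

```
# fire when fewer than 99% of queries met their SLO over the last 5 minutes
(
  sum(rate(tempo_query_frontend_queries_within_slo_total{}[5m])) by (op)
  /
  sum(rate(tempo_query_frontend_queries_total{}[5m])) by (op)
) < 0.99
```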
7 changes: 7 additions & 0 deletions example/docker-compose/shared/tempo.yaml
@@ -1,6 +1,13 @@
server:
http_listen_port: 3200

query_frontend:
search:
duration_slo: 5s
throughput_bytes_slo: 1.073741824e+09
trace_by_id:
duration_slo: 5s

distributor:
receivers: # this configuration will listen on all ports and protocols that tempo is capable of.
jaeger: # the receivers all come from the OpenTelemetry collector. more configuration information can
29 changes: 5 additions & 24 deletions modules/frontend/frontend.go
@@ -17,21 +17,13 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb"
)

const (
traceByIDOp = "traces"
searchOp = "search"
searchTagsOp = "searchtags"
metricsOp = "metrics"
)

type streamingSearchHandler func(req *tempopb.SearchRequest, srv tempopb.StreamingQuerier_SearchServer) error

type QueryFrontend struct {
@@ -60,12 +52,6 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
return nil, fmt.Errorf("query backend after should be less than or equal to query ingester until")
}

queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "query_frontend_queries_total",
Help: "Total queries received per tenant.",
}, []string{"tenant", "op", "status"})

retryWare := newRetryWare(cfg.MaxRetries, registerer)

// tracebyid middleware
@@ -75,21 +61,16 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo

spanMetricsMiddleware := MergeMiddlewares(newSpanMetricsMiddleware(), retryWare)

traceByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
searchCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})
searchTagsCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchTagsOp})
spanMetricsCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": metricsOp})

traces := traceByIDMiddleware.Wrap(next)
search := searchMiddleware.Wrap(next)
searchTags := searchTagsMiddleware.Wrap(next)
metrics := spanMetricsMiddleware.Wrap(next)

return &QueryFrontend{
TraceByIDHandler: newHandler(traces, traceByIDCounter, logger),
SearchHandler: newHandler(search, searchCounter, logger),
SearchTagsHandler: newHandler(searchTags, searchTagsCounter, logger),
SpanMetricsSummaryHandler: newHandler(metrics, spanMetricsCounter, logger),
TraceByIDHandler: newHandler(traces, traceByIDSLOPostHook(cfg.TraceByID.SLO), nil, logger),
SearchHandler: newHandler(search, searchSLOPostHook(cfg.Search.SLO), searchSLOPreHook, logger),
SearchTagsHandler: newHandler(searchTags, nil, nil, logger),
SpanMetricsSummaryHandler: newHandler(metrics, nil, nil, logger),
streamingSearch: newSearchStreamingHandler(cfg, o, retryWare.Wrap(next), reader, apiPrefix, logger),
logger: logger,
}, nil
@@ -191,7 +172,7 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
// newSearchMiddleware creates a new frontend middleware to handle search and search tags requests.
func newSearchMiddleware(cfg Config, o overrides.Interface, reader tempodb.Reader, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
searchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, cfg.Search.SLO, newSearchProgress, logger))
searchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, newSearchProgress, logger))

return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// backend search queries require sharding, so we pass through a special roundtripper
5 changes: 5 additions & 0 deletions modules/frontend/frontend_test.go
@@ -13,6 +13,11 @@ import (
"github.com/stretchr/testify/require"
)

var testSLOcfg = SLOConfig{
ThroughputBytesSLO: 0,
DurationSLO: 0,
}

type mockNextTripperware struct{}

func (s *mockNextTripperware) RoundTrip(_ *http.Request) (*http.Response, error) {
46 changes: 27 additions & 19 deletions modules/frontend/handler.go
@@ -5,7 +5,6 @@ import (
"errors"
"io"
"net/http"
"strconv"
"strings"
"time"

@@ -16,7 +15,6 @@ import (
"github.com/grafana/dskit/tracing"
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
)

const (
@@ -32,20 +30,25 @@ var (
errRequestEntityTooLarge = httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "http: request body too large")
)

type handlerPostHook func(ctx context.Context, resp *http.Response, tenant string, latency time.Duration, err error)
type handlerPreHook func(ctx context.Context) context.Context

// handler exists to wrap a roundtripper with an HTTP handler. It wraps all
// frontend endpoints and should only contain functionality that is common to all.
type handler struct {
roundTripper http.RoundTripper
logger log.Logger
queriesPerTenant *prometheus.CounterVec
roundTripper http.RoundTripper
logger log.Logger
post handlerPostHook
pre handlerPreHook
}

// newHandler creates a handler
func newHandler(rt http.RoundTripper, queriesPerTenant *prometheus.CounterVec, logger log.Logger) http.Handler {
func newHandler(rt http.RoundTripper, post handlerPostHook, pre handlerPreHook, logger log.Logger) http.Handler {
return &handler{
roundTripper: rt,
logger: logger,
queriesPerTenant: queriesPerTenant,
roundTripper: rt,
logger: logger,
post: post,
pre: pre,
}
}

@@ -60,27 +63,31 @@ func (f *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
orgID, _ := user.ExtractOrgID(ctx)
traceID, _ := tracing.ExtractTraceID(ctx)

var statusCode int
defer func(status int) {
f.queriesPerTenant.WithLabelValues(orgID, strconv.Itoa(status)).Inc()
}(statusCode)

// add orgid to existing spans
span := opentracing.SpanFromContext(r.Context())
if span != nil {
span.SetTag("orgID", orgID)
}

if f.pre != nil {
ctx = f.pre(ctx)
r = r.WithContext(ctx)
}
resp, err := f.roundTripper.RoundTrip(r)
elapsed := time.Since(start)
if f.post != nil {
f.post(ctx, resp, orgID, elapsed, err)
}

if err != nil {
statusCode = http.StatusInternalServerError
statusCode := http.StatusInternalServerError
err = writeError(w, err)
level.Info(f.logger).Log(
"tenant", orgID,
"method", r.Method,
"traceID", traceID,
"url", r.URL.RequestURI(),
"duration", time.Since(start).String(),
"duration", elapsed.String(),
"response_size", 0,
"status", statusCode,
"err", err.Error(),
@@ -89,14 +96,14 @@ func (f *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

if resp == nil {
statusCode = http.StatusInternalServerError
statusCode := http.StatusInternalServerError
err = writeError(w, errors.New(NilResponseError))
level.Info(f.logger).Log(
"tenant", orgID,
"method", r.Method,
"traceID", traceID,
"url", r.URL.RequestURI(),
"duration", time.Since(start).String(),
"duration", elapsed.String(),
"response_size", 0,
"status", statusCode,
"err", err.Error(),
@@ -113,6 +120,7 @@ func (f *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// request/response logging
var contentLength int64
var statusCode int
if httpResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
statusCode = int(httpResp.Code)
contentLength = int64(len(httpResp.Body))
@@ -126,7 +134,7 @@ func (f *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
"method", r.Method,
"traceID", traceID,
"url", r.URL.RequestURI(),
"duration", time.Since(start).String(),
"duration", elapsed.String(),
"response_size", contentLength,
"status", statusCode,
)
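The `handlerPostHook`/`handlerPreHook` types above are implemented by a new `slos.go`, which is not included in this excerpt. The following is only a rough sketch of how those hooks could recreate the removed counters, based on the removed `recordSearchSLO` logic and the call sites visible in this diff; `throughputKey`, the sketch-local constant and variable names, and the exact label set are assumptions, while `SLOConfig`, `handlerPostHook`, and `handlerPreHook` are the types that appear in the diff:

```go
package frontend

import (
	"context"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// op values match the constants removed from frontend.go; presumably they now
// live alongside the hooks.
const (
	sloTraceByIDOp = "traces"
	sloSearchOp    = "search"
)

// Both counters share the tenant/op label set so the ratio in the CHANGELOG
// divides cleanly. (The real label set may differ.)
var (
	queriesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "tempo",
		Name:      "query_frontend_queries_total",
		Help:      "Total queries per tenant.",
	}, []string{"tenant", "op"})

	queriesWithinSLO = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "tempo",
		Name:      "query_frontend_queries_within_slo_total",
		Help:      "Total queries within SLO per tenant.",
	}, []string{"tenant", "op"})
)

// throughputKey is invented for this sketch: the pre hook seeds the request
// context with a mutable slot, the search sharder fills it via
// addThroughputToContext, and the post hook reads it back.
type throughputKey struct{}

func searchSLOPreHook(ctx context.Context) context.Context {
	return context.WithValue(ctx, throughputKey{}, new(float64))
}

func addThroughputToContext(ctx context.Context, throughput float64) {
	if v, ok := ctx.Value(throughputKey{}).(*float64); ok {
		*v = throughput
	}
}

func throughputFromContext(ctx context.Context) float64 {
	if v, ok := ctx.Value(throughputKey{}).(*float64); ok {
		return *v
	}
	return 0
}

// searchSLOPostHook counts every search query and, mirroring the removed
// recordSearchSLO guard, marks it within SLO only when both thresholds are
// configured and the query returned 200 within DurationSLO or processed more
// than ThroughputBytesSLO bytes/s.
func searchSLOPostHook(cfg SLOConfig) handlerPostHook {
	return func(ctx context.Context, resp *http.Response, tenant string, latency time.Duration, err error) {
		queriesTotal.WithLabelValues(tenant, sloSearchOp).Inc()

		if err != nil || resp == nil || resp.StatusCode != http.StatusOK {
			return
		}
		if cfg.DurationSLO == 0 || cfg.ThroughputBytesSLO == 0 {
			return
		}
		if latency < cfg.DurationSLO || throughputFromContext(ctx) > cfg.ThroughputBytesSLO {
			queriesWithinSLO.WithLabelValues(tenant, sloSearchOp).Inc()
		}
	}
}

// traceByIDSLOPostHook checks latency only, matching the example config above
// which sets just duration_slo for trace_by_id.
func traceByIDSLOPostHook(cfg SLOConfig) handlerPostHook {
	return func(_ context.Context, resp *http.Response, tenant string, latency time.Duration, err error) {
		queriesTotal.WithLabelValues(tenant, sloTraceByIDOp).Inc()

		if err == nil && resp != nil && resp.StatusCode == http.StatusOK &&
			cfg.DurationSLO != 0 && latency < cfg.DurationSLO {
			queriesWithinSLO.WithLabelValues(tenant, sloTraceByIDOp).Inc()
		}
	}
}
```

With hooks shaped like this, the handler increments the total counter for every request and the within-SLO counter for the subset that met its target, which is what keeps the two series divisible in the CHANGELOG query.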
15 changes: 14 additions & 1 deletion modules/frontend/search_streaming.go
@@ -16,6 +16,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/atomic"

"github.com/grafana/dskit/user"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
@@ -94,6 +95,8 @@ func (p *diffSearchProgress) finalResult() *shardedSearchResults {

// newSearchStreamingHandler returns a handler that streams results from the HTTP handler
func newSearchStreamingHandler(cfg Config, o overrides.Interface, downstream http.RoundTripper, reader tempodb.Reader, apiPrefix string, logger log.Logger) streamingSearchHandler {
postSLOHook := searchSLOPostHook(cfg.Search.SLO)

downstreamPath := path.Join(apiPrefix, api.PathSearch)
return func(req *tempopb.SearchRequest, srv tempopb.StreamingQuerier_SearchServer) error {
// build search request and propagate context
@@ -110,6 +113,12 @@ func newSearchStreamingHandler(cfg Config, o overrides.Interface, downstream htt
return fmt.Errorf("build search request failed: %w", err)
}
ctx := srv.Context()

// SLOS - start timer and prep context
start := time.Now()
tenant, _ := user.ExtractOrgID(ctx)
ctx = searchSLOPreHook(ctx)

httpReq = httpReq.WithContext(ctx)

// streaming search only accepts requests with backend components
@@ -125,7 +134,7 @@ func newSearchStreamingHandler(cfg Config, o overrides.Interface, downstream htt
return p
}
// build roundtripper
rt := NewRoundTripper(downstream, newSearchSharder(reader, o, cfg.Search.Sharder, cfg.Search.SLO, fn, logger))
rt := NewRoundTripper(downstream, newSearchSharder(reader, o, cfg.Search.Sharder, fn, logger))

type roundTripResult struct {
resp *http.Response
@@ -138,13 +147,17 @@ func newSearchStreamingHandler(cfg Config, o overrides.Interface, downstream htt
resp, err := rt.RoundTrip(httpReq)
resultChan <- roundTripResult{resp, err}
close(resultChan)

// SLOs record results
postSLOHook(ctx, resp, tenant, time.Since(start), err)
}()

// collect and return results
for {
select {
// handles context canceled or other errors
case <-ctx.Done():
postSLOHook(ctx, nil, tenant, time.Since(start), ctx.Err())
return ctx.Err()
// stream results as they come in
case <-time.After(500 * time.Millisecond):
39 changes: 10 additions & 29 deletions modules/frontend/searchsharding.go
@@ -40,15 +40,6 @@ var (
}, []string{"tenant", "op"})

searchThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": searchOp})

sloQueriesPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "query_frontend_queries_within_slo_total",
Help: "Total Queries within SLO per tenant",
}, []string{"tenant", "op"})

sloTraceByIDCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
sloSearchCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})
)

type searchSharder struct {
@@ -58,7 +49,6 @@ type searchSharder struct {
progress searchProgressFactory

cfg SearchSharderConfig
sloCfg SLOConfig
logger log.Logger
}

@@ -78,14 +68,13 @@ type backendReqMsg struct {
}

// newSearchSharder creates a sharding middleware for search
func newSearchSharder(reader tempodb.Reader, o overrides.Interface, cfg SearchSharderConfig, sloCfg SLOConfig, progress searchProgressFactory, logger log.Logger) Middleware {
func newSearchSharder(reader tempodb.Reader, o overrides.Interface, cfg SearchSharderConfig, progress searchProgressFactory, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
return searchSharder{
next: next,
reader: reader,
overrides: o,
cfg: cfg,
sloCfg: sloCfg,
logger: logger,

progress: progress,
@@ -108,15 +97,15 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
// adjust limit based on config
searchReq.Limit = adjustLimit(searchReq.Limit, s.cfg.DefaultLimit, s.cfg.MaxLimit)

ctx := r.Context()
tenantID, err := user.ExtractOrgID(ctx)
requestCtx := r.Context()
tenantID, err := user.ExtractOrgID(requestCtx)
if err != nil {
return &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(strings.NewReader(err.Error())),
}, nil
}
span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.ShardSearch")
span, ctx := opentracing.StartSpanFromContext(requestCtx, "frontend.ShardSearch")
defer span.Finish()

reqStart := time.Now()
@@ -277,16 +266,19 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
return nil, err
}

recordSearchSLO(s.sloCfg, tenantID, reqTime, throughput)
// see slos.go for why we need to record throughput here
addThroughputToContext(requestCtx, throughput)

return &http.Response{
resp := &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{
api.HeaderContentType: {api.HeaderAcceptJSON},
},
Body: io.NopCloser(strings.NewReader(bodyString)),
ContentLength: int64(len([]byte(bodyString))),
}, nil
}

return resp, nil
}

// blockMetas returns all relevant blockMetas given a start/end
@@ -504,17 +496,6 @@ func adjustLimit(limit, defaultLimit, maxLimit uint32) uint32 {
return limit
}

func recordSearchSLO(sloCfg SLOConfig, tenantID string, reqTime time.Duration, throughput float64) {
// only capture if SLOConfig is set
if sloCfg.DurationSLO != 0 && sloCfg.ThroughputBytesSLO != 0 {
if reqTime < sloCfg.DurationSLO || throughput > sloCfg.ThroughputBytesSLO {
// query is within SLO if query returned 200 within DurationSLO seconds OR
// processed ThroughputBytesSLO bytes/s data
sloSearchCounter.WithLabelValues(tenantID).Inc()
}
}
}

// maxDuration returns the max search duration allowed for this tenant.
func (s *searchSharder) maxDuration(tenantID string) time.Duration {
// check overrides first, if no overrides then grab from our config