Add exemplars to TraceQL metrics (#3824)
* Add exemplars to TraceQL metric queries

Add query hint

Fix test

Might work now

fmt

Almost there

A few fixes

Doc fix

Drop percentage

More more

Consolidating

fmt

Fix tests

More changes

chlog

Minor fix

Exemplars in compare() and other fixes

Fix test

Improvements

Bucketize

ups

Cleaning up

Stuff

Change benchmark

Clean up

Exemplar iterator

Ditch predicate approach

* Quit early predicate improvements

* Docs

* Post-review improvements

* Consolidate hint checking

* Empty exemplars cleanup

* Not a map anymore

* Split observe() and observeExemplar()

* Minor fixes

* Consolidate exemplar sampling

* Some improvements

* Reduce allocs

* Remove unnecessary func

* Review comments

* Pass callback in Condition

* Comment

* Fix tests
mapno authored Aug 2, 2024
1 parent 2ea9d04 commit 5a6f140
Showing 31 changed files with 1,492 additions and 471 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
* [FEATURE] Add new compare() metrics function [#3695](https://github.com/grafana/tempo/pull/3695) (@mdisibio)
* [FEATURE] Add new api `/api/metrics/query` for instant metrics queries [#3859](https://github.com/grafana/tempo/pull/3859) (@mdisibio)
* [FEATURE] Add a `q` parameter to `/api/v2/search/tags` for tag name filtering [#3822](https://github.com/grafana/tempo/pull/3822) (@joe-elliott)
* [FEATURE] Add exemplars to TraceQL metrics [#3824](https://github.com/grafana/tempo/pull/3824) (@mapno)
* [ENHANCEMENT] Implement arrays for traceql.Static with reused fields [#3827](https://github.com/grafana/tempo/pull/3827) (@stoewer)
* [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] TraceQL metrics queries use protobuf internally for improved latency [#3745](https://github.com/grafana/tempo/pull/3745) (@mdisibio)
2 changes: 1 addition & 1 deletion cmd/tempo/Dockerfile
@@ -1,4 +1,4 @@
-FROM alpine:3.20 as certs
+FROM alpine:3.20 AS certs
RUN apk --update add ca-certificates
ARG TARGETARCH
COPY bin/linux/tempo-${TARGETARCH} /tempo
14 changes: 14 additions & 0 deletions docs/sources/tempo/traceql/metrics-queries.md
@@ -92,4 +92,18 @@ To demonstrate this flexibility, consider this nonsensical quantile on `span.htt

```
{ name = "GET /:endpoint" } | quantile_over_time(span.http.status_code, .99, .9, .5)
```

## Exemplars

Exemplars are a powerful feature of TraceQL metrics.
They allow you to see an exact trace that contributed to a given metric value.
This is particularly useful when you want to understand why a given metric is high or low.

Exemplars are available in TraceQL metrics for all functions.
To get exemplars, enable them in the query-frontend with the `query_frontend.metrics.exemplars` parameter,
or pass a query hint in your query.

```
{ name = "GET /:endpoint" } | quantile_over_time(duration, .99) by (span.http.target) with (exemplars=true)
```
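As a rough illustration of the query hint, the sketch below builds a `query_range` request URL with `with (exemplars=true)` appended to the query. The `/api/metrics/query_range` path and the `query`, `start`, `end`, and `step` parameters mirror the integration test in this commit. The helper names `withExemplarsHint` and `queryRangeURL`, the `localhost:3200` endpoint, and the time window are assumptions made for the example and are not part of Tempo.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"time"
)

// withExemplarsHint appends the exemplars query hint to a TraceQL metrics query.
// Hypothetical helper for illustration only.
func withExemplarsHint(query string) string {
	return query + " with (exemplars=true)"
}

// queryRangeURL builds a query_range URL from an endpoint, a query, a time
// window in Unix nanoseconds, and a step. Hypothetical helper for illustration.
func queryRangeURL(endpoint, query string, startNano, endNano int64, step string) string {
	params := url.Values{}
	params.Set("query", query)
	params.Set("start", strconv.FormatInt(startNano, 10))
	params.Set("end", strconv.FormatInt(endNano, 10))
	params.Set("step", step)

	u := url.URL{
		Scheme:   "http",
		Host:     endpoint,
		Path:     "/api/metrics/query_range",
		RawQuery: params.Encode(), // Encode escapes values and sorts keys
	}
	return u.String()
}

func main() {
	now := time.Now()
	q := withExemplarsHint(`{ name = "GET /:endpoint" } | quantile_over_time(duration, .99) by (span.http.target)`)
	// The endpoint and the 5 minute window are assumed values.
	fmt.Println(queryRangeURL("localhost:3200", q, now.Add(-5*time.Minute).UnixNano(), now.UnixNano(), "5s"))
}
```

Building the query string through `url.Values` keeps the braces, pipes, and parentheses of the TraceQL expression correctly escaped, which is easy to get wrong when formatting the URL by hand.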
14 changes: 6 additions & 8 deletions integration/e2e/README.md
@@ -27,16 +27,14 @@ docker logs $(docker container ls -f name=tempo_e2e-tempo -q) -f
**How to debug Tempo while running an integration test**

1. Build latest debug image
```sh
make docker-tempo-debug
```
```sh
make docker-tempo-debug
```
2. Use the function ``NewTempoAllInOneDebug`` in your test to spin a Tempo instance with debug capabilities
3. Set a breakpoint after ``require.NoError(t, s.StartAndWaitReady(tempo))`` and before the action you want debug
4. Get the port of Delve debugger inside the container
```sh
```sh
docker ps --format '{{.Ports}}'
# 0.0.0.0:53467->2345
```
# 0.0.0.0:53467->2345
```
5. Run the debugger against that port as is specified [here](https://github.com/grafana/tempo/tree/main/example/docker-compose/debug)


78 changes: 78 additions & 0 deletions integration/e2e/config-query-range.yaml
@@ -0,0 +1,78 @@
target: all
stream_over_http_enabled: true

server:
http_listen_port: 3200
log_level: debug

query_frontend:
search:
query_backend_after: 0 # setting these both to 0 will force all range searches to hit the backend
query_ingesters_until: 0
metrics:
exemplars: true
rf1_read_path: true

distributor:
receivers:
jaeger:
protocols:
grpc:
otlp:
protocols:
grpc:
zipkin:
log_received_spans:
enabled: true

metrics_generator:
processor:
local_blocks:
flush_check_period: 1s
max_block_duration: 10s
flush_to_storage: true
storage:
path: /var/tempo
remote_write:
- url: http://tempo_e2e-prometheus:9090/api/v1/write
send_exemplars: true
traces_storage:
path: /var/tempo/generator/traces

ingester:
lifecycler:
address: 127.0.0.1
ring:
kvstore:
store: inmemory
replication_factor: 1
final_sleep: 0s
trace_idle_period: 1s
max_block_bytes: 1
max_block_duration: 2s
complete_block_timeout: 20s
flush_check_period: 1s

storage:
trace:
backend: local
local:
path: /var/tempo
pool:
max_workers: 10
queue_depth: 100
block:
version: vParquet4
blocklist_poll: 1s

overrides:
defaults:
metrics_generator:
processors: [ local-blocks ]
user_configurable_overrides:
enabled: true
poll_interval: 10s
client:
backend: local
local:
path: /var/tempo/overrides
101 changes: 101 additions & 0 deletions integration/e2e/query_range_test.go
@@ -0,0 +1,101 @@
package e2e

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"testing"
	"time"

	"github.com/grafana/e2e"
	"github.com/grafana/tempo/integration/util"
	"github.com/grafana/tempo/pkg/tempopb"
	"github.com/stretchr/testify/require"
)

const configQueryRange = "config-query-range.yaml"

// Set debugMode to true to print the response body
var debugMode = false

func TestQueryRangeExemplars(t *testing.T) {
	s, err := e2e.NewScenario("tempo_e2e")
	require.NoError(t, err)
	defer s.Close()

	require.NoError(t, util.CopyFileToSharedDir(s, configQueryRange, "config.yaml"))
	tempo := util.NewTempoAllInOne()
	require.NoError(t, s.StartAndWaitReady(tempo))

	jaegerClient, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
	require.NoError(t, err)
	require.NotNil(t, jaegerClient)

	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	timer := time.NewTimer(5 * time.Second)
	defer timer.Stop()
sendLoop:
	for {
		select {
		case <-ticker.C:
			require.NoError(t, jaegerClient.EmitBatch(context.Background(), util.MakeThriftBatch()))
		case <-timer.C:
			break sendLoop
		}
	}

	// Wait for traces to be flushed to blocks
	require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"tempo_metrics_generator_processor_local_blocks_spans_total"}, e2e.WaitMissingMetrics))
	require.NoError(t, tempo.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"tempo_metrics_generator_processor_local_blocks_cut_blocks"}, e2e.WaitMissingMetrics))

	for _, query := range []string{
		"{} | rate()",
		"{} | compare({status=error})",
	} {
		t.Run(query, func(t *testing.T) {
			callQueryRange(t, tempo.Endpoint(3200), query, debugMode)
		})
	}
}

func callQueryRange(t *testing.T, endpoint, query string, printBody bool) {
	url := buildURL(endpoint, fmt.Sprintf("%s with(exemplars=true)", query))
	req, err := http.NewRequest(http.MethodGet, url, nil)
	require.NoError(t, err)

	res, err := http.DefaultClient.Do(req)
	require.NoError(t, err)
	require.Equal(t, http.StatusOK, res.StatusCode)

	// Read body and print it
	body, err := io.ReadAll(res.Body)
	require.NoError(t, err)
	if printBody {
		fmt.Println(string(body))
	}

	queryRangeRes := tempopb.QueryRangeResponse{}
	require.NoError(t, json.Unmarshal(body, &queryRangeRes))
	require.NotNil(t, queryRangeRes)
	require.GreaterOrEqual(t, len(queryRangeRes.GetSeries()), 1)
	exemplarCount := 0
	for _, series := range queryRangeRes.GetSeries() {
		exemplarCount += len(series.GetExemplars())
	}
	require.GreaterOrEqual(t, exemplarCount, 1)
}

func buildURL(endpoint, query string) string {
	return fmt.Sprintf(
		"http://%s/api/metrics/query_range?query=%s&start=%d&end=%d&step=%s",
		endpoint,
		url.QueryEscape(query),
		time.Now().Add(-5*time.Minute).UnixNano(),
		time.Now().Add(time.Minute).UnixNano(),
		"5s",
	)
}
2 changes: 1 addition & 1 deletion integration/util/util.go
@@ -95,7 +95,7 @@ func NewTempoAllInOneDebug(extraArgs ...string) *e2e.HTTPService {
2345, // delve port
)
env := map[string]string{
-"DEBUG_BLOCK": "0",
+"DEBUG_BLOCK": "1",
}
s.SetEnvVars(env)

3 changes: 3 additions & 0 deletions modules/frontend/combiner/metrics_query_range.go
@@ -88,5 +88,8 @@ func sortResponse(res *tempopb.QueryRangeResponse) {
sort.Slice(series.Samples, func(i, j int) bool {
return series.Samples[i].TimestampMs < series.Samples[j].TimestampMs
})
sort.Slice(series.Exemplars, func(i, j int) bool {
return series.Exemplars[i].TimestampMs < series.Exemplars[j].TimestampMs
})
}
}
2 changes: 2 additions & 0 deletions modules/frontend/config.go
@@ -88,6 +88,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
TargetBytesPerRequest: defaultTargetBytesPerRequest,
Interval: 5 * time.Minute,
RF1ReadPath: false,
Exemplars: false, // TODO: Remove?
MaxExemplars: 100,
},
SLO: slo,
}
21 changes: 21 additions & 0 deletions modules/frontend/metrics_query_range_sharder.go
@@ -42,6 +42,8 @@ type QueryRangeSharderConfig struct {
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"`
Interval time.Duration `yaml:"interval,omitempty"`
RF1ReadPath bool `yaml:"rf1_read_path,omitempty"`
Exemplars bool `yaml:"exemplars,omitempty"`
MaxExemplars int `yaml:"max_exemplars,omitempty"`
}

// newAsyncQueryRangeSharder creates a sharding middleware for search
@@ -274,6 +276,7 @@ func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, ten
}

shards := uint32(math.Ceil(float64(totalBlockSize) / float64(targetBytesPerRequest)))
exemplars := max(s.exemplarsPerShard(shards), 1)

for i := uint32(1); i <= shards; i++ {

@@ -282,6 +285,7 @@ func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, ten
shardR.End = thisEnd
shardR.ShardID = i
shardR.ShardCount = shards
shardR.Exemplars = exemplars
httpReq := s.toUpstreamRequest(ctx, shardR, parent, tenantID)

pipelineR := pipeline.NewHTTPRequest(httpReq)
@@ -306,6 +310,13 @@ func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, ten
}
}

func (s *queryRangeSharder) exemplarsPerShard(total uint32) uint32 {
	if !s.cfg.Exemplars {
		return 0
	}
	return uint32(math.Ceil(float64(s.cfg.MaxExemplars)*1.2)) / total
}
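
To make the arithmetic in `exemplarsPerShard` concrete, here is a standalone sketch under stated assumptions. It reproduces the expression from the diff (the configured maximum is padded by 20%, rounded up, then integer-divided across shards) and the floor of 1 that `buildShardedBackendRequests` applies through `max(..., 1)`. The free functions `exemplarsPerShard` and `shardExemplars` and their parameters are hypothetical stand-ins for the method and its config fields. The zero-shard guard is an addition for this sketch and is not in the commit.

```go
package main

import (
	"fmt"
	"math"
)

// exemplarsPerShard mirrors the expression in the diff: pad the configured
// maximum by 20%, round up, then split evenly across shards using integer
// division. The shards == 0 guard is an assumption added for this sketch.
func exemplarsPerShard(enabled bool, maxExemplars int, shards uint32) uint32 {
	if !enabled || shards == 0 {
		return 0
	}
	budget := uint32(math.Ceil(float64(maxExemplars) * 1.2))
	return budget / shards
}

// shardExemplars applies the floor of 1, as max(exemplarsPerShard(...), 1)
// does in the diff. Hypothetical helper name.
func shardExemplars(enabled bool, maxExemplars int, shards uint32) uint32 {
	n := exemplarsPerShard(enabled, maxExemplars, shards)
	if n < 1 {
		return 1
	}
	return n
}

func main() {
	// 100 is the default MaxExemplars registered in modules/frontend/config.go.
	for _, shards := range []uint32{1, 4, 7, 200} {
		fmt.Printf("shards=%d exemplars per shard=%d\n", shards, shardExemplars(true, 100, shards))
	}
}
```

With the default of 100, the padded budget is 120, so 4 shards get 30 each and 7 shards get 17 each. Once the shard count exceeds the budget the integer division yields 0 and the floor lifts it to 1. As written in the diff, the same floor also appears to produce 1 when exemplars are disabled, since `exemplarsPerShard` returns 0 in that case.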

func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, _ float64, targetBytesPerRequest int, _ time.Duration, reqCh chan pipeline.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
@@ -357,6 +368,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s

queryHash := hashForQueryRangeRequest(&searchReq)

exemplars := s.exemplarsPerShard(uint32(len(metas)))
for _, m := range metas {
if m.EndTime.Before(m.StartTime) {
// Ignore blocks with bad timings from debugging
@@ -403,6 +415,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
Size_: m.Size,
FooterSize: m.FooterSize,
DedicatedColumns: dc,
Exemplars: max(exemplars/(m.TotalRecords/uint32(pages)), 1),
}

subR = api.BuildQueryRangeRequest(subR, queryRangeReq)
@@ -426,6 +439,13 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
}
}

func max(a, b uint32) uint32 {
	if a > b {
		return a
	}
	return b
}
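
The per-job budget in `buildBackendRequests`, `max(exemplars/(m.TotalRecords/uint32(pages)), 1)`, can be sketched in isolation as follows. This assumes `pages` is the number of pages covered by each sub-request, so that `TotalRecords/pages` approximates the number of sub-requests per block. That reading is an interpretation of the diff and is not confirmed by it. The function name `exemplarsPerJob`, its parameters, and both zero guards are hypothetical additions for the example.

```go
package main

import "fmt"

// exemplarsPerJob splits a block's exemplar budget across its sub-requests
// and never returns less than 1. Hypothetical helper. The guards against a
// zero divisor are assumptions added for this sketch.
func exemplarsPerJob(perBlock, totalRecords, pagesPerJob uint32) uint32 {
	jobs := uint32(1)
	if pagesPerJob > 0 && totalRecords/pagesPerJob > 0 {
		jobs = totalRecords / pagesPerJob
	}
	n := perBlock / jobs
	if n < 1 {
		return 1
	}
	return n
}

func main() {
	// Assumed values: a block budget of 30 exemplars, 100 records, 10 pages per job.
	fmt.Println(exemplarsPerJob(30, 100, 10))
	// A much larger block spreads the same budget thinner and hits the floor.
	fmt.Println(exemplarsPerJob(30, 1000, 10))
}
```

The sketch spells out the floor with an `if` so it builds on older toolchains. On Go 1.21 and later the language also provides a built-in generic `max`, which a package-level helper of the same name shadows.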

func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, parent *http.Request, tenantID string, cutoff time.Time) *http.Request {
traceql.TrimToAfter(&searchReq, cutoff)

@@ -435,6 +455,7 @@ func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest
}

searchReq.QueryMode = querier.QueryModeRecent
searchReq.Exemplars = uint32(s.cfg.MaxExemplars) // TODO: Review this

req := s.toUpstreamRequest(parent.Context(), searchReq, parent, tenantID)
req.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf)