[Traceql Metrics] PR 4 - Sampling (#3275)
* Add general purpose with(hints) to traceql

* draft changes to support sampling rate hint

* Add query mode parameter so we can apply sharding and sampling rules to the generator. Fix sampling rate rounding

* Switch to last shard

* Make query recent mode a const

* lint
mdisibio authored Jan 12, 2024
1 parent 3091f69 commit 2ca7265
Showing 13 changed files with 1,043 additions and 703 deletions.
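To make the sampling hint described above concrete, here is a minimal sketch of how a frontend-style consumer could read the rate from a parsed query. The query string and the `sample` hint spelling are assumptions for illustration; only `traceql.Parse`, `expr.Hints.GetFloat`, and `traceql.HintSample` are taken from this diff.

```go
package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/traceql"
)

func main() {
	// Hypothetical metrics query carrying the sampling hint; the exact
	// with() spelling is illustrative, not taken from this diff.
	q := `{ } | rate() with (sample=0.1)`

	expr, err := traceql.Parse(q)
	if err != nil {
		panic(err)
	}

	// Mirrors the frontend change: default to 1.0 and only accept
	// hinted rates strictly between 0 and 1.
	samplingRate := 1.0
	if ok, v := expr.Hints.GetFloat(traceql.HintSample); ok {
		if v > 0 && v < 1.0 {
			samplingRate = v
		}
	}

	fmt.Println("sampling rate:", samplingRate)
}
```

Out-of-range values (and exactly 1.0) fall back to unsampled execution, keeping the hint strictly opt-in.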
127 changes: 95 additions & 32 deletions modules/frontend/query_range_sharding.go
@@ -21,6 +21,7 @@ import (
"github.com/opentracing/opentracing-go"

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/querier"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/pkg/tempopb"
@@ -61,14 +62,14 @@ func newQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg Quer
}

func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
now := time.Now()
span, ctx := opentracing.StartSpanFromContext(r.Context(), "frontend.QueryRangeSharder")
defer span.Finish()

var (
isProm bool
err error
queryRangeReq *tempopb.QueryRangeRequest
generatorReq *queryRangeJob
tenantID string
isProm bool
err error
generatorReq *queryRangeJob
now = time.Now()
)

// This route supports two flavors. (1) Prometheus-compatible (2) Tempo native
@@ -84,23 +85,20 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
now = time.Unix(now.Unix(), 0)
}

queryRangeReq, err = api.ParseQueryRangeRequest(r)
queryRangeReq, err := api.ParseQueryRangeRequest(r)
if err != nil {
return s.respErrHandler(isProm, err)
}

_, err = traceql.Parse(queryRangeReq.Query)
expr, err := traceql.Parse(queryRangeReq.Query)
if err != nil {
return s.respErrHandler(isProm, err)
}

ctx := r.Context()
tenantID, err = user.ExtractOrgID(ctx)
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return s.respErrHandler(isProm, err)
}
span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.QueryRangeSharder")
defer span.Finish()

subCtx, subCancel := context.WithCancel(ctx)
defer subCancel()
@@ -114,7 +112,15 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
}, nil
}

generatorReq = s.generatorRequest(*queryRangeReq)
// Check sampling rate hint
samplingRate := 1.0
if ok, v := expr.Hints.GetFloat(traceql.HintSample); ok {
if v > 0 && v < 1.0 {
samplingRate = v
}
}

generatorReq = s.generatorRequest(*queryRangeReq, samplingRate)

reqCh := make(chan *queryRangeJob, 1) // buffer of 1 allows us to insert ingestReq if it exists
stopCh := make(chan struct{})
@@ -124,7 +130,7 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
reqCh <- generatorReq
}

s.backendRequests(tenantID, queryRangeReq, now, reqCh, stopCh)
totalBlocks, totalBlockBytes := s.backendRequests(tenantID, queryRangeReq, now, samplingRate, reqCh, stopCh)

wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests))
c := traceql.QueryRangeCombiner{}
@@ -180,30 +186,52 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (*http.Response, error) {
return
}

// Scale results back up by the inverse of the sampling rate
if job.samplingRate != 1.0 {
for _, series := range results.Series {
for i, sample := range series.Samples {
sample.Value *= 1.0 / job.samplingRate
series.Samples[i] = sample
}
}
}

mtx.Lock()
defer mtx.Unlock()
c.Combine(results.Series)
c.Combine(results)
}(job)
}

// wait for all goroutines running in wg to finish or be cancelled
wg.Wait()

res := c.Response()
res.Metrics.CompletedJobs = uint32(startedReqs)
res.Metrics.TotalBlocks = uint32(totalBlocks)
res.Metrics.TotalBlockBytes = uint64(totalBlockBytes)

reqTime := time.Since(now)
throughput := float64(res.Metrics.InspectedBytes) / reqTime.Seconds()

span.SetTag("totalBlocks", res.Metrics.TotalBlocks)
span.SetTag("inspectedBytes", res.Metrics.InspectedBytes)
span.SetTag("inspectedTraces", res.Metrics.InspectedTraces)
span.SetTag("totalBlockBytes", res.Metrics.TotalBlockBytes)
span.SetTag("totalJobs", res.Metrics.TotalJobs)
span.SetTag("finishedJobs", res.Metrics.CompletedJobs)
span.SetTag("requestThroughput", throughput)

var bodyString string
if isProm {
promResp := s.convertToPromFormat(&tempopb.QueryRangeResponse{
Series: c.Results(),
})
promResp := s.convertToPromFormat(res)
bytes, err := json.Marshal(promResp)
if err != nil {
return nil, err
}
bodyString = string(bytes)
} else {
m := &jsonpb.Marshaler{}
bodyString, err = m.MarshalToString(&tempopb.QueryRangeResponse{
Series: c.Results(),
})
bodyString, err = m.MarshalToString(res)
if err != nil {
return nil, err
}
@@ -236,7 +264,7 @@ func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*bac
return metas
}

func (s *queryRangeSharder) backendRequests(tenantID string, searchReq *tempopb.QueryRangeRequest, now time.Time, reqCh chan *queryRangeJob, stopCh <-chan struct{}) {
func (s *queryRangeSharder) backendRequests(tenantID string, searchReq *tempopb.QueryRangeRequest, now time.Time, samplingRate float64, reqCh chan *queryRangeJob, stopCh <-chan struct{}) (totalBlocks, totalBlockBytes int) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
@@ -257,12 +285,28 @@ func (s *queryRangeSharder) backendRequests(tenantID string, searchReq *tempopb.
return
}

// Blocks within overall time range. This is just for instrumentation; a more precise
// time range is checked for each window.
blocks := s.blockMetas(int64(start), int64(end), tenantID)
if len(blocks) == 0 {
// no need to search backend
close(reqCh)
return
}

totalBlocks = len(blocks)
for _, b := range blocks {
totalBlockBytes += int(b.Size)
}

go func() {
s.buildBackendRequests(tenantID, searchReq, start, end, reqCh, stopCh)
s.buildBackendRequests(tenantID, searchReq, start, end, samplingRate, reqCh, stopCh)
}()

return
}

func (s *queryRangeSharder) buildBackendRequests(tenantID string, searchReq *tempopb.QueryRangeRequest, start, end uint64, reqCh chan *queryRangeJob, stopCh <-chan struct{}) {
func (s *queryRangeSharder) buildBackendRequests(tenantID string, searchReq *tempopb.QueryRangeRequest, start, end uint64, samplingRate float64, reqCh chan *queryRangeJob, stopCh <-chan struct{}) {
defer close(reqCh)

timeWindowSize := uint64(s.cfg.Interval.Nanoseconds())
@@ -295,8 +339,16 @@ func (s *queryRangeSharder) buildBackendRequests(tenantID string, searchReq *tem
shardR.ShardID = i
shardR.ShardCount = shards

if samplingRate != 1.0 {
shardR.ShardID *= uint32(1.0 / samplingRate)
shardR.ShardCount *= uint32(1.0 / samplingRate)

// Set final sampling rate after integer rounding
samplingRate = float64(shards) / float64(shardR.ShardCount)
}

select {
case reqCh <- &queryRangeJob{req: shardR}:
case reqCh <- &queryRangeJob{req: shardR, samplingRate: samplingRate}:
case <-stopCh:
return
}
@@ -321,7 +373,7 @@ func (s *queryRangeSharder) backendRange(now time.Time, start, end uint64, query
return start, end
}

func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest) *queryRangeJob {
func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, samplingRate float64) *queryRangeJob {
now := time.Now()
cutoff := uint64(now.Add(-s.cfg.QueryBackendAfter).UnixNano())

@@ -339,11 +391,21 @@ func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest
return nil
}

// Shard 0 indicates generator request
searchReq.ShardID = 0
searchReq.ShardCount = 0
searchReq.QueryMode = querier.QueryModeRecent

// No sharding on the generators (unnecessary), but we do apply sampling
// rates. In this case we execute a single arbitrary shard. Choosing
// the last shard works. The first shard should be avoided because it is
// weighted slightly off due to int63/128 sharding boundaries.
searchReq.ShardID = uint32(1.0 / samplingRate)
searchReq.ShardCount = uint32(1.0 / samplingRate)

// Set final sampling rate after integer rounding
samplingRate = 1.0 / float64(searchReq.ShardCount)

return &queryRangeJob{
req: searchReq,
req: searchReq,
samplingRate: samplingRate,
}
}

@@ -470,6 +532,7 @@ type PromResult struct {
}

type queryRangeJob struct {
req tempopb.QueryRangeRequest
err error
req tempopb.QueryRangeRequest
err error
samplingRate float64
}
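For reference, the shard-widening arithmetic used in both the backend and generator paths above can be sketched in isolation. The concrete numbers here are illustrative only.

```go
package main

import "fmt"

func main() {
	// Illustrative inputs: shards for one time window and a hinted sampling rate.
	shards := uint32(3)
	samplingRate := 0.33

	// Sampling widens the shard space by 1/samplingRate, so each issued
	// sub-request only covers a 1/N slice of the data.
	shardID := shards // e.g. a single shard, as in the generator path
	shardCount := shards
	if samplingRate != 1.0 {
		shardID *= uint32(1.0 / samplingRate)
		shardCount *= uint32(1.0 / samplingRate)

		// Recompute the effective rate after integer rounding,
		// as the sharder does.
		samplingRate = float64(shards) / float64(shardCount)
	}

	// Returned sample values are later multiplied by the inverse rate.
	upscale := 1.0 / samplingRate
	fmt.Println(shardID, shardCount, samplingRate, upscale) // e.g. 9 9 0.333… 3
}
```

Because the division is truncated to an integer, the effective rate can differ slightly from the hinted one, which is why both paths recompute samplingRate before attaching it to the job.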
1 change: 1 addition & 0 deletions modules/querier/http.go
@@ -25,6 +25,7 @@ const (
QueryModeIngesters = "ingesters"
QueryModeBlocks = "blocks"
QueryModeAll = "all"
QueryModeRecent = "recent"
)

// TraceByIDHandler is a http.HandlerFunc to retrieve traces
10 changes: 3 additions & 7 deletions modules/querier/querier_query_range.go
@@ -20,7 +20,7 @@ import (
)

func (q *Querier) QueryRange(ctx context.Context, req *tempopb.QueryRangeRequest) (*tempopb.QueryRangeResponse, error) {
if req.ShardCount == 0 {
if req.QueryMode == QueryModeRecent {
return q.queryRangeRecent(ctx, req)
}

@@ -49,14 +49,10 @@ func (q *Querier) queryRangeRecent(ctx context.Context, req *tempopb.QueryRangeR

c := traceql.QueryRangeCombiner{}
for _, result := range lookupResults {
c.Combine(result.response.(*tempopb.QueryRangeResponse).Series)
c.Combine(result.response.(*tempopb.QueryRangeResponse))
}

resp := &tempopb.QueryRangeResponse{
Series: c.Results(),
}

return resp, nil
return c.Response(), nil
}

func (q *Querier) queryBackend(ctx context.Context, req *tempopb.QueryRangeRequest) (*tempopb.QueryRangeResponse, error) {
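The combiner change above (Combine now takes a whole *tempopb.QueryRangeResponse and Response() yields the merged result) could be exercised roughly as below; combineShards and parts are illustrative names, not part of this diff.

```go
package sketch

import (
	"github.com/grafana/tempo/pkg/tempopb"
	"github.com/grafana/tempo/pkg/traceql"
)

// combineShards merges partial query-range responses the way the querier
// and frontend now do: whole responses in, one merged response out.
func combineShards(parts []*tempopb.QueryRangeResponse) *tempopb.QueryRangeResponse {
	c := traceql.QueryRangeCombiner{}
	for _, p := range parts {
		c.Combine(p)
	}
	return c.Response()
}
```

Passing the full response rather than just Series presumably lets the combiner carry per-job metrics along as well, which would explain why the frontend can read res.Metrics.InspectedBytes for its throughput tag.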
2 changes: 2 additions & 0 deletions pkg/api/http.go
@@ -446,6 +446,7 @@ func ParseQueryRangeRequest(r *http.Request) (*tempopb.QueryRangeRequest, error)
}

req.Query = r.Form.Get("query")
req.QueryMode = r.Form.Get(QueryModeKey)

start, end, _ := bounds(r)
req.Start = uint64(start.UnixNano())
@@ -485,6 +486,7 @@ func BuildQueryRangeRequest(req *http.Request, searchReq *tempopb.QueryRangeRequ
q.Set(urlParamStep, time.Duration(searchReq.Step).String())
q.Set(urlParamShard, strconv.FormatUint(uint64(searchReq.ShardID), 10))
q.Set(urlParamShardCount, strconv.FormatUint(uint64(searchReq.ShardCount), 10))
q.Set(QueryModeKey, searchReq.QueryMode)

if len(searchReq.Query) > 0 {
q.Set(urlParamQuery, searchReq.Query)
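Since the new QueryMode now travels through ParseQueryRangeRequest and BuildQueryRangeRequest, a downstream handler can branch on it. This sketch is hypothetical: the route path and handler name are assumptions, while the "recent" literal matches querier.QueryModeRecent above.

```go
package main

import (
	"fmt"
	"net/http"

	"github.com/grafana/tempo/pkg/api"
)

// handleQueryRange shows how the parsed QueryMode field could steer execution.
func handleQueryRange(w http.ResponseWriter, r *http.Request) {
	req, err := api.ParseQueryRangeRequest(r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}

	switch req.QueryMode {
	case "recent": // querier.QueryModeRecent
		fmt.Fprintln(w, "querying ingesters/generators only")
	default:
		fmt.Fprintf(w, "querying backend blocks, shard %d of %d\n", req.ShardID, req.ShardCount)
	}
}

func main() {
	// The route path is illustrative, not taken from this diff.
	http.HandleFunc("/api/metrics/query_range", handleQueryRange)
	_ = http.ListenAndServe(":8080", nil)
}
```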
