Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip #4668

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft

wip #4668

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/tempo-cli/cmd-query-blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ func queryBlock(ctx context.Context, r backend.Reader, _ backend.Compactor, bloc
searchOpts := common.SearchOptions{}
tempodb.SearchConfig{}.ApplyToOptions(&searchOpts)

trace, err := block.FindTraceByID(ctx, traceID, searchOpts)
res, err := block.FindTraceByID(ctx, traceID, searchOpts)
if err != nil {
return nil, err
}

trace := res.Trace
if trace == nil {
return nil, nil
}
Expand Down
4 changes: 4 additions & 0 deletions modules/frontend/combiner/trace_by_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func NewTraceByID(maxBytes int, contentType string) Combiner {
}
}

func NewTypedTraceByID(maxBytes int, contentType string) GRPCCombiner[*tempopb.TraceByIDResponse] {
return NewTraceByID(maxBytes, contentType).(GRPCCombiner[*tempopb.TraceByIDResponse])
}

func (c *traceByIDCombiner) AddResponse(r PipelineResponse) error {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down
9 changes: 9 additions & 0 deletions modules/frontend/combiner/trace_by_id_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,22 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
)

func NewTypedTraceByIDV2(maxBytes int, marshalingFormat string) GRPCCombiner[*tempopb.TraceByIDResponse] {
return NewTraceByIDV2(maxBytes, marshalingFormat).(GRPCCombiner[*tempopb.TraceByIDResponse])
}

func NewTraceByIDV2(maxBytes int, marshalingFormat string) Combiner {
combiner := trace.NewCombiner(maxBytes, true)
var partialTrace bool
var inspectedBytes uint64
gc := &genericCombiner[*tempopb.TraceByIDResponse]{
combine: func(partial *tempopb.TraceByIDResponse, _ *tempopb.TraceByIDResponse, _ PipelineResponse) error {
if partial.Status == tempopb.TraceByIDResponse_PARTIAL {
partialTrace = true
}
if partial.Metrics != nil {
inspectedBytes += partial.Metrics.InspectedBytes
}
_, err := combiner.Consume(partial.Trace)
return err
},
Expand All @@ -28,6 +36,7 @@ func NewTraceByIDV2(maxBytes int, marshalingFormat string) Combiner {
deduper := newDeduper()
traceResult = deduper.dedupe(traceResult)
resp.Trace = traceResult
resp.Metrics = &tempopb.TraceByIDMetrics{InspectedBytes: inspectedBytes}

if partialTrace || combiner.IsPartialTrace() {
resp.Status = tempopb.TraceByIDResponse_PARTIAL
Expand Down
4 changes: 2 additions & 2 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)

traces := newTraceIDHandler(cfg, tracePipeline, o, combiner.NewTraceByID, logger)
tracesV2 := newTraceIDHandler(cfg, tracePipeline, o, combiner.NewTraceByIDV2, logger)
traces := newTraceIDHandler(cfg, tracePipeline, o, combiner.NewTypedTraceByID, logger)
tracesV2 := newTraceIDHandler(cfg, tracePipeline, o, combiner.NewTypedTraceByIDV2, logger)
search := newSearchHTTPHandler(cfg, searchPipeline, logger)
searchTags := newTagsHTTPHandler(cfg, searchTagsPipeline, o, logger)
searchTagsV2 := newTagsV2HTTPHandler(cfg, searchTagsPipeline, o, logger)
Expand Down
9 changes: 5 additions & 4 deletions modules/frontend/slos.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ var (
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"tenant", "op"})

searchThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": searchOp})
metadataThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": metadataOp})
metricsThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": metricsOp})
traceByIDThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
searchThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": searchOp})
metadataThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": metadataOp})
metricsThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": metricsOp})
)

type (
Expand All @@ -72,7 +73,7 @@ type (

// todo: remove post hooks and implement as a handler
func traceByIDSLOPostHook(cfg SLOConfig) handlerPostHook {
return sloHook(traceByIDCounter, sloTraceByIDCounter, nil, cfg)
return sloHook(traceByIDCounter, sloTraceByIDCounter, traceByIDThroughput, cfg)
}

func searchSLOPostHook(cfg SLOConfig) handlerPostHook {
Expand Down
12 changes: 10 additions & 2 deletions modules/frontend/traceid_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"strings"
"time"

"github.com/grafana/tempo/pkg/tempopb"

"github.com/go-kit/log"
"github.com/go-kit/log/level" //nolint:all //deprecated
"github.com/grafana/dskit/user"
Expand All @@ -16,7 +18,7 @@ import (
)

// newTraceIDHandler creates a http.handler for trace by id requests
func newTraceIDHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], o overrides.Interface, combinerFn func(int, string) combiner.Combiner, logger log.Logger) http.RoundTripper {
func newTraceIDHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], o overrides.Interface, combinerFn func(int, string) combiner.GRPCCombiner[*tempopb.TraceByIDResponse], logger log.Logger) http.RoundTripper {
postSLOHook := traceByIDSLOPostHook(cfg.TraceByID.SLO)

return RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
Expand Down Expand Up @@ -71,7 +73,13 @@ func newTraceIDHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.Pipe
resp, err := rt.RoundTrip(req)
elapsed := time.Since(start)

postSLOHook(resp, tenant, 0, elapsed, err)
var bytesProcessed uint64
findResp, _ := comb.GRPCFinal()
if findResp != nil && findResp.Metrics != nil {
bytesProcessed = findResp.Metrics.InspectedBytes
}

postSLOHook(resp, tenant, bytesProcessed, elapsed, err)

level.Info(logger).Log(
"msg", "trace id response",
Expand Down
8 changes: 2 additions & 6 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,16 +347,12 @@ func (i *Ingester) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDRequ
return &tempopb.TraceByIDResponse{}, nil
}

trace, err := inst.FindTraceByID(ctx, req.TraceID, req.AllowPartialTrace)
res, err = inst.FindTraceByID(ctx, req.TraceID, req.AllowPartialTrace)
if err != nil {
return nil, err
}

span.AddEvent("trace found", oteltrace.WithAttributes(attribute.Bool("found", trace != nil)))

res = &tempopb.TraceByIDResponse{
Trace: trace,
}
span.AddEvent("trace found", oteltrace.WithAttributes(attribute.Bool("found", res.Trace != nil)))

return res, nil
}
Expand Down
21 changes: 16 additions & 5 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,12 +402,13 @@ func (i *instance) ClearFlushedBlocks(completeBlockTimeout time.Duration) error
return err
}

func (i *instance) FindTraceByID(ctx context.Context, id []byte, allowPartialTrace bool) (*tempopb.Trace, error) {
func (i *instance) FindTraceByID(ctx context.Context, id []byte, allowPartialTrace bool) (*tempopb.TraceByIDResponse, error) {
ctx, span := tracer.Start(ctx, "instance.FindTraceByID")
defer span.End()

var err error
var completeTrace *tempopb.Trace
metrics := tempopb.TraceByIDMetrics{}

// live traces
i.tracesMtx.Lock()
Expand All @@ -417,6 +418,9 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte, allowPartialTra
i.tracesMtx.Unlock()
return nil, fmt.Errorf("unable to unmarshal liveTrace: %w", err)
}
for _, b := range liveTrace.batches {
metrics.InspectedBytes += uint64(len(b))
}
}
i.tracesMtx.Unlock()

Expand All @@ -436,10 +440,11 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte, allowPartialTra
if err != nil {
return nil, fmt.Errorf("headBlock.FindTraceByID failed: %w", err)
}
_, err = combiner.Consume(tr)
_, err = combiner.Consume(tr.Trace)
if err != nil {
return nil, err
}
metrics.InspectedBytes += tr.Metrics.InspectedBytes

i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()
Expand All @@ -450,10 +455,11 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte, allowPartialTra
if err != nil {
return nil, fmt.Errorf("completingBlock.FindTraceByID failed: %w", err)
}
_, err = combiner.Consume(tr)
_, err = combiner.Consume(tr.Trace)
if err != nil {
return nil, err
}
metrics.InspectedBytes += tr.Metrics.InspectedBytes
}

// completeBlock
Expand All @@ -462,14 +468,19 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte, allowPartialTra
if err != nil {
return nil, fmt.Errorf("completeBlock.FindTraceByID failed: %w", err)
}
_, err = combiner.Consume(found)
_, err = combiner.Consume(found.Trace)
if err != nil {
return nil, err
}
metrics.InspectedBytes += tr.Metrics.InspectedBytes
}

result, _ := combiner.Result()
return result, nil
response := &tempopb.TraceByIDResponse{
Trace: result,
Metrics: &metrics,
}
return response, nil
}

// AddCompletingBlock adds an AppendBlock directly to the slice of completing blocks.
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/local_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewLocalBlock(ctx context.Context, existingBlock common.BackendBlock, l *lo
return c
}

func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.Trace, error) {
func (c *LocalBlock) FindTraceByID(ctx context.Context, id common.ID, opts common.SearchOptions) (*tempopb.TraceByIDResponse, error) {
ctx, span := tracer.Start(ctx, "LocalBlock.FindTraceByID")
defer span.End()
return c.BackendBlock.FindTraceByID(ctx, id, opts)
Expand Down
9 changes: 7 additions & 2 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque

maxBytes := q.limits.MaxBytesPerTrace(userID)
combiner := trace.NewCombiner(maxBytes, req.AllowPartialTrace)
mc := collector.NewMetricsCollector()

if req.QueryMode == QueryModeIngesters || req.QueryMode == QueryModeAll {
var getRSFn replicationSetFn
Expand Down Expand Up @@ -243,6 +244,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
spanCountTotal.Add(int64(spanCount))
traceCountTotal.Inc()
found.Store(true)
mc.Add(resp.Metrics.InspectedBytes)
}
return nil
}
Expand Down Expand Up @@ -279,17 +281,20 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque
attribute.Int("foundPartialTraces", len(partialTraces))))

for _, partialTrace := range partialTraces {
_, err = combiner.Consume(partialTrace)
_, err = combiner.Consume(partialTrace.Trace)
if err != nil {
return nil, err
}
if partialTrace.Metrics != nil {
mc.Add(partialTrace.Metrics.InspectedBytes)
}
}
}

completeTrace, _ := combiner.Result()
resp := &tempopb.TraceByIDResponse{
Trace: completeTrace,
Metrics: &tempopb.TraceByIDMetrics{},
Metrics: &tempopb.TraceByIDMetrics{InspectedBytes: mc.TotalValue()},
}

if combiner.IsPartialTrace() {
Expand Down
Loading
Loading