feat: expose a Traces api V2 (#3912)
* v2 first implementation

* remove tempo config file

* fix linting

* added new trace_by_id combiner

* fix linting

* changelog

* fix flaky test

* pass the combiner to the handler to reuse code

* refactor code by using helper function

* do not overwrite the marshalling format of the struct

* rename marshaling property

* init combiner

* added documentation
javiermolinar authored Aug 1, 2024
1 parent 6e9d93a commit 195936e
Showing 15 changed files with 482 additions and 56 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -43,7 +43,9 @@
* [ENHANCEMENT] Rename batches property of Trace to ResourceSpans to be OTEL compatible [#3895](https://github.com/grafana/tempo/pull/3895)
* [ENHANCEMENT] Reduce memory consumption of query-frontend [#3888](https://github.com/grafana/tempo/pull/3888) (@joe-elliott)
* [ENHANCEMENT] Reduce log level verbosity for e2e tests [#3900](https://github.com/grafana/tempo/pull/3900) (@javiermolinar)
* [ENHANCEMENT] Support reloading client certificates [#537](https://github.com/grafana/dskit/pull/537) (@rubenvp8510)
* [ENHANCEMENT] Added new Traces API V2 [#3912](https://github.com/grafana/tempo/pull/3912) (@javiermolinar)
* [ENHANCEMENT] Update to the latest dskit [#3915](https://github.com/grafana/tempo/pull/3915) (@andreasgerstmayr)

* [BUGFIX] Fix panic in certain metrics queries using `rate()` with `by` [#3847](https://github.com/grafana/tempo/pull/3847) (@stoewer)
* [BUGFIX] Fix double appending the primary iterator on second pass with event iterator [#3903](https://github.com/grafana/tempo/pull/3903) (@ie-pham)
* [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio)
4 changes: 4 additions & 0 deletions cmd/tempo/app/modules.go
@@ -333,6 +333,9 @@ func (t *App) initQuerier() (services.Service, error) {
tracesHandler := middleware.Wrap(http.HandlerFunc(t.querier.TraceByIDHandler))
t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathTraces)), tracesHandler)

tracesHandlerV2 := middleware.Wrap(http.HandlerFunc(t.querier.TraceByIDHandlerV2))
t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathTracesV2)), tracesHandlerV2)

searchHandler := t.HTTPAuthMiddleware.Wrap(http.HandlerFunc(t.querier.SearchHandler))
t.Server.HTTPRouter().Handle(path.Join(api.PathPrefixQuerier, addHTTPAPIPrefix(&t.cfg, api.PathSearch)), searchHandler)

@@ -394,6 +397,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {

// http trace by id endpoint
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathTraces), base.Wrap(queryFrontend.TraceByIDHandler))
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathTracesV2), base.Wrap(queryFrontend.TraceByIDHandlerV2))

// http search endpoints
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSearch), base.Wrap(queryFrontend.SearchHandler))
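The two hunks above register the V2 handler next to the V1 handler, under the querier prefix and on the query frontend. A minimal sketch of that side-by-side registration, using only the standard library: the constants below are hypothetical stand-ins for `api.PathPrefixQuerier`, `api.PathTraces` and `api.PathTracesV2`, and `http.ServeMux` stands in for the server's real router.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"path"
	"strings"
)

// Hypothetical stand-ins for api.PathPrefixQuerier, api.PathTraces and api.PathTracesV2.
const (
	prefixQuerier = "/querier"
	pathTraces    = "/api/traces"
	pathTracesV2  = "/api/v2/traces"
)

// traceHandler returns a handler that echoes the API version and the trace ID
// found after the registered route prefix.
func traceHandler(version, route string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		traceID := strings.TrimPrefix(r.URL.Path, route)
		fmt.Fprintf(w, "%s:%s", version, traceID)
	}
}

// newQuerierRouter registers the V1 and V2 trace-by-ID routes side by side.
func newQuerierRouter() *http.ServeMux {
	mux := http.NewServeMux()
	for version, p := range map[string]string{"v1": pathTraces, "v2": pathTracesV2} {
		route := path.Join(prefixQuerier, p) + "/" // trailing slash matches the whole subtree
		mux.Handle(route, traceHandler(version, route))
	}
	return mux
}

// lookup sends a GET request through the router and returns the response body.
func lookup(mux *http.ServeMux, target string) string {
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, target, nil))
	return rec.Body.String()
}

func main() {
	mux := newQuerierRouter()
	fmt.Println(lookup(mux, "/querier/api/traces/abc123"))
	fmt.Println(lookup(mux, "/querier/api/v2/traces/abc123"))
}
```

Because both versions stay registered, existing V1 clients keep working while new clients opt in to the V2 path.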
48 changes: 47 additions & 1 deletion docs/sources/tempo/api_docs/_index.md
@@ -142,7 +142,53 @@ frontend.

**Returns**

By default, this endpoint returns [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-proto/tree/main/opentelemetry/proto/trace/v1) JSON,
By default, this endpoint returns mostly compatible [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-proto/tree/main/opentelemetry/proto/trace/v1) JSON,
but it can also send OpenTelemetry proto if `Accept: application/protobuf` is passed.


### Query V2

The following request is used to retrieve a trace from the query frontend service in
a microservices deployment or the Tempo endpoint in a monolithic mode deployment.

```
GET /api/v2/traces/<traceid>?start=<start>&end=<end>
```

Parameters:

- `start = (unix epoch seconds)`
Optional. Together with `end`, defines a time range from which traces should be returned.
- `end = (unix epoch seconds)`
Optional. Together with `start`, defines a time range from which traces should be returned. Providing both `start` and `end` restricts the lookup to the specified time range. If the parameters aren't provided, Tempo checks for the trace across all blocks in the backend. If they are provided, Tempo only checks the blocks within the specified time range; this can result in the trace not being found, or in partial results, if the trace doesn't fall within that range.
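
As an illustration of the request shape described above, the following sketch builds a Query V2 URL with the optional `start` and `end` parameters. The function name `traceV2URL`, the base address and the trace ID are assumptions made for the example; they are not part of Tempo.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
	"strconv"
)

// traceV2URL builds the Query V2 URL for a trace ID. start and end are unix
// epoch seconds; a value of zero or less means "not set" and the parameter is
// omitted. baseURL is a hypothetical Tempo address supplied by the caller.
func traceV2URL(baseURL, traceID string, start, end int64) (string, error) {
	u, err := url.Parse(baseURL)
	if err != nil {
		return "", fmt.Errorf("parsing base URL: %w", err)
	}
	u.Path = path.Join(u.Path, "/api/v2/traces", traceID)

	q := url.Values{}
	if start > 0 {
		q.Set("start", strconv.FormatInt(start, 10))
	}
	if end > 0 {
		q.Set("end", strconv.FormatInt(end, 10))
	}
	u.RawQuery = q.Encode() // Encode sorts keys, so "end" precedes "start"
	return u.String(), nil
}

func main() {
	withRange, err := traceV2URL("http://localhost:3200", "abc123", 1700000000, 1700003600)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(withRange)

	allBlocks, _ := traceV2URL("http://localhost:3200", "abc123", 0, 0)
	fmt.Println(allBlocks)
}
```

Leaving both parameters unset produces a URL without a query string, which corresponds to the "check all blocks" behavior described above.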

The following query API is also provided on the querier service for _debugging_ purposes.

```
GET /querier/api/v2/traces/<traceid>?mode=xxxx&blockStart=0000&blockEnd=FFFF&start=<start>&end=<end>
```

Parameters:

- `mode = (blocks|ingesters|all)`
Specifies whether the querier should look for the trace in blocks, ingesters or both (all).
Default = `all`
- `blockStart = (GUID)`
Specifies the blockID start boundary. If specified, the querier only searches blocks with IDs > blockStart.
Default = `00000000-0000-0000-0000-000000000000`
Example: `blockStart=12345678-0000-0000-1235-000001240000`
- `blockEnd = (GUID)`
Specifies the blockID finish boundary. If specified, the querier only searches blocks with IDs < blockEnd.
Default = `FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF`
Example: `blockEnd=FFFFFFFF-FFFF-FFFF-FFFF-456787652341`
- `start = (unix epoch seconds)`
Optional. Together with `end`, defines a time range from which traces should be returned.
- `end = (unix epoch seconds)`
Optional. Together with `start`, defines a time range from which traces should be returned. Providing both `start` and `end` restricts the search to blocks within the specified time range.
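
The debugging parameters and their defaults can be sketched as a small query builder. This is only an illustration under stated assumptions: the `debugParams` type and its `query` method are hypothetical, and the validation mirrors the parameter list above rather than Tempo's actual request parsing.

```go
package main

import (
	"fmt"
	"net/url"
)

// Defaults copied from the parameter list above.
const (
	defaultMode       = "all"
	defaultBlockStart = "00000000-0000-0000-0000-000000000000"
	defaultBlockEnd   = "FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF"
)

// debugParams is a hypothetical holder for the querier debug parameters.
type debugParams struct {
	Mode       string // blocks, ingesters or all
	BlockStart string // lower block ID boundary
	BlockEnd   string // upper block ID boundary
}

// query validates the mode, fills in the documented defaults for empty fields
// and returns the encoded query string (keys sorted alphabetically).
func (p debugParams) query() (string, error) {
	if p.Mode == "" {
		p.Mode = defaultMode
	}
	switch p.Mode {
	case "blocks", "ingesters", "all":
	default:
		return "", fmt.Errorf("invalid mode %q", p.Mode)
	}
	if p.BlockStart == "" {
		p.BlockStart = defaultBlockStart
	}
	if p.BlockEnd == "" {
		p.BlockEnd = defaultBlockEnd
	}

	v := url.Values{}
	v.Set("mode", p.Mode)
	v.Set("blockStart", p.BlockStart)
	v.Set("blockEnd", p.BlockEnd)
	return v.Encode(), nil
}

func main() {
	q, err := debugParams{Mode: "blocks"}.query()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("/querier/api/v2/traces/<traceid>?" + q)
}
```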

**Returns**

By default, this endpoint returns a query response with an [OpenTelemetry](https://github.com/open-telemetry/opentelemetry-proto/tree/main/opentelemetry/proto/trace/v1) JSON trace,
but it can also send OpenTelemetry proto if `Accept: application/protobuf` is passed.
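
A client selects the response encoding through the `Accept` header. The sketch below shows that negotiation against a fake handler. The names `newTraceRequest` and `fakeTempo`, the host `tempo.example` and the placeholder payloads are assumptions for the example; the fake handler does not reproduce Tempo's actual output.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

const (
	contentJSON     = "application/json"
	contentProtobuf = "application/protobuf"
)

// newTraceRequest builds a GET request and, when wantProto is true, asks for
// the protobuf encoding through the Accept header.
func newTraceRequest(target string, wantProto bool) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, target, nil)
	if err != nil {
		return nil, err
	}
	if wantProto {
		req.Header.Set("Accept", contentProtobuf)
	}
	return req, nil
}

// fakeTempo is a hypothetical stand-in for the endpoint. It only mirrors the
// documented negotiation and writes placeholder payloads, not real traces.
func fakeTempo(w http.ResponseWriter, r *http.Request) {
	if r.Header.Get("Accept") == contentProtobuf {
		w.Header().Set("Content-Type", contentProtobuf)
		w.Write([]byte{0x0a, 0x00}) // placeholder bytes
		return
	}
	w.Header().Set("Content-Type", contentJSON)
	io.WriteString(w, `{"trace":{}}`) // placeholder JSON
}

// contentTypeFor runs one request through the fake handler and reports the
// Content-Type of the response.
func contentTypeFor(wantProto bool) (string, error) {
	req, err := newTraceRequest("http://tempo.example/api/v2/traces/abc123", wantProto)
	if err != nil {
		return "", err
	}
	rec := httptest.NewRecorder()
	fakeTempo(rec, req)
	return rec.Result().Header.Get("Content-Type"), nil
}

func main() {
	for _, wantProto := range []bool{false, true} {
		ct, err := contentTypeFor(wantProto)
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Printf("wantProto=%v -> %s\n", wantProto, ct)
	}
}
```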

### Search
29 changes: 23 additions & 6 deletions modules/frontend/combiner/common.go
@@ -34,9 +34,17 @@ type genericCombiner[T TResponse] struct {
diff func(T) (T, error) // currently only implemented by the search combiner. required for streaming
quit func(T) bool

//
// Used to determine the response code and when to stop
httpStatusCode int
httpRespBody string
// Used to marshal the response when using an HTTP combiner; it has no effect on a gRPC combiner.
httpMarshalingFormat string
}

// initHTTPCombiner initializes an HTTP combiner with default values. The marshaling format dictates how the response is marshaled, including the Content-Type header.
func initHTTPCombiner[T TResponse](c *genericCombiner[T], marshalingFormat string) {
c.httpStatusCode = 200
c.httpMarshalingFormat = marshalingFormat
}

// AddResponse is used to add a http response to the combiner.
@@ -129,15 +137,24 @@ func (c *genericCombiner[T]) HTTPFinal() (*http.Response, error) {
return nil, err
}

bodyString, err := new(jsonpb.Marshaler).MarshalToString(final)
if err != nil {
return nil, fmt.Errorf("error marshalling response body: %w", err)
var bodyString string
if c.httpMarshalingFormat == api.HeaderAcceptProtobuf {
buff, err := proto.Marshal(final)
if err != nil {
return nil, fmt.Errorf("error marshalling response body: %w", err)
}
bodyString = string(buff)
} else {
bodyString, err = new(jsonpb.Marshaler).MarshalToString(final)
if err != nil {
return nil, fmt.Errorf("error marshalling response body: %w", err)
}
}

return &http.Response{
StatusCode: c.httpStatusCode,
StatusCode: 200,
Header: http.Header{
api.HeaderContentType: {api.HeaderAcceptJSON},
api.HeaderContentType: {c.httpMarshalingFormat},
},
Body: io.NopCloser(strings.NewReader(bodyString)),
ContentLength: int64(len([]byte(bodyString))),
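The `HTTPFinal` change above picks the marshaler from the combiner's marshaling format and reuses the same value for the `Content-Type` header. A standalone sketch of that pattern follows. It is not the Tempo implementation: `traceResponse` and `finalResponse` are hypothetical names, and `encoding/gob` is used purely as a standard-library stand-in for a binary encoder such as `proto.Marshal`.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

const (
	formatJSON     = "application/json"
	formatProtobuf = "application/protobuf"
)

// traceResponse is a hypothetical, much reduced response type.
type traceResponse struct {
	TraceID string `json:"traceID"`
}

// marshal picks the encoder from the requested format.
func marshal(resp traceResponse, format string) ([]byte, error) {
	if format == formatProtobuf {
		var buf bytes.Buffer
		// Stand-in only: gob replaces the real binary encoder in this sketch.
		if err := gob.NewEncoder(&buf).Encode(resp); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	}
	return json.Marshal(resp)
}

// finalResponse builds the HTTP response; the same format value drives both
// the body encoding and the Content-Type header.
func finalResponse(resp traceResponse, format string) (*http.Response, error) {
	body, err := marshal(resp, format)
	if err != nil {
		return nil, fmt.Errorf("error marshalling response body: %w", err)
	}
	return &http.Response{
		StatusCode:    http.StatusOK,
		Header:        http.Header{"Content-Type": {format}},
		Body:          io.NopCloser(bytes.NewReader(body)),
		ContentLength: int64(len(body)),
	}, nil
}

// bodyString drains the response body and returns it as a string.
func bodyString(resp *http.Response) string {
	b, _ := io.ReadAll(resp.Body)
	return string(b)
}

func main() {
	res, err := finalResponse(traceResponse{TraceID: "abc"}, formatJSON)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(res.Header.Get("Content-Type"), bodyString(res))
}
```

Keeping the body as a byte slice, as this sketch does, avoids converting binary output to a string and back before computing the content length.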
18 changes: 14 additions & 4 deletions modules/frontend/combiner/common_test.go
@@ -11,6 +11,7 @@ import (

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/status"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
@@ -84,8 +85,16 @@ func TestErroredResponse(t *testing.T) {
}
}

func TestInitHttpCombiner(t *testing.T) {
combiner := newTestCombiner()

require.Equal(t, 200, combiner.httpStatusCode)
require.Equal(t, api.HeaderAcceptJSON, combiner.httpMarshalingFormat)
}

func TestGenericCombiner(t *testing.T) {
combiner := newTestCombiner()

wg := sync.WaitGroup{}

for i := 0; i < 10; i++ {
@@ -227,10 +236,9 @@ func (p *testPipelineResponse) RequestData() any {
func newTestCombiner() *genericCombiner[*tempopb.ServiceStats] {
count := 0

return &genericCombiner[*tempopb.ServiceStats]{
httpStatusCode: 200,
new: func() *tempopb.ServiceStats { return &tempopb.ServiceStats{} },
current: nil,
gc := &genericCombiner[*tempopb.ServiceStats]{
new: func() *tempopb.ServiceStats { return &tempopb.ServiceStats{} },
current: nil,
combine: func(_, _ *tempopb.ServiceStats, _ PipelineResponse) error {
count++
return nil
@@ -251,4 +259,6 @@ func newTestCombiner() *genericCombiner[*tempopb.ServiceStats] {
}, nil
},
}
initHTTPCombiner(gc, api.HeaderAcceptJSON)
return gc
}
4 changes: 2 additions & 2 deletions modules/frontend/combiner/search_tag_values.go
@@ -18,7 +18,7 @@ func NewSearchTagValues(limitBytes int) Combiner {
httpStatusCode: 200,
new: func() *tempopb.SearchTagValuesResponse { return &tempopb.SearchTagValuesResponse{} },
current: &tempopb.SearchTagValuesResponse{TagValues: make([]string, 0)},
combine: func(partial, final *tempopb.SearchTagValuesResponse, _ PipelineResponse) error {
combine: func(partial, _ *tempopb.SearchTagValuesResponse, _ PipelineResponse) error {
for _, v := range partial.TagValues {
d.Collect(v)
}
@@ -50,7 +50,7 @@ func NewSearchTagValuesV2(limitBytes int) Combiner {
httpStatusCode: 200,
current: &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{}},
new: func() *tempopb.SearchTagValuesV2Response { return &tempopb.SearchTagValuesV2Response{} },
combine: func(partial, final *tempopb.SearchTagValuesV2Response, _ PipelineResponse) error {
combine: func(partial, _ *tempopb.SearchTagValuesV2Response, _ PipelineResponse) error {
for _, v := range partial.TagValues {
d.Collect(*v)
}
4 changes: 2 additions & 2 deletions modules/frontend/combiner/search_tags.go
@@ -17,7 +17,7 @@ func NewSearchTags(limitBytes int) Combiner {
httpStatusCode: 200,
new: func() *tempopb.SearchTagsResponse { return &tempopb.SearchTagsResponse{} },
current: &tempopb.SearchTagsResponse{TagNames: make([]string, 0)},
combine: func(partial, final *tempopb.SearchTagsResponse, _ PipelineResponse) error {
combine: func(partial, _ *tempopb.SearchTagsResponse, _ PipelineResponse) error {
for _, v := range partial.TagNames {
d.Collect(v)
}
@@ -49,7 +49,7 @@ func NewSearchTagsV2(limitBytes int) Combiner {
httpStatusCode: 200,
new: func() *tempopb.SearchTagsV2Response { return &tempopb.SearchTagsV2Response{} },
current: &tempopb.SearchTagsV2Response{Scopes: make([]*tempopb.SearchTagsV2Scope, 0)},
combine: func(partial, final *tempopb.SearchTagsV2Response, _ PipelineResponse) error {
combine: func(partial, _ *tempopb.SearchTagsV2Response, _ PipelineResponse) error {
for _, res := range partial.GetScopes() {
for _, tag := range res.Tags {
distinctValues.Collect(res.Name, tag)
33 changes: 33 additions & 0 deletions modules/frontend/combiner/trace_by_id_v2.go
@@ -0,0 +1,33 @@
package combiner

import (
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
)

func NewTraceByIDV2(maxBytes int, marshalingFormat string) Combiner {
combiner := trace.NewCombiner(maxBytes)
gc := &genericCombiner[*tempopb.TraceByIDResponse]{
combine: func(partial *tempopb.TraceByIDResponse, _ *tempopb.TraceByIDResponse, _ PipelineResponse) error {
_, err := combiner.Consume(partial.Trace)
return err
},
finalize: func(resp *tempopb.TraceByIDResponse) (*tempopb.TraceByIDResponse, error) {
traceResult, _ := combiner.Result()
if traceResult == nil {
traceResult = &tempopb.Trace{}
}

// dedupe duplicate span ids
deduper := newDeduper()
traceResult = deduper.dedupe(traceResult)

resp.Trace = traceResult
return resp, nil
},
new: func() *tempopb.TraceByIDResponse { return &tempopb.TraceByIDResponse{} },
current: &tempopb.TraceByIDResponse{},
}
initHTTPCombiner(gc, marshalingFormat)
return gc
}
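`NewTraceByIDV2` above wires two closures into the generic combiner: `combine` feeds each partial response into a shared accumulator, and `finalize` writes the deduplicated result into the response. The same closure pattern can be sketched without any Tempo types. Everything below is hypothetical and simplified: `span`, `traceResp` and this reduced `genericCombiner` are illustration-only types, and the deduplication keys on the span ID alone.

```go
package main

import "fmt"

// span and traceResp are hypothetical, reduced stand-ins for the real types.
type span struct{ ID string }

type traceResp struct{ Spans []span }

// genericCombiner is a reduced version of the pattern: behavior is injected
// through the combine and finalize closures.
type genericCombiner[T any] struct {
	current  T
	combine  func(partial, final T) error
	finalize func(final T) (T, error)
}

// add feeds one partial response into the combiner.
func (c *genericCombiner[T]) add(partial T) error { return c.combine(partial, c.current) }

// result produces the final, combined response.
func (c *genericCombiner[T]) result() (T, error) { return c.finalize(c.current) }

// newTraceCombiner captures one accumulator in both closures, so combine can
// ignore its "final" argument and finalize can assemble the result at the end.
func newTraceCombiner() *genericCombiner[*traceResp] {
	var collected []span
	return &genericCombiner[*traceResp]{
		current: &traceResp{},
		combine: func(partial, _ *traceResp) error {
			collected = append(collected, partial.Spans...)
			return nil
		},
		finalize: func(final *traceResp) (*traceResp, error) {
			seen := make(map[string]struct{}, len(collected))
			final.Spans = final.Spans[:0]
			for _, s := range collected {
				if _, dup := seen[s.ID]; dup {
					continue // drop spans whose ID was already emitted
				}
				seen[s.ID] = struct{}{}
				final.Spans = append(final.Spans, s)
			}
			return final, nil
		},
	}
}

func main() {
	c := newTraceCombiner()
	_ = c.add(&traceResp{Spans: []span{{ID: "a"}, {ID: "b"}}})
	_ = c.add(&traceResp{Spans: []span{{ID: "b"}, {ID: "c"}}})
	res, _ := c.result()
	fmt.Println(len(res.Spans), "unique spans")
}
```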
67 changes: 67 additions & 0 deletions modules/frontend/combiner/trace_by_id_v2_test.go
@@ -0,0 +1,67 @@
package combiner

import (
"bytes"
"io"
"net/http"
"testing"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type MockResponse struct {
resp *http.Response
}

func (m MockResponse) HTTPResponse() *http.Response {
return m.resp
}

func (m MockResponse) RequestData() any {
return nil
}

func TestNewTraceByIDV2(t *testing.T) {
traceResponse := &tempopb.TraceByIDResponse{
Trace: test.MakeTrace(2, []byte{0x01, 0x02}),
Metrics: &tempopb.TraceByIDMetrics{},
}
resBytes, err := proto.Marshal(traceResponse)
require.NoError(t, err)
response := http.Response{
StatusCode: 200,
Header: map[string][]string{
"Content-Type": {"application/protobuf"},
},
Body: io.NopCloser(bytes.NewReader(resBytes)),
}

t.Run("returns a combined trace response as JSON", func(t *testing.T) {
combiner := NewTraceByIDV2(100_000, api.HeaderAcceptJSON)
err = combiner.AddResponse(MockResponse{&response})
require.NoError(t, err)

res, err := combiner.HTTPFinal()
require.NoError(t, err)
assert.Equal(t, "application/json", res.Header.Get("Content-Type"))

actualResp := &tempopb.TraceByIDResponse{}
err = new(jsonpb.Unmarshaler).Unmarshal(res.Body, actualResp)
require.NoError(t, err)
})
t.Run("returns a combined trace response as protobuf", func(t *testing.T) {
combiner := NewTraceByIDV2(100_000, api.HeaderAcceptProtobuf)
err = combiner.AddResponse(MockResponse{&response})
require.NoError(t, err)

res, err := combiner.HTTPFinal()
require.NoError(t, err)
require.NotNil(t, res)
})
}
26 changes: 14 additions & 12 deletions modules/frontend/frontend.go
@@ -42,17 +42,17 @@ type (
)

type QueryFrontend struct {
TraceByIDHandler, SearchHandler, MetricsSummaryHandler, MetricsQueryInstantHandler, MetricsQueryRangeHandler http.Handler
SearchTagsHandler, SearchTagsV2Handler, SearchTagsValuesHandler, SearchTagsValuesV2Handler http.Handler
cacheProvider cache.Provider
streamingSearch streamingSearchHandler
streamingTags streamingTagsHandler
streamingTagsV2 streamingTagsV2Handler
streamingTagValues streamingTagValuesHandler
streamingTagValuesV2 streamingTagValuesV2Handler
streamingQueryRange streamingQueryRangeHandler
streamingQueryInstant streamingQueryInstantHandler
logger log.Logger
TraceByIDHandler, TraceByIDHandlerV2, SearchHandler, MetricsSummaryHandler, MetricsQueryInstantHandler, MetricsQueryRangeHandler http.Handler
SearchTagsHandler, SearchTagsV2Handler, SearchTagsValuesHandler, SearchTagsValuesV2Handler http.Handler
cacheProvider cache.Provider
streamingSearch streamingSearchHandler
streamingTags streamingTagsHandler
streamingTagsV2 streamingTagsV2Handler
streamingTagValues streamingTagValuesHandler
streamingTagValuesV2 streamingTagValuesV2Handler
streamingQueryRange streamingQueryRangeHandler
streamingQueryInstant streamingQueryInstantHandler
logger log.Logger
}

// New returns a new QueryFrontend
@@ -141,7 +141,8 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)

traces := newTraceIDHandler(cfg, o, tracePipeline, logger)
traces := newTraceIDHandler(cfg, tracePipeline, o, combiner.NewTraceByID, logger)
tracesV2 := newTraceIDHandler(cfg, tracePipeline, o, combiner.NewTraceByIDV2, logger)
search := newSearchHTTPHandler(cfg, searchPipeline, logger)
searchTags := newTagHTTPHandler(cfg, searchTagsPipeline, o, combiner.NewSearchTags, logger)
searchTagsV2 := newTagHTTPHandler(cfg, searchTagsPipeline, o, combiner.NewSearchTagsV2, logger)
@@ -154,6 +155,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
return &QueryFrontend{
// http/discrete
TraceByIDHandler: newHandler(cfg.Config.LogQueryRequestHeaders, traces, logger),
TraceByIDHandlerV2: newHandler(cfg.Config.LogQueryRequestHeaders, tracesV2, logger),
SearchHandler: newHandler(cfg.Config.LogQueryRequestHeaders, search, logger),
SearchTagsHandler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTags, logger),
SearchTagsV2Handler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagsV2, logger),
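In `frontend.go`, the V1 and V2 handlers are built by the same `newTraceIDHandler` function and differ only in the combiner constructor passed in. A minimal sketch of that reuse is shown below. The body of the real `newTraceIDHandler` is not part of this diff excerpt, so the way the handler derives the marshaling format from the `Accept` header is an assumption, and all names in the sketch are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// combiner is a hypothetical, reduced interface for this sketch.
type combiner interface{ final() string }

// combinerFn mirrors the shape of a constructor that takes a size limit and a
// marshaling format.
type combinerFn func(maxBytes int, marshalingFormat string) combiner

type namedCombiner struct{ name, format string }

func (c namedCombiner) final() string { return c.name + " as " + c.format }

// Two constructors with the same signature, one per API version.
func newV1(_ int, format string) combiner { return namedCombiner{"trace", format} }
func newV2(_ int, format string) combiner { return namedCombiner{"trace-by-id response", format} }

// newTraceIDHandler is shared by both versions; only the injected constructor
// differs. Deriving the format from the Accept header is an assumption here.
func newTraceIDHandler(maxBytes int, fn combinerFn) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		format := "application/json"
		if r.Header.Get("Accept") == "application/protobuf" {
			format = "application/protobuf"
		}
		c := fn(maxBytes, format)
		w.Header().Set("Content-Type", format)
		fmt.Fprint(w, c.final())
	})
}

// serve runs one request with the given Accept header through the handler.
func serve(h http.Handler, accept string) string {
	req := httptest.NewRequest(http.MethodGet, "/", nil)
	if accept != "" {
		req.Header.Set("Accept", accept)
	}
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Body.String()
}

func main() {
	fmt.Println(serve(newTraceIDHandler(1024, newV1), ""))
	fmt.Println(serve(newTraceIDHandler(1024, newV2), "application/protobuf"))
}
```

Injecting the constructor keeps the request handling in one place, which matches the commit note "pass the combiner to the handler to reuse code".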
4 changes: 2 additions & 2 deletions modules/frontend/pipeline/collector_http.go
@@ -5,8 +5,9 @@ import (
"net/http"
"sync"

"github.com/grafana/tempo/modules/frontend/combiner"
"go.uber.org/atomic"

"github.com/grafana/tempo/modules/frontend/combiner"
)

type httpCollector struct {
@@ -43,7 +44,6 @@ func (r httpCollector) RoundTrip(req *http.Request) (*http.Response, error) {
if err != nil {
return nil, err
}

return r.combiner.HTTPFinal()
}
