From 6afedd95e5b94d7f4dcac0175116d305ce4c619e Mon Sep 17 00:00:00 2001 From: Javier Molina Reyes Date: Fri, 27 Sep 2024 16:45:44 +0200 Subject: [PATCH 1/2] fix: bring back content-type header (#4123) * fix: bring back content-type header * init combiner in the proper place --- CHANGELOG.md | 1 + modules/frontend/combiner/metrics_query_range.go | 12 ++++++++---- modules/frontend/combiner/search.go | 5 ++++- modules/frontend/combiner/search_tag_values.go | 9 +++++++-- modules/frontend/combiner/search_tags.go | 9 +++++++-- 5 files changed, 27 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a3fbf3e6ed3..862ebde2c86b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ * [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero) * [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen) * [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott) +* [BUGFIX] Bring back application-json content-type header. [#4121](https://github.com/grafana/tempo/pull/4121) (@javiermolinar) * [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero) * **BREAKING CHANGE** tempo-query is no longer a jaeger instance with grpcPlugin. Its now a standalone server. Serving a grpc api for jaeger on `0.0.0.0:7777` by default. [#3840](https://github.com/grafana/tempo/issues/3840) (@frzifus) * [CHANGE] **BREAKING CHANGE** The dynamic injection of X-Scope-OrgID header for metrics generator remote-writes is changed. If the header is aleady set in per-tenant overrides or global tempo configuration, then it is honored and not overwritten. [#4021](https://github.com/grafana/tempo/pull/4021) (@mdisibio) diff --git a/modules/frontend/combiner/metrics_query_range.go b/modules/frontend/combiner/metrics_query_range.go index 8288123353dc..d542ece7a75b 100644 --- a/modules/frontend/combiner/metrics_query_range.go +++ b/modules/frontend/combiner/metrics_query_range.go @@ -4,6 +4,7 @@ import ( "sort" "strings" + "github.com/grafana/tempo/pkg/api" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/traceql" ) @@ -17,11 +18,11 @@ func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, e return nil, err } - return &genericCombiner[*tempopb.QueryRangeResponse]{ + c := &genericCombiner[*tempopb.QueryRangeResponse]{ httpStatusCode: 200, new: func() *tempopb.QueryRangeResponse { return &tempopb.QueryRangeResponse{} }, current: &tempopb.QueryRangeResponse{Metrics: &tempopb.SearchMetrics{}}, - combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, resp PipelineResponse) error { + combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, _ PipelineResponse) error { if partial.Metrics != nil { // this is a coordination between the sharder and combiner. the sharder returns one response with summary metrics // only. the combiner correctly takes and accumulates that job. however, if the response has no jobs this is @@ -51,7 +52,11 @@ func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, e sortResponse(resp) return resp, nil }, - }, nil + } + + initHTTPCombiner(c, api.HeaderAcceptJSON) + + return c, nil } func NewTypedQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (GRPCCombiner[*tempopb.QueryRangeResponse], error) { @@ -59,7 +64,6 @@ func NewTypedQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (GRPCCo if err != nil { return nil, err } - return c.(GRPCCombiner[*tempopb.QueryRangeResponse]), nil } diff --git a/modules/frontend/combiner/search.go b/modules/frontend/combiner/search.go index e6f72c5777c3..a4c44dad8593 100644 --- a/modules/frontend/combiner/search.go +++ b/modules/frontend/combiner/search.go @@ -3,6 +3,7 @@ package combiner import ( "sort" + "github.com/grafana/tempo/pkg/api" "github.com/grafana/tempo/pkg/search" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/traceql" @@ -15,7 +16,7 @@ func NewSearch(limit int) Combiner { metadataCombiner := traceql.NewMetadataCombiner() diffTraces := map[string]struct{}{} - return &genericCombiner[*tempopb.SearchResponse]{ + c := &genericCombiner[*tempopb.SearchResponse]{ httpStatusCode: 200, new: func() *tempopb.SearchResponse { return &tempopb.SearchResponse{} }, current: &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}}, @@ -96,6 +97,8 @@ func NewSearch(limit int) Combiner { return metadataCombiner.Count() >= limit }, } + initHTTPCombiner(c, api.HeaderAcceptJSON) + return c } func addRootSpanNotReceivedText(results []*tempopb.TraceSearchMetadata) { diff --git a/modules/frontend/combiner/search_tag_values.go b/modules/frontend/combiner/search_tag_values.go index 4cb5b8bf537b..29d8ec4163ff 100644 --- a/modules/frontend/combiner/search_tag_values.go +++ b/modules/frontend/combiner/search_tag_values.go @@ -1,6 +1,7 @@ package combiner import ( + "github.com/grafana/tempo/pkg/api" "github.com/grafana/tempo/pkg/collector" "github.com/grafana/tempo/pkg/tempopb" ) @@ -14,7 +15,7 @@ func NewSearchTagValues(limitBytes int) Combiner { // Distinct collector with no limit d := collector.NewDistinctString(limitBytes) - return &genericCombiner[*tempopb.SearchTagValuesResponse]{ + c := &genericCombiner[*tempopb.SearchTagValuesResponse]{ httpStatusCode: 200, new: func() *tempopb.SearchTagValuesResponse { return &tempopb.SearchTagValuesResponse{} }, current: &tempopb.SearchTagValuesResponse{TagValues: make([]string, 0)}, @@ -36,6 +37,8 @@ func NewSearchTagValues(limitBytes int) Combiner { return response, nil }, } + initHTTPCombiner(c, api.HeaderAcceptJSON) + return c } func NewTypedSearchTagValues(limitBytes int) GRPCCombiner[*tempopb.SearchTagValuesResponse] { @@ -46,7 +49,7 @@ func NewSearchTagValuesV2(limitBytes int) Combiner { // Distinct collector with no limit and diff enabled d := collector.NewDistinctValueWithDiff(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) }) - return &genericCombiner[*tempopb.SearchTagValuesV2Response]{ + c := &genericCombiner[*tempopb.SearchTagValuesV2Response]{ httpStatusCode: 200, current: &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{}}, new: func() *tempopb.SearchTagValuesV2Response { return &tempopb.SearchTagValuesV2Response{} }, @@ -78,6 +81,8 @@ func NewSearchTagValuesV2(limitBytes int) Combiner { return response, nil }, } + initHTTPCombiner(c, api.HeaderAcceptJSON) + return c } func NewTypedSearchTagValuesV2(limitBytes int) GRPCCombiner[*tempopb.SearchTagValuesV2Response] { diff --git a/modules/frontend/combiner/search_tags.go b/modules/frontend/combiner/search_tags.go index fda7e3414a26..47af9b5a1e6c 100644 --- a/modules/frontend/combiner/search_tags.go +++ b/modules/frontend/combiner/search_tags.go @@ -1,6 +1,7 @@ package combiner import ( + "github.com/grafana/tempo/pkg/api" "github.com/grafana/tempo/pkg/collector" "github.com/grafana/tempo/pkg/tempopb" ) @@ -13,7 +14,7 @@ var ( func NewSearchTags(limitBytes int) Combiner { d := collector.NewDistinctString(limitBytes) - return &genericCombiner[*tempopb.SearchTagsResponse]{ + c := &genericCombiner[*tempopb.SearchTagsResponse]{ httpStatusCode: 200, new: func() *tempopb.SearchTagsResponse { return &tempopb.SearchTagsResponse{} }, current: &tempopb.SearchTagsResponse{TagNames: make([]string, 0)}, @@ -35,6 +36,8 @@ func NewSearchTags(limitBytes int) Combiner { return response, nil }, } + initHTTPCombiner(c, api.HeaderAcceptJSON) + return c } func NewTypedSearchTags(limitBytes int) GRPCCombiner[*tempopb.SearchTagsResponse] { @@ -45,7 +48,7 @@ func NewSearchTagsV2(limitBytes int) Combiner { // Distinct collector map to collect scopes and scope values distinctValues := collector.NewScopedDistinctString(limitBytes) - return &genericCombiner[*tempopb.SearchTagsV2Response]{ + c := &genericCombiner[*tempopb.SearchTagsV2Response]{ httpStatusCode: 200, new: func() *tempopb.SearchTagsV2Response { return &tempopb.SearchTagsV2Response{} }, current: &tempopb.SearchTagsV2Response{Scopes: make([]*tempopb.SearchTagsV2Scope, 0)}, @@ -86,6 +89,8 @@ func NewSearchTagsV2(limitBytes int) Combiner { return response, nil }, } + initHTTPCombiner(c, api.HeaderAcceptJSON) + return c } func NewTypedSearchTagsV2(limitBytes int) GRPCCombiner[*tempopb.SearchTagsV2Response] { From efb69a6dbcff0f022c751be4e07e14e0137d949f Mon Sep 17 00:00:00 2001 From: Suraj Nath <9503187+electron0zero@users.noreply.github.com> Date: Fri, 27 Sep 2024 21:22:43 +0530 Subject: [PATCH 2/2] tempo-cli: add support for /api/v2/traces endpoint (#4127) * tempo-cli: add support for traces API V2 * update CHANGELOG.md * enable help to be more compact but still fully-specified form * request protobuf for v2 endpoint * default to using v2 endpoint and mark it as breaking change * use generics in printAsJSON * fix: support for +Inf, -Inf and NaN values in trace by id endpoints 'encoding/json' package can't handle +Inf, -Inf, NaN values and will fail to Marshal the response from tracebyid endpoints if response has keys with values of +Inf, -Inf or NaN. gogo/protobuf/jsonpb package correctly handles these values so use that to Marshal and print the response to stdout --- CHANGELOG.md | 3 ++ cmd/tempo-cli/cmd-query-trace-id.go | 36 ++++++++++++++++++++-- cmd/tempo-cli/main.go | 2 +- cmd/tempo-cli/shared.go | 2 +- docs/sources/tempo/operations/tempo_cli.md | 1 + pkg/httpclient/client.go | 18 +++++++++-- 6 files changed, 54 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 862ebde2c86b..b5366d1aa1c4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## main / unreleased +* [CHANGE] tempo-cli: add support for /api/v2/traces endpoint [#4127](https://github.com/grafana/tempo/pull/4127) (@electron0zero) + **BREAKING CHANGE** The `tempo-cli` now uses the `/api/v2/traces` endpoint by default, + please use `--v1` flag to use `/api/traces` endpoint, which was the default in previous versions. * [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero) * [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero) * [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero) diff --git a/cmd/tempo-cli/cmd-query-trace-id.go b/cmd/tempo-cli/cmd-query-trace-id.go index 1c69a98f194a..dfd91fc5ac50 100644 --- a/cmd/tempo-cli/cmd-query-trace-id.go +++ b/cmd/tempo-cli/cmd-query-trace-id.go @@ -1,24 +1,54 @@ package main import ( + "fmt" + "os" + + "github.com/gogo/protobuf/jsonpb" "github.com/grafana/tempo/pkg/httpclient" + "github.com/grafana/tempo/pkg/tempopb" ) type queryTraceIDCmd struct { APIEndpoint string `arg:"" help:"tempo api endpoint"` TraceID string `arg:"" help:"trace ID to retrieve"` + V1 bool `name:"v1" help:"Use v1 API /api/traces endpoint"` OrgID string `help:"optional orgID"` } func (cmd *queryTraceIDCmd) Run(_ *globalOptions) error { client := httpclient.New(cmd.APIEndpoint, cmd.OrgID) - // util.QueryTrace will only add orgID header if len(orgID) > 0 - trace, err := client.QueryTrace(cmd.TraceID) + + // use v1 API if specified, we default to v2 + if cmd.V1 { + trace, err := client.QueryTrace(cmd.TraceID) + if err != nil { + return err + } + return printTrace(trace) + } + + traceResp, err := client.QueryTraceV2(cmd.TraceID) if err != nil { return err } + if traceResp.Message != "" { + // print message and status to stderr if there is one. + // allows users to get a clean trace on the stdout, and pipe it to a file or another commands. + _, _ = fmt.Fprintf(os.Stderr, "status: %s , message: %s\n", traceResp.Status, traceResp.Message) + } + return printTrace(traceResp.Trace) +} - return printAsJSON(trace) +func printTrace(trace *tempopb.Trace) error { + // tracebyid endpoints are protobuf, we are using 'gogo/protobuf/jsonpb' to marshal the + // trace to json because 'encoding/json' package can't handle +Inf, -Inf, NaN + marshaller := &jsonpb.Marshaler{} + err := marshaller.Marshal(os.Stdout, trace) + if err != nil { + _, _ = fmt.Fprintf(os.Stderr, "Failed to marshal trace: %v\n", err) + } + return nil } diff --git a/cmd/tempo-cli/main.go b/cmd/tempo-cli/main.go index cb0a043a638c..3d046e79dd5c 100644 --- a/cmd/tempo-cli/main.go +++ b/cmd/tempo-cli/main.go @@ -94,7 +94,7 @@ func main() { ctx := kong.Parse(&cli, kong.UsageOnError(), kong.ConfigureHelp(kong.HelpOptions{ - // Compact: true, + Compact: true, }), ) err := ctx.Run(&cli.globalOptions) diff --git a/cmd/tempo-cli/shared.go b/cmd/tempo-cli/shared.go index b0b2249b6607..e286326f3d73 100644 --- a/cmd/tempo-cli/shared.go +++ b/cmd/tempo-cli/shared.go @@ -125,7 +125,7 @@ func loadBlock(r backend.Reader, c backend.Compactor, tenantID string, id uuid.U }, nil } -func printAsJSON(value interface{}) error { +func printAsJSON[T any](value T) error { traceJSON, err := json.Marshal(value) if err != nil { return err diff --git a/docs/sources/tempo/operations/tempo_cli.md b/docs/sources/tempo/operations/tempo_cli.md index 7b1087a224b2..695583d07154 100644 --- a/docs/sources/tempo/operations/tempo_cli.md +++ b/docs/sources/tempo/operations/tempo_cli.md @@ -70,6 +70,7 @@ Arguments: Options: - `--org-id ` Organization ID (for use in multi-tenant setup). +- `--v1` use v1 API (use /api/traces endpoint to fetch traces, default: /api/v2/traces). **Example:** ```bash diff --git a/pkg/httpclient/client.go b/pkg/httpclient/client.go index 532f1a6fbfdd..9a29eabcc442 100644 --- a/pkg/httpclient/client.go +++ b/pkg/httpclient/client.go @@ -26,7 +26,8 @@ import ( const ( orgIDHeader = "X-Scope-OrgID" - QueryTraceEndpoint = "/api/traces" + QueryTraceEndpoint = "/api/traces" + QueryTraceV2Endpoint = "/api/v2/traces" acceptHeader = "Accept" applicationProtobuf = "application/protobuf" @@ -95,11 +96,11 @@ func (c *Client) getFor(url string, m proto.Message) (*http.Response, error) { } marshallingFormat := applicationJSON - if strings.Contains(url, QueryTraceEndpoint) { + if strings.Contains(url, QueryTraceEndpoint) || strings.Contains(url, QueryTraceV2Endpoint) { marshallingFormat = applicationProtobuf } // Set 'Accept' header to 'application/protobuf'. - // This is required for the /api/traces endpoint to return a protobuf response. + // This is required for the /api/traces and /api/v2/traces endpoint to return a protobuf response. // JSON lost backwards compatibility with the upgrade to `opentelemetry-proto` v0.18.0. req.Header.Set(acceptHeader, marshallingFormat) @@ -253,7 +254,18 @@ func (c *Client) QueryTrace(id string) (*tempopb.Trace, error) { } return nil, err } + return m, nil +} +func (c *Client) QueryTraceV2(id string) (*tempopb.TraceByIDResponse, error) { + m := &tempopb.TraceByIDResponse{} + resp, err := c.getFor(c.BaseURL+QueryTraceV2Endpoint+"/"+id, m) + if err != nil { + if resp != nil && resp.StatusCode == http.StatusNotFound { + return nil, util.ErrTraceNotFound + } + return nil, err + } return m, nil }