Skip to content

Commit

Permalink
Merge branch 'main' into more_fast_collectors
Browse files Browse the repository at this point in the history
  • Loading branch information
electron0zero authored Sep 27, 2024
2 parents 40bd11f + efb69a6 commit 78321fb
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 17 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
## main / unreleased

* [ENHANCEMENT] Speedup DistinctString and ScopedDistinctString collectors [#4109](https://github.com/grafana/tempo/pull/4109) (@electron0zero)
* [CHANGE] tempo-cli: add support for /api/v2/traces endpoint [#4127](https://github.com/grafana/tempo/pull/4127) (@electron0zero)
**BREAKING CHANGE** The `tempo-cli` now uses the `/api/v2/traces` endpoint by default,
please use `--v1` flag to use `/api/traces` endpoint, which was the default in previous versions.
* [ENHANCEMENT] Speedup collection of results from ingesters in the querier [#4100](https://github.com/grafana/tempo/pull/4100) (@electron0zero)
* [ENHANCEMENT] Speedup DistinctValue collector and exit early for ingesters [#4104](https://github.com/grafana/tempo/pull/4104) (@electron0zero)
* [ENHANCEMENT] Add disk caching in ingester SearchTagValuesV2 for completed blocks [#4069](https://github.com/grafana/tempo/pull/4069) (@electron0zero)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
* [BUGFIX] Bring back application-json content-type header. [#4121](https://github.com/grafana/tempo/pull/4121) (@javiermolinar)
* [CHANGE] TraceByID: don't allow concurrent_shards greater than query_shards. [#4074](https://github.com/grafana/tempo/pull/4074) (@electron0zero)
* **BREAKING CHANGE** tempo-query is no longer a jaeger instance with grpcPlugin. Its now a standalone server. Serving a grpc api for jaeger on `0.0.0.0:7777` by default. [#3840](https://github.com/grafana/tempo/issues/3840) (@frzifus)
* [CHANGE] **BREAKING CHANGE** The dynamic injection of X-Scope-OrgID header for metrics generator remote-writes is changed. If the header is aleady set in per-tenant overrides or global tempo configuration, then it is honored and not overwritten. [#4021](https://github.com/grafana/tempo/pull/4021) (@mdisibio)
Expand Down
36 changes: 33 additions & 3 deletions cmd/tempo-cli/cmd-query-trace-id.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,54 @@
package main

import (
"fmt"
"os"

"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/tempo/pkg/httpclient"
"github.com/grafana/tempo/pkg/tempopb"
)

type queryTraceIDCmd struct {
APIEndpoint string `arg:"" help:"tempo api endpoint"`
TraceID string `arg:"" help:"trace ID to retrieve"`

V1 bool `name:"v1" help:"Use v1 API /api/traces endpoint"`
OrgID string `help:"optional orgID"`
}

func (cmd *queryTraceIDCmd) Run(_ *globalOptions) error {
client := httpclient.New(cmd.APIEndpoint, cmd.OrgID)

// util.QueryTrace will only add orgID header if len(orgID) > 0
trace, err := client.QueryTrace(cmd.TraceID)

// use v1 API if specified, we default to v2
if cmd.V1 {
trace, err := client.QueryTrace(cmd.TraceID)
if err != nil {
return err
}
return printTrace(trace)
}

traceResp, err := client.QueryTraceV2(cmd.TraceID)
if err != nil {
return err
}
if traceResp.Message != "" {
// print message and status to stderr if there is one.
// allows users to get a clean trace on the stdout, and pipe it to a file or another commands.
_, _ = fmt.Fprintf(os.Stderr, "status: %s , message: %s\n", traceResp.Status, traceResp.Message)
}
return printTrace(traceResp.Trace)
}

return printAsJSON(trace)
func printTrace(trace *tempopb.Trace) error {
// tracebyid endpoints are protobuf, we are using 'gogo/protobuf/jsonpb' to marshal the
// trace to json because 'encoding/json' package can't handle +Inf, -Inf, NaN
marshaller := &jsonpb.Marshaler{}
err := marshaller.Marshal(os.Stdout, trace)
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Failed to marshal trace: %v\n", err)
}
return nil
}
2 changes: 1 addition & 1 deletion cmd/tempo-cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func main() {
ctx := kong.Parse(&cli,
kong.UsageOnError(),
kong.ConfigureHelp(kong.HelpOptions{
// Compact: true,
Compact: true,
}),
)
err := ctx.Run(&cli.globalOptions)
Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo-cli/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func loadBlock(r backend.Reader, c backend.Compactor, tenantID string, id uuid.U
}, nil
}

func printAsJSON(value interface{}) error {
func printAsJSON[T any](value T) error {
traceJSON, err := json.Marshal(value)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions docs/sources/tempo/operations/tempo_cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Arguments:

Options:
- `--org-id <value>` Organization ID (for use in multi-tenant setup).
- `--v1` use v1 API (use /api/traces endpoint to fetch traces, default: /api/v2/traces).

**Example:**
```bash
Expand Down
12 changes: 8 additions & 4 deletions modules/frontend/combiner/metrics_query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sort"
"strings"

"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
)
Expand All @@ -17,11 +18,11 @@ func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, e
return nil, err
}

return &genericCombiner[*tempopb.QueryRangeResponse]{
c := &genericCombiner[*tempopb.QueryRangeResponse]{
httpStatusCode: 200,
new: func() *tempopb.QueryRangeResponse { return &tempopb.QueryRangeResponse{} },
current: &tempopb.QueryRangeResponse{Metrics: &tempopb.SearchMetrics{}},
combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, resp PipelineResponse) error {
combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, _ PipelineResponse) error {
if partial.Metrics != nil {
// this is a coordination between the sharder and combiner. the sharder returns one response with summary metrics
// only. the combiner correctly takes and accumulates that job. however, if the response has no jobs this is
Expand Down Expand Up @@ -51,15 +52,18 @@ func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, e
sortResponse(resp)
return resp, nil
},
}, nil
}

initHTTPCombiner(c, api.HeaderAcceptJSON)

return c, nil
}

func NewTypedQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (GRPCCombiner[*tempopb.QueryRangeResponse], error) {
c, err := NewQueryRange(req, trackDiffs)
if err != nil {
return nil, err
}

return c.(GRPCCombiner[*tempopb.QueryRangeResponse]), nil
}

Expand Down
5 changes: 4 additions & 1 deletion modules/frontend/combiner/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package combiner
import (
"sort"

"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/search"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
Expand All @@ -15,7 +16,7 @@ func NewSearch(limit int) Combiner {
metadataCombiner := traceql.NewMetadataCombiner()
diffTraces := map[string]struct{}{}

return &genericCombiner[*tempopb.SearchResponse]{
c := &genericCombiner[*tempopb.SearchResponse]{
httpStatusCode: 200,
new: func() *tempopb.SearchResponse { return &tempopb.SearchResponse{} },
current: &tempopb.SearchResponse{Metrics: &tempopb.SearchMetrics{}},
Expand Down Expand Up @@ -96,6 +97,8 @@ func NewSearch(limit int) Combiner {
return metadataCombiner.Count() >= limit
},
}
initHTTPCombiner(c, api.HeaderAcceptJSON)
return c
}

func addRootSpanNotReceivedText(results []*tempopb.TraceSearchMetadata) {
Expand Down
9 changes: 7 additions & 2 deletions modules/frontend/combiner/search_tag_values.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package combiner

import (
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/collector"
"github.com/grafana/tempo/pkg/tempopb"
)
Expand All @@ -14,7 +15,7 @@ func NewSearchTagValues(limitBytes int) Combiner {
// Distinct collector with no limit
d := collector.NewDistinctStringWithDiff(limitBytes)

return &genericCombiner[*tempopb.SearchTagValuesResponse]{
c := &genericCombiner[*tempopb.SearchTagValuesResponse]{
httpStatusCode: 200,
new: func() *tempopb.SearchTagValuesResponse { return &tempopb.SearchTagValuesResponse{} },
current: &tempopb.SearchTagValuesResponse{TagValues: make([]string, 0)},
Expand All @@ -40,6 +41,8 @@ func NewSearchTagValues(limitBytes int) Combiner {
return response, nil
},
}
initHTTPCombiner(c, api.HeaderAcceptJSON)
return c
}

func NewTypedSearchTagValues(limitBytes int) GRPCCombiner[*tempopb.SearchTagValuesResponse] {
Expand All @@ -50,7 +53,7 @@ func NewSearchTagValuesV2(limitBytes int) Combiner {
// Distinct collector with no limit and diff enabled
d := collector.NewDistinctValueWithDiff(limitBytes, func(tv tempopb.TagValue) int { return len(tv.Type) + len(tv.Value) })

return &genericCombiner[*tempopb.SearchTagValuesV2Response]{
c := &genericCombiner[*tempopb.SearchTagValuesV2Response]{
httpStatusCode: 200,
current: &tempopb.SearchTagValuesV2Response{TagValues: []*tempopb.TagValue{}},
new: func() *tempopb.SearchTagValuesV2Response { return &tempopb.SearchTagValuesV2Response{} },
Expand Down Expand Up @@ -85,6 +88,8 @@ func NewSearchTagValuesV2(limitBytes int) Combiner {
return response, nil
},
}
initHTTPCombiner(c, api.HeaderAcceptJSON)
return c
}

func NewTypedSearchTagValuesV2(limitBytes int) GRPCCombiner[*tempopb.SearchTagValuesV2Response] {
Expand Down
9 changes: 7 additions & 2 deletions modules/frontend/combiner/search_tags.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package combiner

import (
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/collector"
"github.com/grafana/tempo/pkg/tempopb"
)
Expand All @@ -13,7 +14,7 @@ var (
func NewSearchTags(limitBytes int) Combiner {
d := collector.NewDistinctStringWithDiff(limitBytes)

return &genericCombiner[*tempopb.SearchTagsResponse]{
c := &genericCombiner[*tempopb.SearchTagsResponse]{
httpStatusCode: 200,
new: func() *tempopb.SearchTagsResponse { return &tempopb.SearchTagsResponse{} },
current: &tempopb.SearchTagsResponse{TagNames: make([]string, 0)},
Expand All @@ -40,6 +41,8 @@ func NewSearchTags(limitBytes int) Combiner {
return response, nil
},
}
initHTTPCombiner(c, api.HeaderAcceptJSON)
return c
}

func NewTypedSearchTags(limitBytes int) GRPCCombiner[*tempopb.SearchTagsResponse] {
Expand All @@ -50,7 +53,7 @@ func NewSearchTagsV2(limitBytes int) Combiner {
// Distinct collector map to collect scopes and scope values
distinctValues := collector.NewScopedDistinctStringWithDiff(limitBytes)

return &genericCombiner[*tempopb.SearchTagsV2Response]{
c := &genericCombiner[*tempopb.SearchTagsV2Response]{
httpStatusCode: 200,
new: func() *tempopb.SearchTagsV2Response { return &tempopb.SearchTagsV2Response{} },
current: &tempopb.SearchTagsV2Response{Scopes: make([]*tempopb.SearchTagsV2Scope, 0)},
Expand Down Expand Up @@ -94,6 +97,8 @@ func NewSearchTagsV2(limitBytes int) Combiner {
return response, nil
},
}
initHTTPCombiner(c, api.HeaderAcceptJSON)
return c
}

func NewTypedSearchTagsV2(limitBytes int) GRPCCombiner[*tempopb.SearchTagsV2Response] {
Expand Down
18 changes: 15 additions & 3 deletions pkg/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
const (
orgIDHeader = "X-Scope-OrgID"

QueryTraceEndpoint = "/api/traces"
QueryTraceEndpoint = "/api/traces"
QueryTraceV2Endpoint = "/api/v2/traces"

acceptHeader = "Accept"
applicationProtobuf = "application/protobuf"
Expand Down Expand Up @@ -95,11 +96,11 @@ func (c *Client) getFor(url string, m proto.Message) (*http.Response, error) {
}

marshallingFormat := applicationJSON
if strings.Contains(url, QueryTraceEndpoint) {
if strings.Contains(url, QueryTraceEndpoint) || strings.Contains(url, QueryTraceV2Endpoint) {
marshallingFormat = applicationProtobuf
}
// Set 'Accept' header to 'application/protobuf'.
// This is required for the /api/traces endpoint to return a protobuf response.
// This is required for the /api/traces and /api/v2/traces endpoint to return a protobuf response.
// JSON lost backwards compatibility with the upgrade to `opentelemetry-proto` v0.18.0.
req.Header.Set(acceptHeader, marshallingFormat)

Expand Down Expand Up @@ -253,7 +254,18 @@ func (c *Client) QueryTrace(id string) (*tempopb.Trace, error) {
}
return nil, err
}
return m, nil
}

func (c *Client) QueryTraceV2(id string) (*tempopb.TraceByIDResponse, error) {
m := &tempopb.TraceByIDResponse{}
resp, err := c.getFor(c.BaseURL+QueryTraceV2Endpoint+"/"+id, m)
if err != nil {
if resp != nil && resp.StatusCode == http.StatusNotFound {
return nil, util.ErrTraceNotFound
}
return nil, err
}
return m, nil
}

Expand Down

0 comments on commit 78321fb

Please sign in to comment.