Remove tolerate_failed_blocks from trace by id path #2416

Merged: 8 commits, May 3, 2023
13 changes: 12 additions & 1 deletion CHANGELOG.md
@@ -1,8 +1,19 @@
## main / unreleased

+* [CHANGE] **Breaking Change** Remove support for `tolerate_failed_blocks`. [#2416](https://github.com/grafana/tempo/pull/2416) (@joe-elliott)
+  Removed config option:
+  ```
+  query_frontend:
+    tolerate_failed_blocks: <int>
+  ```
* [ENHANCEMENT] Add support to filter using negated regex operator `!~` [#2410](https://github.com/grafana/tempo/pull/2410) (@kousikmitra)
* [ENHANCEMENT] Add `prefix` configuration option to `storage.trace.azure` and `storage.trace.gcs` [#2386](https://github.com/grafana/tempo/pull/2386) (@kousikmitra)
* [ENHANCEMENT] Add `prefix` configuration option to `storage.trace.s3` [#2362](https://github.com/grafana/tempo/pull/2362) (@kousikmitra)
+* [ENHANCEMENT] Add support for `concurrent_shards` under `trace_by_id` [#2416](https://github.com/grafana/tempo/pull/2416) (@joe-elliott)
+  ```
+  query_frontend:
+    trace_by_id:
+      concurrent_shards: 3
+  ```
* [FEATURE] Add support for `q` query param in `/api/v2/search/<tag.name>/values` to filter results based on a TraceQL query [#2253](https://github.com/grafana/tempo/pull/2253) (@mapno)
* [FEATURE] Add a GRPC streaming endpoint for traceql search [#2366](https://github.com/grafana/tempo/pull/2366) (@joe-elliott)
* [ENHANCEMENT] Add `scope` parameter to `/api/search/tags` [#2282](https://github.com/grafana/tempo/pull/2282) (@joe-elliott)
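Since this is a breaking change rather than a deprecation, configs that still set the removed key have to drop it. An illustrative before/after of the two entries above (values invented for the example):

```
# before: the frontend tolerated a number of failed block queries
query_frontend:
  tolerate_failed_blocks: 2

# after: the option is gone; separately, trace-by-ID fan-out
# can now be capped with the new concurrent_shards option
query_frontend:
  trace_by_id:
    concurrent_shards: 3
```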
9 changes: 4 additions & 5 deletions docs/sources/tempo/configuration/_index.md
@@ -357,11 +357,6 @@ query_frontend:
    # (default: 2)
    [max_retries: <int>]

-    # number of block queries that are tolerated to error before considering the entire query as failed
-    # numbers greater than 0 make possible for a read to return partial results
-    # (default: 0)
-    [tolerate_failed_blocks: <int>]
-
    search:
        # Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.
        # (default: 2000)
@@ -417,6 +412,10 @@ query_frontend:
        # (default: 50)
        [query_shards: <int>]

+        # The maximum number of shards to execute at once. If set to 0, query_shards is used.
+        # (default: 0)
+        [concurrent_shards: <int>]
+
        # If set to a non-zero value, a second request will be issued at the provided duration.
        # Recommended to be set to p99 of search requests to reduce long-tail latency.
        [hedge_requests_at: <duration> | default = 2s ]
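For reference, a minimal sketch of how the new option sits alongside the existing knobs documented above (numbers are illustrative, not recommendations):

```
query_frontend:
  max_retries: 2
  trace_by_id:
    # split each trace-by-ID query into this many shards
    query_shards: 50
    # run at most this many shard requests at once (0 falls back to query_shards)
    concurrent_shards: 10
    # hedge long-tail shard requests after this duration
    hedge_requests_at: 2s
```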
17 changes: 8 additions & 9 deletions modules/frontend/config.go
@@ -18,11 +18,10 @@ var (
)

type Config struct {
-	Config               v1.Config       `yaml:",inline"`
-	MaxRetries           int             `yaml:"max_retries,omitempty"`
-	TolerateFailedBlocks int             `yaml:"tolerate_failed_blocks,omitempty"`
-	Search               SearchConfig    `yaml:"search"`
-	TraceByID            TraceByIDConfig `yaml:"trace_by_id"`
+	Config     v1.Config       `yaml:",inline"`
+	MaxRetries int             `yaml:"max_retries,omitempty"`
+	Search     SearchConfig    `yaml:"search"`
+	TraceByID  TraceByIDConfig `yaml:"trace_by_id"`
}

type SearchConfig struct {
@@ -31,9 +30,10 @@ type SearchConfig struct {
}

type TraceByIDConfig struct {
-	QueryShards int           `yaml:"query_shards,omitempty"`
-	Hedging     HedgingConfig `yaml:",inline"`
-	SLO         SLOConfig     `yaml:",inline"`
+	QueryShards      int           `yaml:"query_shards,omitempty"`
+	ConcurrentShards int           `yaml:"concurrent_shards,omitempty"`
+	Hedging          HedgingConfig `yaml:",inline"`
+	SLO              SLOConfig     `yaml:",inline"`
}

type HedgingConfig struct {
@@ -54,7 +54,6 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {

	cfg.Config.MaxOutstandingPerTenant = 2000
	cfg.MaxRetries = 2
-	cfg.TolerateFailedBlocks = 0
	cfg.Search = SearchConfig{
		Sharder: SearchSharderConfig{
			QueryBackendAfter: 15 * time.Minute,
2 changes: 1 addition & 1 deletion modules/frontend/frontend.go
@@ -97,7 +97,7 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
	rt := NewRoundTripper(
		next,
		newDeduper(logger),
-		newTraceByIDSharder(cfg.TraceByID.QueryShards, cfg.TolerateFailedBlocks, cfg.TraceByID.SLO, logger),
+		newTraceByIDSharder(&cfg.TraceByID, logger),
		newHedgedRequestWare(cfg.TraceByID.Hedging),
	)

45 changes: 15 additions & 30 deletions modules/frontend/tracebyidsharding.go
@@ -5,7 +5,6 @@ import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"io"
"math"
"net/http"
@@ -18,6 +17,7 @@ import (
"github.com/golang/protobuf/proto" //nolint:all //deprecated
"github.com/grafana/tempo/modules/querier"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/opentracing/opentracing-go"
@@ -29,26 +29,22 @@ const (
	maxQueryShards = 100_000
)

-func newTraceByIDSharder(queryShards, maxFailedBlocks int, sloCfg SLOConfig, logger log.Logger) Middleware {
+func newTraceByIDSharder(cfg *TraceByIDConfig, logger log.Logger) Middleware {
	return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
		return shardQuery{
			next:            next,
-			queryShards:     queryShards,
-			sloCfg:          sloCfg,
+			cfg:             cfg,
			logger:          logger,
-			blockBoundaries: createBlockBoundaries(queryShards - 1), // one shard will be used to query ingesters
-			maxFailedBlocks: uint32(maxFailedBlocks),
+			blockBoundaries: createBlockBoundaries(cfg.QueryShards - 1), // one shard will be used to query ingesters
		}
	})
}

type shardQuery struct {
	next            http.RoundTripper
-	queryShards     int
-	sloCfg          SLOConfig
+	cfg             *TraceByIDConfig
	logger          log.Logger
	blockBoundaries [][]byte
-	maxFailedBlocks uint32
}

// RoundTrip implements http.RoundTripper
@@ -75,11 +71,14 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
	}

	// execute requests
-	wg := sync.WaitGroup{}
+	concurrentShards := uint(s.cfg.QueryShards)
+	if s.cfg.ConcurrentShards > 0 {
+		concurrentShards = uint(s.cfg.ConcurrentShards)
+	}
+	wg := boundedwaitgroup.New(concurrentShards)
	mtx := sync.Mutex{}

	var overallError error
-	var totalFailedBlocks uint32
	combiner := trace.NewCombiner()
	combiner.Consume(&tempopb.Trace{}) // The query path returns a non-nil result even if no inputs (which is different than other paths which return nil for no inputs)
	statusCode := http.StatusNotFound
@@ -140,14 +139,6 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
				return
			}

-			if traceResp.Metrics != nil {
-				totalFailedBlocks += traceResp.Metrics.FailedBlocks
-				if totalFailedBlocks > s.maxFailedBlocks {
-					overallError = fmt.Errorf("too many failed block queries %d (max %d)", totalFailedBlocks, s.maxFailedBlocks)
-					return
-				}
-			}
-
			// if not found bail
			if resp.StatusCode == http.StatusNotFound {
				return
@@ -166,10 +157,6 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
		return nil, overallError
	}

-	if totalFailedBlocks > 0 {
-		_ = level.Warn(s.logger).Log("msg", "some blocks failed. returning success due to tolerate_failed_blocks", "failed", totalFailedBlocks, "tolerate_failed_blocks", s.maxFailedBlocks)
-	}
-
	overallTrace, _ := combiner.Result()
	if overallTrace == nil || statusCode != http.StatusOK {
		// translate non-404s into 500s. if, for instance, we get a 400 back from an internal component
@@ -187,19 +174,17 @@ func (s shardQuery) RoundTrip(r *http.Request) (*http.Response, error) {
	}

	buff, err := proto.Marshal(&tempopb.TraceByIDResponse{
-		Trace: overallTrace,
-		Metrics: &tempopb.TraceByIDMetrics{
-			FailedBlocks: totalFailedBlocks,
-		},
+		Trace:   overallTrace,
+		Metrics: &tempopb.TraceByIDMetrics{},
	})
	if err != nil {
		_ = level.Error(s.logger).Log("msg", "error marshalling response to proto", "err", err)
		return nil, err
	}

	// only record metric when it's enabled and within slo
-	if s.sloCfg.DurationSLO != 0 {
-		if reqTime < s.sloCfg.DurationSLO {
+	if s.cfg.SLO.DurationSLO != 0 {
+		if reqTime < s.cfg.SLO.DurationSLO {
			// we are within SLO if query returned 200 within DurationSLO seconds
			// TODO: we don't have throughput metrics for TraceByID.
			sloTraceByIDCounter.WithLabelValues(tenantID).Inc()
@@ -225,7 +210,7 @@ func (s *shardQuery) buildShardedRequests(parent *http.Request) ([]*http.Request, error) {
		return nil, err
	}

-	reqs := make([]*http.Request, s.queryShards)
+	reqs := make([]*http.Request, s.cfg.QueryShards)
	// build sharded block queries
	for i := 0; i < len(s.blockBoundaries); i++ {
		reqs[i] = parent.Clone(ctx)
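The heart of the change above is replacing the unbounded `sync.WaitGroup` with `boundedwaitgroup.New(concurrentShards)`, so that at most `concurrent_shards` sub-requests (or `query_shards` worth, when unset) are in flight at a time. A minimal sketch of the bounded-wait-group pattern, assuming a semaphore-channel design; the actual `pkg/boundedwaitgroup` implementation may differ:

```
package boundedwaitgroup

import "sync"

// BoundedWaitGroup behaves like sync.WaitGroup, except Add blocks while the
// configured number of members are already in flight.
type BoundedWaitGroup struct {
	wg  sync.WaitGroup
	sem chan struct{} // one slot per allowed concurrent member
}

// New creates a group that allows at most limit concurrent members.
func New(limit uint) BoundedWaitGroup {
	return BoundedWaitGroup{sem: make(chan struct{}, limit)}
}

// Add acquires one slot per unit of delta, blocking while the group is full.
// (Negative deltas are omitted from this sketch for brevity.)
func (b *BoundedWaitGroup) Add(delta int) {
	for i := 0; i < delta; i++ {
		b.sem <- struct{}{}
	}
	b.wg.Add(delta)
}

// Done releases a slot, unblocking a waiting Add.
func (b *BoundedWaitGroup) Done() {
	<-b.sem
	b.wg.Done()
}

// Wait blocks until every member that entered has called Done.
func (b *BoundedWaitGroup) Wait() {
	b.wg.Wait()
}
```

Callers use it exactly like a `sync.WaitGroup` (`Add(1)` before spawning each shard request, `Done()` when it finishes), which is why the surrounding RoundTrip code barely changes.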
123 changes: 61 additions & 62 deletions modules/frontend/tracebyidsharding_test.go
@@ -11,6 +11,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/go-kit/log"
"github.com/gogo/protobuf/proto"
@@ -20,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
)

func TestCreateBlockBoundaries(t *testing.T) {
@@ -95,7 +97,9 @@ func TestBuildShardedRequests(t *testing.T) {
	queryShards := 2

	sharder := &shardQuery{
-		queryShards:     queryShards,
+		cfg: &TraceByIDConfig{
+			QueryShards: queryShards,
+		},
		blockBoundaries: createBlockBoundaries(queryShards - 1),
	}

@@ -243,77 +247,27 @@ func TestShardingWareDoRequest(t *testing.T) {
err2: errors.New("booo"),
expectedError: errors.New("booo"),
},
{
name: "failedBlocks under: 200+200",
status1: 200,
trace1: trace1,
status2: 200,
trace2: trace2,
failedBlockQueries1: 1,
failedBlockQueries2: 1,
expectedStatus: 200,
expectedTrace: splitTrace,
},
{
name: "failedBlocks over: 200+200",
status1: 200,
trace1: trace1,
status2: 200,
trace2: trace2,
failedBlockQueries1: 0,
failedBlockQueries2: 5,
expectedError: errors.New("too many failed block queries 5 (max 2)"),
},
{
name: "failedBlocks under: 200+404",
status1: 200,
trace1: trace1,
status2: 404,
failedBlockQueries1: 1,
failedBlockQueries2: 0,
expectedStatus: 200,
expectedTrace: trace1,
},
{
name: "failedBlocks under: 404+200",
status1: 200,
trace1: trace1,
status2: 404,
failedBlockQueries1: 0,
failedBlockQueries2: 1,
expectedStatus: 200,
expectedTrace: trace1,
},
{
name: "failedBlocks over: 404+200",
status1: 200,
trace1: trace1,
status2: 404,
failedBlockQueries1: 0,
failedBlockQueries2: 5,
expectedError: errors.New("too many failed block queries 5 (max 2)"),
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
sharder := newTraceByIDSharder(2, 2, testSLOcfg, log.NewNopLogger())
sharder := newTraceByIDSharder(&TraceByIDConfig{
QueryShards: 2,
SLO: testSLOcfg,
}, log.NewNopLogger())

			next := RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
				var testTrace *tempopb.Trace
				var statusCode int
				var err error
-				var failedBlockQueries int
				if r.RequestURI == "/querier/api/traces/1234?mode=ingesters" {
					testTrace = tc.trace1
					statusCode = tc.status1
					err = tc.err1
-					failedBlockQueries = tc.failedBlockQueries1
				} else {
					testTrace = tc.trace2
					err = tc.err2
					statusCode = tc.status2
-					failedBlockQueries = tc.failedBlockQueries2
				}

				if err != nil {
@@ -324,17 +278,13 @@ func TestShardingWareDoRequest(t *testing.T) {
				if statusCode != 500 {
					if testTrace != nil {
						resBytes, err = proto.Marshal(&tempopb.TraceByIDResponse{
-							Trace: testTrace,
-							Metrics: &tempopb.TraceByIDMetrics{
-								FailedBlocks: uint32(failedBlockQueries),
-							},
+							Trace:   testTrace,
+							Metrics: &tempopb.TraceByIDMetrics{},
						})
						require.NoError(t, err)
					} else {
						resBytes, err = proto.Marshal(&tempopb.TraceByIDResponse{
-							Metrics: &tempopb.TraceByIDMetrics{
-								FailedBlocks: uint32(failedBlockQueries),
-							},
+							Metrics: &tempopb.TraceByIDMetrics{},
						})
						require.NoError(t, err)
					}
@@ -377,3 +327,52 @@ func TestShardingWareDoRequest(t *testing.T) {
		})
	}
}

+func TestConcurrentShards(t *testing.T) {
+	concurrency := 2
+
+	sharder := newTraceByIDSharder(&TraceByIDConfig{
+		QueryShards:      20,
+		ConcurrentShards: concurrency,
+		SLO:              testSLOcfg,
+	}, log.NewNopLogger())
+
+	sawMaxConcurrency := atomic.NewBool(false)
+	currentlyExecuting := atomic.NewInt32(0)
+	next := RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
+		current := currentlyExecuting.Inc()
+		if current > int32(concurrency) {
+			t.Fatal("too many concurrent requests")
+		}
+		if current == int32(concurrency) {
+			// future developer: under pressure this may never be set, because only
+			// one request may execute at a time; feel free to remove
+			sawMaxConcurrency.Store(true)
+		}
+
+		// force concurrency
+		time.Sleep(100 * time.Millisecond)
+		resBytes, err := proto.Marshal(&tempopb.TraceByIDResponse{
+			Trace:   &tempopb.Trace{},
+			Metrics: &tempopb.TraceByIDMetrics{},
+		})
+		require.NoError(t, err)
+
+		currentlyExecuting.Dec()
+		return &http.Response{
+			Body:       io.NopCloser(bytes.NewReader(resBytes)),
+			StatusCode: 200,
+		}, nil
+	})
+
+	testRT := NewRoundTripper(next, sharder)
+
+	req := httptest.NewRequest("GET", "/api/traces/1234", nil)
+	ctx := req.Context()
+	ctx = user.InjectOrgID(ctx, "blerg")
+	req = req.WithContext(ctx)
+
+	_, err := testRT.RoundTrip(req)
+	require.NoError(t, err)
+	require.True(t, sawMaxConcurrency.Load())
+}