fix2640: Distributor refactoring to assert ingestion rate limits as early as possible
mghildiy committed Jul 26, 2023
1 parent 5015f57 commit 4a8e28f
Showing 3 changed files with 75 additions and 29 deletions.
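
The change moves the per-tenant rate check to the top of PushTraces: the span count and an estimated payload size are read with the cheap ptrace accessors (SpanCount and ProtoMarshaler.TracesSize), and an over-limit request is rejected before the traces are marshaled and re-batched. Below is a minimal sketch of that flow; the RateLimiter interface and the pushTraces helper are simplified stand-ins for illustration, not the distributor's actual types.

// Sketch only: reject an over-limit push before any expensive marshaling.
// RateLimiter and pushTraces are simplified stand-ins, not Tempo's real types.
package sketch

import (
	"fmt"
	"time"

	"go.opentelemetry.io/collector/pdata/ptrace"
)

type RateLimiter interface {
	AllowN(now time.Time, tenant string, n int) bool
}

func pushTraces(limiter RateLimiter, tenant string, traces ptrace.Traces) error {
	spanCount := traces.SpanCount()
	if spanCount == 0 {
		return nil
	}
	// TracesSize gives a cheap size estimate; the full MarshalTraces call
	// only happens after the request has passed the rate limit.
	size := (&ptrace.ProtoMarshaler{}).TracesSize(traces)
	if !limiter.AllowN(time.Now(), tenant, size) {
		return fmt.Errorf("ingestion rate limit exceeded while adding %d bytes", size)
	}
	// ... marshal, re-batch by trace ID, and forward to the ingesters ...
	return nil
}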
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## v2.2.0-rc0 / 2023-07-21

* [CHANGE] Assert ingestion rate limits as early as possible [#2640](https://github.com/grafana/tempo/pull/2703) (@mghildiy)
* [CHANGE] Make vParquet2 the default block format [#2526](https://github.com/grafana/tempo/pull/2526) (@stoewer)
* [CHANGE] Disable tempo-query by default in Jsonnet libs. [#2462](https://github.com/grafana/tempo/pull/2462) (@electron0zero)
* [CHANGE] Integrate `gofumpt` into CI for formatting requirements [2584](https://github.com/grafana/tempo/pull/2584) (@zalegrala)
63 changes: 34 additions & 29 deletions modules/distributor/distributor.go
@@ -268,11 +268,45 @@ func (d *Distributor) stopping(_ error) error {
return services.StopManagerAndAwaitStopped(context.Background(), d.subservices)
}

func (d *Distributor) checkForRateLimits(tracesSize, spanCount int, userID string) error {
now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, tracesSize) {
overrides.RecordDiscardedSpans(spanCount, reasonRateLimited, userID)
return status.Errorf(codes.ResourceExhausted,
"%s ingestion rate limit (%d bytes) exceeded while adding %d bytes",
overrides.ErrorPrefixRateLimited,
int(d.ingestionRateLimiter.Limit(now, userID)),
tracesSize)
}

return nil
}

func (d *Distributor) extractBasicInfo(ctx context.Context, traces ptrace.Traces) (userID string, spanCount, tracesSize int, err error) {
user, e := user.ExtractOrgID(ctx)

return user, traces.SpanCount(), (&ptrace.ProtoMarshaler{}).TracesSize(traces), e
}

// PushTraces pushes a batch of traces
func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*tempopb.PushResponse, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "distributor.PushTraces")
defer span.Finish()

userID, spanCount, size, err := d.extractBasicInfo(ctx, traces)
if err != nil {
// can't record discarded spans here b/c there's no tenant
return nil, err
}
if spanCount == 0 {
return &tempopb.PushResponse{}, nil
}
// check limits
err = d.checkForRateLimits(size, spanCount, userID)
if err != nil {
return nil, err
}

// Convert to bytes and back. This is unfortunate for efficiency, but it works
// around the otel-collector internalization of otel-proto which Tempo also uses.
convert, err := (&ptrace.ProtoMarshaler{}).MarshalTraces(traces)
@@ -290,12 +324,6 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te

batches := trace.Batches

userID, err := user.ExtractOrgID(ctx)
if err != nil {
// can't record discarded spans here b/c there's no tenant
return nil, err
}

if d.cfg.LogReceivedSpans.Enabled || d.cfg.LogReceivedTraces {
if d.cfg.LogReceivedSpans.IncludeAllAttributes {
logSpansWithAllAttributes(batches, d.cfg.LogReceivedSpans.FilterByStatusError, d.logger)
@@ -304,32 +332,9 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
}
}

// metric size
size := 0
spanCount := 0
for _, b := range batches {
size += b.Size()
for _, ils := range b.ScopeSpans {
spanCount += len(ils.Spans)
}
}
if spanCount == 0 {
return &tempopb.PushResponse{}, nil
}
metricBytesIngested.WithLabelValues(userID).Add(float64(size))
metricSpansIngested.WithLabelValues(userID).Add(float64(spanCount))

// check limits
now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, size) {
overrides.RecordDiscardedSpans(spanCount, reasonRateLimited, userID)
return nil, status.Errorf(codes.ResourceExhausted,
"%s ingestion rate limit (%d bytes) exceeded while adding %d bytes",
overrides.ErrorPrefixRateLimited,
int(d.ingestionRateLimiter.Limit(now, userID)),
size)
}

keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
if err != nil {
overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
40 changes: 40 additions & 0 deletions modules/distributor/distributor_test.go
@@ -1000,6 +1000,46 @@ func TestLogSpans(t *testing.T) {
}
}

func TestRateLimitRespected(t *testing.T) {
// prepare test data
limits := overrides.Limits{
IngestionRateStrategy: overrides.LocalIngestionRateStrategy,
IngestionRateLimitBytes: 400,
IngestionBurstSizeBytes: 200,
}
buf := &bytes.Buffer{}
logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf))
d := prepare(t, &limits, nil, logger)
batches := []*v1.ResourceSpans{
makeResourceSpans("test-service", []*v1.ScopeSpans{
makeScope(
makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span1", nil,
makeAttribute("tag1", "value1")),
makeSpan("e3210a2b38097332d1fe43083ea93d29", "6c21c48da4dbd1a7", "Test Span2", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR},
makeAttribute("tag1", "value1"),
makeAttribute("tag2", "value2"))),
makeScope(
makeSpan("bb42ec04df789ff04b10ea5274491685", "1b3a296034f4031e", "Test Span3", nil)),
}, makeAttribute("resource_attribute1", "value1")),
makeResourceSpans("test-service2", []*v1.ScopeSpans{
makeScope(
makeSpan("b1c792dea27d511c145df8402bdd793a", "56afb9fe18b6c2d6", "Test Span", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR})),
}, makeAttribute("resource_attribute2", "value2")),
}
traces := batchesToTraces(t, batches)

// invoke unit
_, err := d.PushTraces(ctx, traces)

// validations
if err == nil {
t.Fatal("Expected error")
}
status, ok := status.FromError(err)
assert.True(t, ok)
assert.True(t, status.Code() == codes.ResourceExhausted, "Wrong status code")
}

type logSpan struct {
Msg string `json:"msg"`
Level string `json:"level"`
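The limits in the new test (400 bytes/s rate, 200-byte burst) are deliberately small, so the marshaled size of the batch exceeds the burst and the very first PushTraces call is rejected. The token-bucket behaviour can be illustrated with golang.org/x/time/rate, which mirrors the AllowN semantics of the per-tenant limiter used by the distributor; this is an illustration, not Tempo's actual limiter implementation.

// Sketch only: a request larger than the burst is rejected immediately,
// regardless of how long the bucket has been refilling.
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// 400 tokens (bytes) per second refill, burst capacity of 200 — analogous
	// to IngestionRateLimitBytes / IngestionBurstSizeBytes in the test above.
	limiter := rate.NewLimiter(rate.Limit(400), 200)

	// 300 bytes can never fit in a 200-byte bucket, so AllowN returns false.
	fmt.Println(limiter.AllowN(time.Now(), 300)) // false

	// 150 bytes fits within the burst and is allowed.
	fmt.Println(limiter.AllowN(time.Now(), 150)) // true
}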
