Enforce max span attribute size #4335

Merged 13 commits on Jan 2, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -99,6 +99,7 @@ querier:
* [ENHANCEMENT] Update golang.org/x/crypto [#4474](https://github.com/grafana/tempo/pull/4474) (@javiermolinar)
* [ENHANCEMENT] Distributor shim: add test verifying receiver works (including metrics) [#4477](https://github.com/grafana/tempo/pull/4477) (@yvrhdn)
* [ENHANCEMENT] Reduce goroutines in all non-querier components. [#4484](https://github.com/grafana/tempo/pull/4484) (@joe-elliott)
* [ENHANCEMENT] Add option to enforce max span attribute size [#4335](https://github.com/grafana/tempo/pull/4335) (@ie-pham)
* [BUGFIX] Handle invalid TraceQL query filter in tag values v2 disk cache [#4392](https://github.com/grafana/tempo/pull/4392) (@electron0zero)
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
5 changes: 5 additions & 0 deletions docs/sources/tempo/configuration/_index.md
@@ -232,6 +232,11 @@ distributor:
# instruct the client how to retry.
[retry_after_on_resource_exhausted: <duration> | default = '0' ]

# Optional.
# Configures the maximum size of a single attribute. Any key or value that exceeds this limit is truncated before storing.
# Setting this parameter to '0' disables the check on attribute size.
[max_span_attr_byte: <int> | default = '2048']

# Optional.
# Configures usage trackers in the distributor which expose metrics of ingested traffic grouped by configurable
# attributes exposed on /usage_metrics.
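To put the new docs entry in context, a minimal distributor override might look like the sketch below. This is illustrative only (the value is arbitrary, not taken from the PR); per the docs above, `0` disables the check entirely:

```yaml
distributor:
  # Truncate attribute keys and string values longer than 1 KB.
  max_span_attr_byte: 1024
  # Or, to disable the size check entirely:
  # max_span_attr_byte: 0
```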
1 change: 1 addition & 0 deletions docs/sources/tempo/configuration/manifest.md
@@ -185,6 +185,7 @@ distributor:
stale_duration: 15m0s
extend_writes: true
retry_after_on_resource_exhausted: 0s
max_span_attr_byte: 2048
ingester_client:
pool_config:
checkinterval: 15s
4 changes: 4 additions & 0 deletions modules/distributor/config.go
@@ -51,6 +51,8 @@ type Config struct {

// For testing.
factory ring_client.PoolAddrFunc `yaml:"-"`

MaxSpanAttrByte int `yaml:"max_span_attr_byte"`
}

type LogSpansConfig struct {
@@ -74,6 +76,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.OverrideRingKey = distributorRingKey
cfg.ExtendWrites = true

cfg.MaxSpanAttrByte = 2048 // 2KB

f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")
53 changes: 49 additions & 4 deletions modules/distributor/distributor.go
@@ -35,6 +35,7 @@ import (
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/usagestats"
tempo_util "github.com/grafana/tempo/pkg/util"
@@ -112,6 +113,11 @@
Name: "distributor_metrics_generator_clients",
Help: "The current number of metrics-generator clients.",
})
metricAttributesTruncated = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "distributor_attributes_truncated_total",
Help: "The total number of attribute keys or values truncated per tenant",
}, []string{"tenant"})

statBytesReceived = usagestats.NewCounter("distributor_bytes_received")
statSpansReceived = usagestats.NewCounter("distributor_spans_received")
@@ -378,13 +384,18 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
d.usage.Observe(userID, batches)
}

-keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
+keys, rebatchedTraces, truncatedAttributeCount, err := requestsByTraceID(batches, userID, spanCount, d.cfg.MaxSpanAttrByte)
if err != nil {
overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
logDiscardedResourceSpans(batches, userID, &d.cfg.LogDiscardedSpans, d.logger)
return nil, err
}

if truncatedAttributeCount > 0 {
level.Warn(d.logger).Log("msg", fmt.Sprintf("truncated %d resource/span attributes when adding spans for tenant %s", truncatedAttributeCount, userID))
metricAttributesTruncated.WithLabelValues(userID).Add(float64(truncatedAttributeCount))
}

Review comment from @joe-elliott (Member), Jan 2, 2025: The error message doesn't tell us anything that is not already in the metric. Remove? If we add some details that make it worth keeping, I think we should consider dropping to debug, because this could get spammy.
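As the review note suggests, the per-tenant counter is the primary signal here. A query along the lines of `sum by (tenant) (rate(tempo_distributor_attributes_truncated_total[5m]))` surfaces tenants whose attribute keys or values are being truncated; this is an illustrative PromQL sketch, with the metric name derived from the `tempo` namespace and `distributor_attributes_truncated_total` name registered above.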

err = d.sendToIngestersViaBytes(ctx, userID, spanCount, rebatchedTraces, keys)
if err != nil {
return nil, err
@@ -525,18 +536,29 @@

// requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring
// and traces to pass onto the ingesters.
-func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int) ([]uint32, []*rebatchedTrace, error) {
+func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, int, error) {
const tracesPerBatch = 20 // p50 of internal env
tracesByID := make(map[uint32]*rebatchedTrace, tracesPerBatch)
truncatedAttributeCount := 0

for _, b := range batches {
spansByILS := make(map[uint32]*v1.ScopeSpans)
// check resource-level attributes for oversized keys and values
if maxSpanAttrSize > 0 && b.Resource != nil {
resourceAttrTruncatedCount := processAttributes(b.Resource.Attributes, maxSpanAttrSize)
truncatedAttributeCount += resourceAttrTruncatedCount
}

for _, ils := range b.ScopeSpans {
for _, span := range ils.Spans {
// check span-level attributes for oversized keys and values
if maxSpanAttrSize > 0 {
spanAttrTruncatedCount := processAttributes(span.Attributes, maxSpanAttrSize)
truncatedAttributeCount += spanAttrTruncatedCount
}
traceID := span.TraceId
if !validation.ValidTraceID(traceID) {
-return nil, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8)
+return nil, nil, 0, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8)
}

traceKey := tempo_util.TokenFor(userID, traceID)
@@ -602,7 +624,30 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int
traces = append(traces, r)
}

-return keys, traces, nil
+return keys, traces, truncatedAttributeCount, nil
}

// processAttributes truncates attribute keys and string values that exceed maxAttrSize and returns the number of truncated fields
func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int) int {
count := 0
for _, attr := range attributes {
if len(attr.Key) > maxAttrSize {
attr.Key = attr.Key[:maxAttrSize]
count++
}

switch value := attr.GetValue().Value.(type) {
case *v1_common.AnyValue_StringValue:
if len(value.StringValue) > maxAttrSize {
value.StringValue = value.StringValue[:maxAttrSize]
count++
}
default:
continue
}
}

return count
}
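For illustration, here is a self-contained sketch of the same truncation logic. It uses the upstream OTLP protobuf types from `go.opentelemetry.io/proto/otlp/common/v1` as a stand-in for Tempo's vendored `v1_common` package, and the `truncateAttributes` name is hypothetical; like the merged code, it clips only keys and string values:

```go
package main

import (
	"fmt"

	v1common "go.opentelemetry.io/proto/otlp/common/v1"
)

// truncateAttributes mirrors processAttributes: it clips attribute keys and
// string values to maxAttrSize bytes and returns how many fields were clipped.
// Non-string values (ints, bools, arrays, ...) pass through untouched.
// Note: slicing at a byte boundary can split a multi-byte UTF-8 rune.
func truncateAttributes(attrs []*v1common.KeyValue, maxAttrSize int) int {
	count := 0
	for _, attr := range attrs {
		if len(attr.Key) > maxAttrSize {
			attr.Key = attr.Key[:maxAttrSize]
			count++
		}
		if s, ok := attr.GetValue().GetValue().(*v1common.AnyValue_StringValue); ok && len(s.StringValue) > maxAttrSize {
			s.StringValue = s.StringValue[:maxAttrSize]
			count++
		}
	}
	return count
}

func main() {
	attrs := []*v1common.KeyValue{{
		Key: "http.url",
		Value: &v1common.AnyValue{
			Value: &v1common.AnyValue_StringValue{StringValue: "https://example.com/very/long/path"},
		},
	}}
	// With a 16-byte cap the value is clipped to its first 16 bytes.
	n := truncateAttributes(attrs, 16)
	fmt.Println(n, attrs[0].GetValue().GetStringValue()) // 1 https://example.
}
```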

// discardedPredicate determines if a trace is discarded based on the number of successful replications.
67 changes: 62 additions & 5 deletions modules/distributor/distributor_test.go
@@ -713,7 +713,7 @@ func TestRequestsByTraceID(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-keys, rebatchedTraces, err := requestsByTraceID(tt.batches, util.FakeTenantID, 1)
+keys, rebatchedTraces, _, err := requestsByTraceID(tt.batches, util.FakeTenantID, 1, 1000)
require.Equal(t, len(keys), len(rebatchedTraces))

for i, expectedKey := range tt.expectedKeys {
@@ -739,9 +739,64 @@
}
}

func TestProcessAttributes(t *testing.T) {
spanCount := 10
batchCount := 3
trace := test.MakeTraceWithSpanCount(batchCount, spanCount, []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F})

maxAttrByte := 1000
longString := strings.Repeat("t", 1100)

// add long attributes to the resource level
trace.ResourceSpans[0].Resource.Attributes = append(trace.ResourceSpans[0].Resource.Attributes,
test.MakeAttribute("long value", longString),
)
trace.ResourceSpans[0].Resource.Attributes = append(trace.ResourceSpans[0].Resource.Attributes,
test.MakeAttribute(longString, "long key"),
)

// add long attributes to the span level
trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes,
test.MakeAttribute("long value", longString),
)
trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes,
test.MakeAttribute(longString, "long key"),
)

_, rebatchedTrace, truncatedCount, _ := requestsByTraceID(trace.ResourceSpans, "test", spanCount*batchCount, maxAttrByte)
assert.Equal(t, 4, truncatedCount)
for _, rT := range rebatchedTrace {
for _, resource := range rT.trace.ResourceSpans {
// find large resource attributes
for _, attr := range resource.Resource.Attributes {
if attr.Key == "long value" {
assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue())
}
if attr.Value.GetStringValue() == "long key" {
assert.Equal(t, longString[:maxAttrByte], attr.Key)
}
}
// find large span attributes
for _, scope := range resource.ScopeSpans {
for _, span := range scope.Spans {
for _, attr := range span.Attributes {
if attr.Key == "long value" {
assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue())
}
if attr.Value.GetStringValue() == "long key" {
assert.Equal(t, longString[:maxAttrByte], attr.Key)
}
}
}
}

}
}
}

func BenchmarkTestsByRequestID(b *testing.B) {
-spansPer := 100
-batches := 10
+spansPer := 5000
+batches := 100
traces := []*tempopb.Trace{
test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}),
test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0B, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}),
@@ -757,14 +812,15 @@ func BenchmarkTestsByRequestID(b *testing.B) {
}

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
for _, blerg := range ils {
-_, _, err := requestsByTraceID([]*v1.ResourceSpans{
+_, _, _, err := requestsByTraceID([]*v1.ResourceSpans{
{
ScopeSpans: blerg,
},
}, "test", spansPer*len(traces))
}, "test", spansPer*len(traces), 5)
require.NoError(b, err)
}
}
Expand Down Expand Up @@ -1631,6 +1687,7 @@ func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) (*Dist
})
}

distributorConfig.MaxSpanAttrByte = 1000
distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond
distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int())
distributorConfig.DistributorRing.KVStore.Mock = nil
4 changes: 2 additions & 2 deletions modules/distributor/forwarder_test.go
@@ -28,7 +28,7 @@ func TestForwarder(t *testing.T) {
require.NoError(t, err)

b := test.MakeBatch(10, id)
-keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10)
+keys, rebatchedTraces, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000)
require.NoError(t, err)

o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer)
@@ -69,7 +69,7 @@ func TestForwarder_shutdown(t *testing.T) {
require.NoError(t, err)

b := test.MakeBatch(10, id)
-keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10)
+keys, rebatchedTraces, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000)
require.NoError(t, err)

o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer)
11 changes: 11 additions & 0 deletions pkg/util/test/req.go
@@ -17,6 +17,17 @@ import (
"github.com/stretchr/testify/require"
)

func MakeAttribute(key, value string) *v1_common.KeyValue {
return &v1_common.KeyValue{
Key: key,
Value: &v1_common.AnyValue{
Value: &v1_common.AnyValue_StringValue{
StringValue: value,
},
},
}
}

func MakeSpan(traceID []byte) *v1_trace.Span {
return MakeSpanWithAttributeCount(traceID, rand.Int()%10+1)
}
Expand Down