From 67fb5e4f7f3db2035435ac099c750c1e5736735d Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Sun, 17 Nov 2024 06:38:38 -0500 Subject: [PATCH 01/13] enforce max span attribute size --- integration/e2e/config-limits-span-attr.yaml | 44 +++++++++ integration/e2e/limits_test.go | 99 ++++++++++++++++++++ modules/distributor/config.go | 4 + modules/distributor/distributor.go | 30 +++++- modules/distributor/distributor_test.go | 6 +- modules/distributor/forwarder_test.go | 4 +- pkg/util/test/req.go | 11 +++ 7 files changed, 189 insertions(+), 9 deletions(-) create mode 100644 integration/e2e/config-limits-span-attr.yaml diff --git a/integration/e2e/config-limits-span-attr.yaml b/integration/e2e/config-limits-span-attr.yaml new file mode 100644 index 00000000000..bb658297080 --- /dev/null +++ b/integration/e2e/config-limits-span-attr.yaml @@ -0,0 +1,44 @@ +target: all + +server: + http_listen_port: 3200 + +distributor: + retry_after_on_resource_exhausted: 1s + max_span_attr_size: 2000 + receivers: + jaeger: + protocols: + grpc: + otlp: + protocols: + grpc: + http: + +overrides: + defaults: + ingestion: + max_traces_per_user: 10 + rate_limit_bytes: 50000 + burst_size_bytes: 50000 + global: + max_bytes_per_trace: 10000 + +ingester: + lifecycler: + address: 127.0.0.1 + ring: + kvstore: + store: inmemory + replication_factor: 1 + final_sleep: 0s + trace_idle_period: 3600s + +storage: + trace: + backend: local + local: + path: /var/tempo + pool: + max_workers: 10 + queue_depth: 100 diff --git a/integration/e2e/limits_test.go b/integration/e2e/limits_test.go index cc02ca80c22..e2b82e21568 100644 --- a/integration/e2e/limits_test.go +++ b/integration/e2e/limits_test.go @@ -40,6 +40,7 @@ const ( configLimitsQuery = "config-limits-query.yaml" configLimitsPartialError = "config-limits-partial-success.yaml" configLimits429 = "config-limits-429.yaml" + configLimitsSpanAttr = "config-limits-span-attr.yaml" ) func TestLimits(t *testing.T) { @@ -102,6 +103,104 @@ func TestLimits(t *testing.T) { require.NoError(t, err) } +func TestLimitsSpanAttrMaxSize(t *testing.T) { + s, err := e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() + + require.NoError(t, util2.CopyFileToSharedDir(s, configLimitsSpanAttr, "config.yaml")) + tempo := util2.NewTempoAllInOne() + require.NoError(t, s.StartAndWaitReady(tempo)) + + // otel grpc exporter + exporter, err := util2.NewOtelGRPCExporter(tempo.Endpoint(4317)) + require.NoError(t, err) + + err = exporter.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + // make request + traceIDs := make([][]byte, 3) + for index := range traceIDs { + traceID := make([]byte, 16) + _, err = crand.Read(traceID) + require.NoError(t, err) + traceIDs[index] = traceID + } + + // traceIDs[0] = trace with default attributes + // traceIDs[1] = trace with attribute slightly less than limit to test boundary + // traceIDs[2] = trace with large attributes + spanCountsByTrace := []int{5, 5, 5} + req := test.MakeReqWithMultipleTraceWithSpanCount(spanCountsByTrace, traceIDs) + + shortString := "" // limit in config is 2000 + for i := 0; i < 1990; i++ { + shortString += "t" + } + + longString := "" // limit in config is 2000 + for i := 0; i < 2002; i++ { + longString += "t" + } + + req.ResourceSpans[1].ScopeSpans[0].Spans[0].Attributes = append(req.ResourceSpans[1].ScopeSpans[0].Spans[0].Attributes, test.MakeAttribute("key", shortString)) + + req.ResourceSpans[2].ScopeSpans[0].Spans[0].Attributes = 
append(req.ResourceSpans[2].ScopeSpans[0].Spans[0].Attributes, test.MakeAttribute("key", longString)) + req.ResourceSpans[2].ScopeSpans[0].Spans[1].Attributes = append(req.ResourceSpans[2].ScopeSpans[0].Spans[1].Attributes, test.MakeAttribute(longString, "value")) + spanIDShouldBeDiscared1 := req.ResourceSpans[2].ScopeSpans[0].Spans[0].SpanId + spanIDShouldBeDiscared2 := req.ResourceSpans[2].ScopeSpans[0].Spans[1].SpanId + + b, err := req.Marshal() + require.NoError(t, err) + + // unmarshal into otlp proto + traces, err := (&ptrace.ProtoUnmarshaler{}).UnmarshalTraces(b) + require.NoError(t, err) + require.NotNil(t, traces) + + ctx := user.InjectOrgID(context.Background(), tempoUtil.FakeTenantID) + ctx, err = user.InjectIntoGRPCRequest(ctx) + require.NoError(t, err) + + // send traces to tempo + // partial success = no error + err = exporter.ConsumeTraces(ctx, traces) + require.NoError(t, err) + + // shutdown to ensure traces are flushed + require.NoError(t, exporter.Shutdown(context.Background())) + + // query for the trace and make sure the two spans [0,1] did not get ingested + client := httpclient.New("http://"+tempo.Endpoint(3200), tempoUtil.FakeTenantID) + result, err := client.QueryTrace(tempoUtil.TraceIDToHexString(traceIDs[2])) + require.NoError(t, err) + found := false + numSpans := 0 + for _, rs := range result.ResourceSpans { + for _, ss := range rs.ScopeSpans { + for _, span := range ss.Spans { + numSpans++ + if bytes.Equal(span.SpanId, spanIDShouldBeDiscared1) || bytes.Equal(span.SpanId, spanIDShouldBeDiscared2) { + found = true + break + } + } + } + } + + assert.False(t, found, "spans with large attributes should've been discarded") + assert.Equal(t, 3, numSpans, "only 3 spans should've been ingested") + + // test limit metrics + // only the two spans with attr > 2000 bytes should be discarded + err = tempo.WaitSumMetricsWithOptions(e2e.Equals(2), + []string{"tempo_discarded_spans_total"}, + e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "reason", "attribute_too_large")), + ) + require.NoError(t, err) +} + func TestOTLPLimits(t *testing.T) { s, err := e2e.NewScenario("tempo_e2e") require.NoError(t, err) diff --git a/modules/distributor/config.go b/modules/distributor/config.go index 14bb02fa41b..9f9b95d03fd 100644 --- a/modules/distributor/config.go +++ b/modules/distributor/config.go @@ -51,6 +51,8 @@ type Config struct { // For testing. 
factory ring_client.PoolAddrFunc `yaml:"-"` + + MaxSpanAttrSize int `yaml:"max_span_attr_size"` } type LogSpansConfig struct { @@ -74,6 +76,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.OverrideRingKey = distributorRingKey cfg.ExtendWrites = true + cfg.MaxSpanAttrSize = 1000 // 1KB + f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.") f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.") f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.") diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index d98dbe9797d..50801f522e5 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -53,6 +53,8 @@ const ( reasonInternalError = "internal_error" // reasonUnknown indicates a pushByte error at the ingester level not related to GRPC reasonUnknown = "unknown_error" + // reasonAttrTooLarge indicates that at least one attribute in the span is too large + reasonAttrTooLarge = "attribute_too_large" distributorRingKey = "distributor" ) @@ -378,13 +380,18 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te d.usage.Observe(userID, batches) } - keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount) + keys, rebatchedTraces, discardedSpansWithLargeAttr, err := requestsByTraceID(batches, userID, spanCount, d.cfg.MaxSpanAttrSize) if err != nil { overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID) logDiscardedResourceSpans(batches, userID, &d.cfg.LogDiscardedSpans, d.logger) return nil, err } + if discardedSpansWithLargeAttr > 0 { + overrides.RecordDiscardedSpans(discardedSpansWithLargeAttr, reasonAttrTooLarge, userID) + level.Warn(d.logger).Log("msg", fmt.Sprintf("discarded %d spans attribute key/value too large when adding to trace for tenant %s", discardedSpansWithLargeAttr, userID)) + } + err = d.sendToIngestersViaBytes(ctx, userID, spanCount, rebatchedTraces, keys) if err != nil { return nil, err @@ -525,18 +532,24 @@ func (d *Distributor) UsageTrackerHandler() http.Handler { // requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring // and traces to pass onto the ingesters. 
-func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int) ([]uint32, []*rebatchedTrace, error) { +func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, max_span_attr_size int) ([]uint32, []*rebatchedTrace, int, error) { const tracesPerBatch = 20 // p50 of internal env tracesByID := make(map[uint32]*rebatchedTrace, tracesPerBatch) + discardedSpans := 0 for _, b := range batches { spansByILS := make(map[uint32]*v1.ScopeSpans) for _, ils := range b.ScopeSpans { for _, span := range ils.Spans { + // drop spans with attributes that are too large + if spanContainsAttributeTooLarge(span, max_span_attr_size) { + discardedSpans++ + continue + } traceID := span.TraceId if !validation.ValidTraceID(traceID) { - return nil, nil, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8) + return nil, nil, 0, status.Errorf(codes.InvalidArgument, "trace ids must be 128 bit, received %d bits", len(traceID)*8) } traceKey := tempo_util.TokenFor(userID, traceID) @@ -602,7 +615,16 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int traces = append(traces, r) } - return keys, traces, nil + return keys, traces, discardedSpans, nil +} + +func spanContainsAttributeTooLarge(span *v1.Span, maxAttrSize int) bool { + for _, attr := range span.Attributes { + if len(attr.Key) > maxAttrSize || attr.Value.Size() > maxAttrSize { + return true + } + } + return false } // discardedPredicate determines if a trace is discarded based on the number of successful replications. diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index ac2f8183316..cc2d715e77d 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -713,7 +713,7 @@ func TestRequestsByTraceID(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - keys, rebatchedTraces, err := requestsByTraceID(tt.batches, util.FakeTenantID, 1) + keys, rebatchedTraces, _, err := requestsByTraceID(tt.batches, util.FakeTenantID, 1, 1000) require.Equal(t, len(keys), len(rebatchedTraces)) for i, expectedKey := range tt.expectedKeys { @@ -760,11 +760,11 @@ func BenchmarkTestsByRequestID(b *testing.B) { for i := 0; i < b.N; i++ { for _, blerg := range ils { - _, _, err := requestsByTraceID([]*v1.ResourceSpans{ + _, _, _, err := requestsByTraceID([]*v1.ResourceSpans{ { ScopeSpans: blerg, }, - }, "test", spansPer*len(traces)) + }, "test", spansPer*len(traces), 1000) require.NoError(b, err) } } diff --git a/modules/distributor/forwarder_test.go b/modules/distributor/forwarder_test.go index 61807712a5f..d5dfbff868d 100644 --- a/modules/distributor/forwarder_test.go +++ b/modules/distributor/forwarder_test.go @@ -28,7 +28,7 @@ func TestForwarder(t *testing.T) { require.NoError(t, err) b := test.MakeBatch(10, id) - keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10) + keys, rebatchedTraces, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000) require.NoError(t, err) o, err := overrides.NewOverrides(oCfg, nil, prometheus.DefaultRegisterer) @@ -69,7 +69,7 @@ func TestForwarder_shutdown(t *testing.T) { require.NoError(t, err) b := test.MakeBatch(10, id) - keys, rebatchedTraces, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10) + keys, rebatchedTraces, _, err := requestsByTraceID([]*v1.ResourceSpans{b}, tenantID, 10, 1000) require.NoError(t, err) o, err := overrides.NewOverrides(oCfg, nil, 
prometheus.DefaultRegisterer) diff --git a/pkg/util/test/req.go b/pkg/util/test/req.go index b33370581d1..969179b1016 100644 --- a/pkg/util/test/req.go +++ b/pkg/util/test/req.go @@ -17,6 +17,17 @@ import ( "github.com/stretchr/testify/require" ) +func MakeAttribute(key, value string) *v1_common.KeyValue { + return &v1_common.KeyValue{ + Key: key, + Value: &v1_common.AnyValue{ + Value: &v1_common.AnyValue_StringValue{ + StringValue: value, + }, + }, + } +} + func MakeSpan(traceID []byte) *v1_trace.Span { return MakeSpanWithAttributeCount(traceID, rand.Int()%10+1) } From c5a304b587252cde1e42c1a20caa519ad965e12f Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Sun, 17 Nov 2024 06:49:07 -0500 Subject: [PATCH 02/13] changelog, lint, docs --- CHANGELOG.md | 1 + docs/sources/tempo/configuration/_index.md | 4 ++++ modules/distributor/config.go | 2 +- modules/distributor/distributor.go | 4 ++-- 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e918d9b5f5..1fa2263434f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -99,6 +99,7 @@ querier: * [ENHANCEMENT] Update golang.org/x/crypto [#4474](https://github.com/grafana/tempo/pull/4474) (@javiermolinar) * [ENHANCEMENT] Distributor shim: add test verifying receiver works (including metrics) [#4477](https://github.com/grafana/tempo/pull/4477) (@yvrhdn) * [ENHANCEMENT] Reduce goroutines in all non-querier components. [#4484](https://github.com/grafana/tempo/pull/4484) (@joe-elliott) +* [ENHANCEMENT] Add option to enforce max span attribute size [#4335](https://github.com/grafana/tempo/pull/4335) (@ie-pham) * [BUGFIX] Handle invalid TraceQL query filter in tag values v2 disk cache [#4392](https://github.com/grafana/tempo/pull/4392) (@electron0zero) * [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen) * [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott) diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index 5e551405883..75e5f737e17 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -232,6 +232,10 @@ distributor: # instruct the client how to retry. [retry_after_on_resource_exhausted: | default = '0' ] + # Optional + # Configures the max size a span attribute can be. Any span with at least one attribute over this limit would be discarded with reason "attribute_too_large" + [max_span_attr_size: | default = '10000'] + # Optional. # Configures usage trackers in the distributor which expose metrics of ingested traffic grouped by configurable # attributes exposed on /usage_metrics. 
diff --git a/modules/distributor/config.go b/modules/distributor/config.go index 9f9b95d03fd..84ee589a181 100644 --- a/modules/distributor/config.go +++ b/modules/distributor/config.go @@ -76,7 +76,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.OverrideRingKey = distributorRingKey cfg.ExtendWrites = true - cfg.MaxSpanAttrSize = 1000 // 1KB + cfg.MaxSpanAttrSize = 10 * 1000 // 10KB f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.") f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.") diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 50801f522e5..8552701f18d 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -532,7 +532,7 @@ func (d *Distributor) UsageTrackerHandler() http.Handler { // requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring // and traces to pass onto the ingesters. -func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, max_span_attr_size int) ([]uint32, []*rebatchedTrace, int, error) { +func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, int, error) { const tracesPerBatch = 20 // p50 of internal env tracesByID := make(map[uint32]*rebatchedTrace, tracesPerBatch) discardedSpans := 0 @@ -543,7 +543,7 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma for _, ils := range b.ScopeSpans { for _, span := range ils.Spans { // drop spans with attributes that are too large - if spanContainsAttributeTooLarge(span, max_span_attr_size) { + if spanContainsAttributeTooLarge(span, maxSpanAttrSize) { discardedSpans++ continue } From d9ce7c228783322805cc5d4f80e36420b19fa4f5 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Mon, 18 Nov 2024 04:22:08 -0500 Subject: [PATCH 03/13] updated test and response --- modules/distributor/distributor.go | 4 ++++ modules/distributor/distributor_test.go | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 8552701f18d..5f1eb12b6a7 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -392,6 +392,10 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te level.Warn(d.logger).Log("msg", fmt.Sprintf("discarded %d spans attribute key/value too large when adding to trace for tenant %s", discardedSpansWithLargeAttr, userID)) } + if discardedSpansWithLargeAttr >= spanCount { + return &tempopb.PushResponse{}, nil + } + err = d.sendToIngestersViaBytes(ctx, userID, spanCount, rebatchedTraces, keys) if err != nil { return nil, err diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index cc2d715e77d..8702d1a74c8 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -740,8 +740,8 @@ func TestRequestsByTraceID(t *testing.T) { } func BenchmarkTestsByRequestID(b *testing.B) { - spansPer := 100 - batches := 10 + spansPer := 1000 + batches := 100 traces := []*tempopb.Trace{ test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0A, 0x01, 
0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}), test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0B, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}), @@ -1631,6 +1631,7 @@ func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) (*Dist }) } + distributorConfig.MaxSpanAttrSize = 1000 distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int()) distributorConfig.DistributorRing.KVStore.Mock = nil From 532b5fa1f3d68c0ad5fc341b72d4875de86019e8 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Tue, 19 Nov 2024 03:34:04 -0500 Subject: [PATCH 04/13] update to truncate instead of discard --- docs/sources/tempo/configuration/_index.md | 4 +- integration/e2e/config-limits-span-attr.yaml | 44 --------- integration/e2e/limits_test.go | 98 -------------------- modules/distributor/config.go | 4 +- modules/distributor/distributor.go | 48 ++++++---- modules/distributor/distributor_test.go | 58 +++++++++++- 6 files changed, 90 insertions(+), 166 deletions(-) delete mode 100644 integration/e2e/config-limits-span-attr.yaml diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index 75e5f737e17..4c6148d6480 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -233,8 +233,8 @@ distributor: [retry_after_on_resource_exhausted: | default = '0' ] # Optional - # Configures the max size a span attribute can be. Any span with at least one attribute over this limit would be discarded with reason "attribute_too_large" - [max_span_attr_size: | default = '10000'] + # Configures the max size an attribute can be. Any key or value that exceeds this limit will be truncated before storing + [max_span_attr_byte: | default = '2048'] # Optional. 
# Configures usage trackers in the distributor which expose metrics of ingested traffic grouped by configurable diff --git a/integration/e2e/config-limits-span-attr.yaml b/integration/e2e/config-limits-span-attr.yaml deleted file mode 100644 index bb658297080..00000000000 --- a/integration/e2e/config-limits-span-attr.yaml +++ /dev/null @@ -1,44 +0,0 @@ -target: all - -server: - http_listen_port: 3200 - -distributor: - retry_after_on_resource_exhausted: 1s - max_span_attr_size: 2000 - receivers: - jaeger: - protocols: - grpc: - otlp: - protocols: - grpc: - http: - -overrides: - defaults: - ingestion: - max_traces_per_user: 10 - rate_limit_bytes: 50000 - burst_size_bytes: 50000 - global: - max_bytes_per_trace: 10000 - -ingester: - lifecycler: - address: 127.0.0.1 - ring: - kvstore: - store: inmemory - replication_factor: 1 - final_sleep: 0s - trace_idle_period: 3600s - -storage: - trace: - backend: local - local: - path: /var/tempo - pool: - max_workers: 10 - queue_depth: 100 diff --git a/integration/e2e/limits_test.go b/integration/e2e/limits_test.go index e2b82e21568..d351737a7fc 100644 --- a/integration/e2e/limits_test.go +++ b/integration/e2e/limits_test.go @@ -103,104 +103,6 @@ func TestLimits(t *testing.T) { require.NoError(t, err) } -func TestLimitsSpanAttrMaxSize(t *testing.T) { - s, err := e2e.NewScenario("tempo_e2e") - require.NoError(t, err) - defer s.Close() - - require.NoError(t, util2.CopyFileToSharedDir(s, configLimitsSpanAttr, "config.yaml")) - tempo := util2.NewTempoAllInOne() - require.NoError(t, s.StartAndWaitReady(tempo)) - - // otel grpc exporter - exporter, err := util2.NewOtelGRPCExporter(tempo.Endpoint(4317)) - require.NoError(t, err) - - err = exporter.Start(context.Background(), componenttest.NewNopHost()) - require.NoError(t, err) - - // make request - traceIDs := make([][]byte, 3) - for index := range traceIDs { - traceID := make([]byte, 16) - _, err = crand.Read(traceID) - require.NoError(t, err) - traceIDs[index] = traceID - } - - // traceIDs[0] = trace with default attributes - // traceIDs[1] = trace with attribute slightly less than limit to test boundary - // traceIDs[2] = trace with large attributes - spanCountsByTrace := []int{5, 5, 5} - req := test.MakeReqWithMultipleTraceWithSpanCount(spanCountsByTrace, traceIDs) - - shortString := "" // limit in config is 2000 - for i := 0; i < 1990; i++ { - shortString += "t" - } - - longString := "" // limit in config is 2000 - for i := 0; i < 2002; i++ { - longString += "t" - } - - req.ResourceSpans[1].ScopeSpans[0].Spans[0].Attributes = append(req.ResourceSpans[1].ScopeSpans[0].Spans[0].Attributes, test.MakeAttribute("key", shortString)) - - req.ResourceSpans[2].ScopeSpans[0].Spans[0].Attributes = append(req.ResourceSpans[2].ScopeSpans[0].Spans[0].Attributes, test.MakeAttribute("key", longString)) - req.ResourceSpans[2].ScopeSpans[0].Spans[1].Attributes = append(req.ResourceSpans[2].ScopeSpans[0].Spans[1].Attributes, test.MakeAttribute(longString, "value")) - spanIDShouldBeDiscared1 := req.ResourceSpans[2].ScopeSpans[0].Spans[0].SpanId - spanIDShouldBeDiscared2 := req.ResourceSpans[2].ScopeSpans[0].Spans[1].SpanId - - b, err := req.Marshal() - require.NoError(t, err) - - // unmarshal into otlp proto - traces, err := (&ptrace.ProtoUnmarshaler{}).UnmarshalTraces(b) - require.NoError(t, err) - require.NotNil(t, traces) - - ctx := user.InjectOrgID(context.Background(), tempoUtil.FakeTenantID) - ctx, err = user.InjectIntoGRPCRequest(ctx) - require.NoError(t, err) - - // send traces to tempo - // partial success = no error 
- err = exporter.ConsumeTraces(ctx, traces) - require.NoError(t, err) - - // shutdown to ensure traces are flushed - require.NoError(t, exporter.Shutdown(context.Background())) - - // query for the trace and make sure the two spans [0,1] did not get ingested - client := httpclient.New("http://"+tempo.Endpoint(3200), tempoUtil.FakeTenantID) - result, err := client.QueryTrace(tempoUtil.TraceIDToHexString(traceIDs[2])) - require.NoError(t, err) - found := false - numSpans := 0 - for _, rs := range result.ResourceSpans { - for _, ss := range rs.ScopeSpans { - for _, span := range ss.Spans { - numSpans++ - if bytes.Equal(span.SpanId, spanIDShouldBeDiscared1) || bytes.Equal(span.SpanId, spanIDShouldBeDiscared2) { - found = true - break - } - } - } - } - - assert.False(t, found, "spans with large attributes should've been discarded") - assert.Equal(t, 3, numSpans, "only 3 spans should've been ingested") - - // test limit metrics - // only the two spans with attr > 2000 bytes should be discarded - err = tempo.WaitSumMetricsWithOptions(e2e.Equals(2), - []string{"tempo_discarded_spans_total"}, - e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "reason", "attribute_too_large")), - ) - require.NoError(t, err) -} - func TestOTLPLimits(t *testing.T) { s, err := e2e.NewScenario("tempo_e2e") require.NoError(t, err) diff --git a/modules/distributor/config.go b/modules/distributor/config.go index 84ee589a181..ed8964cf20d 100644 --- a/modules/distributor/config.go +++ b/modules/distributor/config.go @@ -52,7 +52,7 @@ type Config struct { // For testing. factory ring_client.PoolAddrFunc `yaml:"-"` - MaxSpanAttrSize int `yaml:"max_span_attr_size"` + MaxSpanAttrByte int `yaml:"max_span_attr_byte"` } type LogSpansConfig struct { @@ -76,7 +76,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.OverrideRingKey = distributorRingKey cfg.ExtendWrites = true - cfg.MaxSpanAttrSize = 10 * 1000 // 10KB + cfg.MaxSpanAttrByte = 2048 // 2KB f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.") f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.") diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 5f1eb12b6a7..73fd043dc67 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -35,6 +35,7 @@ import ( "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/model" "github.com/grafana/tempo/pkg/tempopb" + v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/usagestats" tempo_util "github.com/grafana/tempo/pkg/util" @@ -380,20 +381,15 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te d.usage.Observe(userID, batches) } - keys, rebatchedTraces, discardedSpansWithLargeAttr, err := requestsByTraceID(batches, userID, spanCount, d.cfg.MaxSpanAttrSize) + keys, rebatchedTraces, truncatedAttributeCount, err := requestsByTraceID(batches, userID, spanCount, d.cfg.MaxSpanAttrByte) if err != nil { overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID) logDiscardedResourceSpans(batches, userID, &d.cfg.LogDiscardedSpans, d.logger) return nil, err } - if 
discardedSpansWithLargeAttr > 0 { - overrides.RecordDiscardedSpans(discardedSpansWithLargeAttr, reasonAttrTooLarge, userID) - level.Warn(d.logger).Log("msg", fmt.Sprintf("discarded %d spans attribute key/value too large when adding to trace for tenant %s", discardedSpansWithLargeAttr, userID)) - } - - if discardedSpansWithLargeAttr >= spanCount { - return &tempopb.PushResponse{}, nil + if truncatedAttributeCount > 0 { + level.Warn(d.logger).Log("msg", fmt.Sprintf("truncated %d resource/span attributes when adding spans for tenant %s", truncatedAttributeCount, userID)) } err = d.sendToIngestersViaBytes(ctx, userID, spanCount, rebatchedTraces, keys) @@ -539,17 +535,20 @@ func (d *Distributor) UsageTrackerHandler() http.Handler { func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, maxSpanAttrSize int) ([]uint32, []*rebatchedTrace, int, error) { const tracesPerBatch = 20 // p50 of internal env tracesByID := make(map[uint32]*rebatchedTrace, tracesPerBatch) - discardedSpans := 0 + truncatedAttributeCount := 0 for _, b := range batches { spansByILS := make(map[uint32]*v1.ScopeSpans) + // check for large resources for large attributes + if b.Resource.Size() > maxSpanAttrSize { + processAttributes(b.Resource.Attributes, maxSpanAttrSize, &truncatedAttributeCount) + } for _, ils := range b.ScopeSpans { for _, span := range ils.Spans { - // drop spans with attributes that are too large - if spanContainsAttributeTooLarge(span, maxSpanAttrSize) { - discardedSpans++ - continue + // check large spans for large attributes + if span.Size() > maxSpanAttrSize { + processAttributes(span.Attributes, maxSpanAttrSize, &truncatedAttributeCount) } traceID := span.TraceId if !validation.ValidTraceID(traceID) { @@ -619,16 +618,27 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma traces = append(traces, r) } - return keys, traces, discardedSpans, nil + return keys, traces, truncatedAttributeCount, nil } -func spanContainsAttributeTooLarge(span *v1.Span, maxAttrSize int) bool { - for _, attr := range span.Attributes { - if len(attr.Key) > maxAttrSize || attr.Value.Size() > maxAttrSize { - return true +// find and truncate the span attributes that are too large +func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int, count *int) { + for _, attr := range attributes { + if len(attr.Key) > maxAttrSize { + attr.Key = attr.Key[:maxAttrSize-10] + "_truncated" + *count++ + } + + switch value := attr.GetValue().Value.(type) { + case *v1_common.AnyValue_StringValue: + if len(value.StringValue) > maxAttrSize { + value.StringValue = value.StringValue[:maxAttrSize-10] + "_truncated" + *count++ + } + default: + continue } } - return false } // discardedPredicate determines if a trace is discarded based on the number of successful replications. 
diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index 8702d1a74c8..e5f19c26f54 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -739,6 +739,62 @@ func TestRequestsByTraceID(t *testing.T) { } } +func TestProcessAttributes(t *testing.T) { + spanCount := 10 + batchCount := 3 + trace := test.MakeTraceWithSpanCount(batchCount, spanCount, []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}) + + maxAttrByte := 1000 + longString := strings.Repeat("t", 1100) + + // add long attributes to the resource level + trace.ResourceSpans[0].Resource.Attributes = append(trace.ResourceSpans[0].Resource.Attributes, + test.MakeAttribute("long value", longString), + ) + trace.ResourceSpans[0].Resource.Attributes = append(trace.ResourceSpans[0].Resource.Attributes, + test.MakeAttribute(longString, "long key"), + ) + + // add long attributes to the span level + trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes, + test.MakeAttribute("long value", longString), + ) + trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Attributes, + test.MakeAttribute(longString, "long key"), + ) + + _, rebatchedTrace, truncatedCount, _ := requestsByTraceID(trace.ResourceSpans, "test", spanCount*batchCount, maxAttrByte) + assert.Equal(t, 4, truncatedCount) + for _, rT := range rebatchedTrace { + for _, resource := range rT.trace.ResourceSpans { + // find large resource attributes + for _, attr := range resource.Resource.Attributes { + if attr.Key == "long value" { + assert.Equal(t, longString[:990]+"_truncated", attr.Value.GetStringValue()) + } + if attr.Value.GetStringValue() == "long key" { + assert.Equal(t, longString[:990]+"_truncated", attr.Key) + } + } + // find large span attributes + for _, scope := range resource.ScopeSpans { + for _, span := range scope.Spans { + for _, attr := range span.Attributes { + if attr.Key == "long value" { + assert.Equal(t, longString[:990]+"_truncated", attr.Value.GetStringValue()) + } + if attr.Value.GetStringValue() == "long key" { + assert.Equal(t, longString[:990]+"_truncated", attr.Key) + } + } + } + } + + } + } + +} + func BenchmarkTestsByRequestID(b *testing.B) { spansPer := 1000 batches := 100 @@ -1631,7 +1687,7 @@ func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) (*Dist }) } - distributorConfig.MaxSpanAttrSize = 1000 + distributorConfig.MaxSpanAttrByte = 1000 distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int()) distributorConfig.DistributorRing.KVStore.Mock = nil From a0fe105306cfddb4de8492b9653e6d7b9c31862f Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Tue, 19 Nov 2024 12:02:01 -0500 Subject: [PATCH 05/13] update test --- modules/distributor/distributor.go | 4 ++-- modules/distributor/distributor_test.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 73fd043dc67..9531c14847d 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -625,14 +625,14 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int, count *int) { for _, attr := range attributes { 
if len(attr.Key) > maxAttrSize { - attr.Key = attr.Key[:maxAttrSize-10] + "_truncated" + attr.Key = attr.Key[:maxAttrSize] + "_truncated" *count++ } switch value := attr.GetValue().Value.(type) { case *v1_common.AnyValue_StringValue: if len(value.StringValue) > maxAttrSize { - value.StringValue = value.StringValue[:maxAttrSize-10] + "_truncated" + value.StringValue = value.StringValue[:maxAttrSize] + "_truncated" *count++ } default: diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index e5f19c26f54..c2b6ed436ec 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -770,10 +770,10 @@ func TestProcessAttributes(t *testing.T) { // find large resource attributes for _, attr := range resource.Resource.Attributes { if attr.Key == "long value" { - assert.Equal(t, longString[:990]+"_truncated", attr.Value.GetStringValue()) + assert.Equal(t, longString[:maxAttrByte]+"_truncated", attr.Value.GetStringValue()) } if attr.Value.GetStringValue() == "long key" { - assert.Equal(t, longString[:990]+"_truncated", attr.Key) + assert.Equal(t, longString[:maxAttrByte]+"_truncated", attr.Key) } } // find large span attributes @@ -781,10 +781,10 @@ func TestProcessAttributes(t *testing.T) { for _, span := range scope.Spans { for _, attr := range span.Attributes { if attr.Key == "long value" { - assert.Equal(t, longString[:990]+"_truncated", attr.Value.GetStringValue()) + assert.Equal(t, longString[:maxAttrByte]+"_truncated", attr.Value.GetStringValue()) } if attr.Value.GetStringValue() == "long key" { - assert.Equal(t, longString[:990]+"_truncated", attr.Key) + assert.Equal(t, longString[:maxAttrByte]+"_truncated", attr.Key) } } } @@ -796,7 +796,7 @@ func TestProcessAttributes(t *testing.T) { } func BenchmarkTestsByRequestID(b *testing.B) { - spansPer := 1000 + spansPer := 5000 batches := 100 traces := []*tempopb.Trace{ test.MakeTraceWithSpanCount(batches, spansPer, []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}), @@ -820,7 +820,7 @@ func BenchmarkTestsByRequestID(b *testing.B) { { ScopeSpans: blerg, }, - }, "test", spansPer*len(traces), 1000) + }, "test", spansPer*len(traces), 5) require.NoError(b, err) } } From a933707ef069db7fc6e8ebeaa2423cf30dc71e8a Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Fri, 22 Nov 2024 09:23:49 -0500 Subject: [PATCH 06/13] add metrics to capture truncated attributes count --- integration/e2e/limits_test.go | 1 - modules/distributor/distributor.go | 11 +++++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/integration/e2e/limits_test.go b/integration/e2e/limits_test.go index d351737a7fc..cc02ca80c22 100644 --- a/integration/e2e/limits_test.go +++ b/integration/e2e/limits_test.go @@ -40,7 +40,6 @@ const ( configLimitsQuery = "config-limits-query.yaml" configLimitsPartialError = "config-limits-partial-success.yaml" configLimits429 = "config-limits-429.yaml" - configLimitsSpanAttr = "config-limits-span-attr.yaml" ) func TestLimits(t *testing.T) { diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 9531c14847d..7660cec4d44 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -54,8 +54,6 @@ const ( reasonInternalError = "internal_error" // reasonUnknown indicates a pushByte error at the ingester level not related to GRPC reasonUnknown = "unknown_error" - // reasonAttrTooLarge indicates that at least one attribute in the span is too large - 
reasonAttrTooLarge = "attribute_too_large" distributorRingKey = "distributor" ) @@ -115,6 +113,11 @@ var ( Name: "distributor_metrics_generator_clients", Help: "The current number of metrics-generator clients.", }) + metricAttributesTruncated = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "distributor_attributes_truncated_total", + Help: "The total number of proto bytes received per tenant", + }, []string{"tenant"}) statBytesReceived = usagestats.NewCounter("distributor_bytes_received") statSpansReceived = usagestats.NewCounter("distributor_spans_received") @@ -390,6 +393,7 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te if truncatedAttributeCount > 0 { level.Warn(d.logger).Log("msg", fmt.Sprintf("truncated %d resource/span attributes when adding spans for tenant %s", truncatedAttributeCount, userID)) + metricAttributesTruncated.WithLabelValues(userID).Add(float64(truncatedAttributeCount)) } err = d.sendToIngestersViaBytes(ctx, userID, spanCount, rebatchedTraces, keys) @@ -547,6 +551,8 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma for _, ils := range b.ScopeSpans { for _, span := range ils.Spans { // check large spans for large attributes + fmt.Printf("span size %d", span.Size()) + fmt.Printf("maxSpanAttrSize %d", maxSpanAttrSize) if span.Size() > maxSpanAttrSize { processAttributes(span.Attributes, maxSpanAttrSize, &truncatedAttributeCount) } @@ -632,6 +638,7 @@ func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int, count switch value := attr.GetValue().Value.(type) { case *v1_common.AnyValue_StringValue: if len(value.StringValue) > maxAttrSize { + fmt.Printf("size: %d", len(value.StringValue)) value.StringValue = value.StringValue[:maxAttrSize] + "_truncated" *count++ } From 744a5930abfbde5ea0785b9aea8ef6ec0356f3df Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Fri, 22 Nov 2024 10:16:12 -0500 Subject: [PATCH 07/13] remove test logs --- modules/distributor/distributor.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 7660cec4d44..a9258e48993 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -551,8 +551,6 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma for _, ils := range b.ScopeSpans { for _, span := range ils.Spans { // check large spans for large attributes - fmt.Printf("span size %d", span.Size()) - fmt.Printf("maxSpanAttrSize %d", maxSpanAttrSize) if span.Size() > maxSpanAttrSize { processAttributes(span.Attributes, maxSpanAttrSize, &truncatedAttributeCount) } @@ -638,7 +636,6 @@ func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int, count switch value := attr.GetValue().Value.(type) { case *v1_common.AnyValue_StringValue: if len(value.StringValue) > maxAttrSize { - fmt.Printf("size: %d", len(value.StringValue)) value.StringValue = value.StringValue[:maxAttrSize] + "_truncated" *count++ } From 8121e3e60c983da163a5604354cef51682d3ebe4 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Mon, 25 Nov 2024 11:04:28 -0500 Subject: [PATCH 08/13] removing appending "_truncated" --- modules/distributor/distributor.go | 4 ++-- modules/distributor/distributor_test.go | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index a9258e48993..3039eb37ad7 100644 --- a/modules/distributor/distributor.go +++ 
b/modules/distributor/distributor.go @@ -629,14 +629,14 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int, count *int) { for _, attr := range attributes { if len(attr.Key) > maxAttrSize { - attr.Key = attr.Key[:maxAttrSize] + "_truncated" + attr.Key = attr.Key[:maxAttrSize] *count++ } switch value := attr.GetValue().Value.(type) { case *v1_common.AnyValue_StringValue: if len(value.StringValue) > maxAttrSize { - value.StringValue = value.StringValue[:maxAttrSize] + "_truncated" + value.StringValue = value.StringValue[:maxAttrSize] *count++ } default: diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index c2b6ed436ec..7fc425f5265 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -770,10 +770,10 @@ func TestProcessAttributes(t *testing.T) { // find large resource attributes for _, attr := range resource.Resource.Attributes { if attr.Key == "long value" { - assert.Equal(t, longString[:maxAttrByte]+"_truncated", attr.Value.GetStringValue()) + assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue()) } if attr.Value.GetStringValue() == "long key" { - assert.Equal(t, longString[:maxAttrByte]+"_truncated", attr.Key) + assert.Equal(t, longString[:maxAttrByte], attr.Key) } } // find large span attributes @@ -781,10 +781,10 @@ func TestProcessAttributes(t *testing.T) { for _, span := range scope.Spans { for _, attr := range span.Attributes { if attr.Key == "long value" { - assert.Equal(t, longString[:maxAttrByte]+"_truncated", attr.Value.GetStringValue()) + assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue()) } if attr.Value.GetStringValue() == "long key" { - assert.Equal(t, longString[:maxAttrByte]+"_truncated", attr.Key) + assert.Equal(t, longString[:maxAttrByte], attr.Key) } } } @@ -813,6 +813,7 @@ func BenchmarkTestsByRequestID(b *testing.B) { } b.ResetTimer() + b.ReportAllocs() for i := 0; i < b.N; i++ { for _, blerg := range ils { From 928ccc4d56ddc849c0c7a03a1f5de79d5ab44ec6 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Mon, 25 Nov 2024 11:54:32 -0500 Subject: [PATCH 09/13] lint --- modules/distributor/distributor_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index 7fc425f5265..11babaf84b0 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -792,7 +792,6 @@ func TestProcessAttributes(t *testing.T) { } } - } func BenchmarkTestsByRequestID(b *testing.B) { From f0b1ae8d3e64f2e65c631d84299ecaf88278cc4e Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Thu, 2 Jan 2025 12:58:28 -0600 Subject: [PATCH 10/13] fix comments --- docs/sources/tempo/configuration/_index.md | 1 + modules/distributor/distributor.go | 22 ++++++++++++++-------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index 4c6148d6480..eb125fd9d8d 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -234,6 +234,7 @@ distributor: # Optional # Configures the max size an attribute can be. Any key or value that exceeds this limit will be truncated before storing + # Setting this parameter to '0' would disable this check against attribute size [max_span_attr_byte: | default = '2048'] # Optional. 
diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 3039eb37ad7..a492a77ce9a 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -116,7 +116,7 @@ var ( metricAttributesTruncated = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", Name: "distributor_attributes_truncated_total", - Help: "The total number of proto bytes received per tenant", + Help: "The total number of attribute keys or values truncated per tenant", }, []string{"tenant"}) statBytesReceived = usagestats.NewCounter("distributor_bytes_received") @@ -544,15 +544,18 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma for _, b := range batches { spansByILS := make(map[uint32]*v1.ScopeSpans) // check for large resources for large attributes - if b.Resource.Size() > maxSpanAttrSize { - processAttributes(b.Resource.Attributes, maxSpanAttrSize, &truncatedAttributeCount) + if maxSpanAttrSize > 0 { + fmt.Println("checking size") + resourceAttrTruncatedCount := processAttributes(b.Resource.Attributes, maxSpanAttrSize) + truncatedAttributeCount += resourceAttrTruncatedCount } for _, ils := range b.ScopeSpans { for _, span := range ils.Spans { // check large spans for large attributes - if span.Size() > maxSpanAttrSize { - processAttributes(span.Attributes, maxSpanAttrSize, &truncatedAttributeCount) + if maxSpanAttrSize > 0 { + spanAttrTruncatedCount := processAttributes(span.Attributes, maxSpanAttrSize) + truncatedAttributeCount += spanAttrTruncatedCount } traceID := span.TraceId if !validation.ValidTraceID(traceID) { @@ -626,23 +629,26 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma } // find and truncate the span attributes that are too large -func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int, count *int) { +func processAttributes(attributes []*v1_common.KeyValue, maxAttrSize int) int { + count := 0 for _, attr := range attributes { if len(attr.Key) > maxAttrSize { attr.Key = attr.Key[:maxAttrSize] - *count++ + count++ } switch value := attr.GetValue().Value.(type) { case *v1_common.AnyValue_StringValue: if len(value.StringValue) > maxAttrSize { value.StringValue = value.StringValue[:maxAttrSize] - *count++ + count++ } default: continue } } + + return count } // discardedPredicate determines if a trace is discarded based on the number of successful replications. 
From 4b7198966dd52d034e8d4a1a4e60efb3a846feb4 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Thu, 2 Jan 2025 13:21:38 -0600 Subject: [PATCH 11/13] manifest.md --- docs/sources/tempo/configuration/manifest.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/sources/tempo/configuration/manifest.md b/docs/sources/tempo/configuration/manifest.md index 49692dbb783..ed0e82bbb57 100644 --- a/docs/sources/tempo/configuration/manifest.md +++ b/docs/sources/tempo/configuration/manifest.md @@ -185,6 +185,7 @@ distributor: stale_duration: 15m0s extend_writes: true retry_after_on_resource_exhausted: 0s + max_span_attr_byte: 2048 ingester_client: pool_config: checkinterval: 15s From 9768d3ec3fad149f3a0d7d1c31ce9297c665a280 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Thu, 2 Jan 2025 14:04:24 -0600 Subject: [PATCH 12/13] fix test --- modules/distributor/distributor.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index a492a77ce9a..b027dc5407d 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -544,8 +544,7 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma for _, b := range batches { spansByILS := make(map[uint32]*v1.ScopeSpans) // check for large resources for large attributes - if maxSpanAttrSize > 0 { - fmt.Println("checking size") + if maxSpanAttrSize > 0 && b.Resource != nil { resourceAttrTruncatedCount := processAttributes(b.Resource.Attributes, maxSpanAttrSize) truncatedAttributeCount += resourceAttrTruncatedCount } From 95b5253654571a0a41cbf940479e26376b86c04e Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Thu, 2 Jan 2025 15:59:28 -0600 Subject: [PATCH 13/13] remove log line --- modules/distributor/distributor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index b027dc5407d..ce18d1240c0 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -392,7 +392,6 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te } if truncatedAttributeCount > 0 { - level.Warn(d.logger).Log("msg", fmt.Sprintf("truncated %d resource/span attributes when adding spans for tenant %s", truncatedAttributeCount, userID)) metricAttributesTruncated.WithLabelValues(userID).Add(float64(truncatedAttributeCount)) }
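
For reference, a minimal distributor configuration sketch for the option added in this series. The value shown is the series' default and is illustrative only; setting it to 0 disables the check, and truncations are counted in the tempo_distributor_attributes_truncated_total metric.

distributor:
  # Truncate any span or resource attribute key/string value longer than this many bytes before storing.
  max_span_attr_byte: 2048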