diff --git a/CHANGELOG.md b/CHANGELOG.md index 02dff10c61d..44f47e9683a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ Tempo serverless related metric `tempo_querier_external_endpoint_duration_seconds`, `tempo_querier_external_endpoint_hedged_roundtrips_total` and `tempo_feature_enabled` are being removed. * [CHANGE] **BREAKING CHANGE** Removed `internal_error` as a reason from `tempo_discarded_spans_total`. [#4554](https://github.com/grafana/tempo/pull/4554) (@joe-elliott) +* [CHANGE] **BREAKING CHANGE** Enforce the max attribute size on event, link, and instrumentation scope attributes, and make the limit configurable per tenant. + Renamed `max_span_attr_byte` to `max_attribute_bytes`. + [#4633](https://github.com/grafana/tempo/pull/4633) (@ie-pham) * [ENHANCEMENT] Update minio to version [#4341](https://github.com/grafana/tempo/pull/4568) (@javiermolinar) * [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott) * [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio) @@ -27,6 +30,7 @@ * [BUGFIX] TraceQL incorrect results for additional spanset filters after a select operation [#4600](https://github.com/grafana/tempo/pull/4600) (@mdisibio) * [BUGFIX] TraceQL results caching bug for floats ending in .0 [#4539](https://github.com/grafana/tempo/pull/4539) (@carles-grafana) * [BUGFIX] Fix starting consuming log [#4539](https://github.com/grafana/tempo/pull/46299) (@javiermolinar) + # v2.7.0 * [CHANGE] Disable gRPC compression in the querier and distributor for performance reasons [#4429](https://github.com/grafana/tempo/pull/4429) (@carles-grafana) diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index fa7125d59f7..b502e805442 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md 
@@ -237,7 +237,7 @@ distributor: # Optional # Configures the max size an attribute can be. Any key or value that exceeds this limit will be truncated before storing # Setting this parameter to '0' would disable this check against attribute size - [max_span_attr_byte: | default = '2048'] + [max_attribute_bytes: | default = '2048'] # Optional. # Configures usage trackers in the distributor which expose metrics of ingested traffic grouped by configurable @@ -260,7 +260,7 @@ This issue has been observed when trying to fetch a single trace using the [`tra While a trace might not have a lot of spans (roughly 500), it can have a larger size (approximately 250KB). Some of the spans in that trace had attributes whose values were very large in size. -To avoid these out-of-memory crashes, use `max_span_attr_byte` to limit the maximum allowable size of any individual attribute. +To avoid these out-of-memory crashes, use `max_attribute_bytes` to limit the maximum allowable size of any individual attribute. Any key or values that exceed the configured limit are truncated before storing. The default value is `2048`. @@ -1620,6 +1620,9 @@ overrides: # Should not be lower than RF. [tenant_shard_size: | default = 0] + # Maximum size in bytes of any attribute key or value for this tenant. A value of 0 falls back to the distributor's max_attribute_bytes setting. + [max_attribute_bytes: | default = 0] + # Read related overrides read: # Maximum size in bytes of a tag-values query. 
Tag-values query is used mainly diff --git a/docs/sources/tempo/configuration/manifest.md b/docs/sources/tempo/configuration/manifest.md index 878b406ef83..4b5c565a128 100644 --- a/docs/sources/tempo/configuration/manifest.md +++ b/docs/sources/tempo/configuration/manifest.md @@ -203,7 +203,7 @@ distributor: max_consumer_lag_at_startup: 0s extend_writes: true retry_after_on_resource_exhausted: 0s - max_span_attr_byte: 2048 + max_attribute_bytes: 2048 ingester_client: pool_config: checkinterval: 15s diff --git a/docs/sources/tempo/troubleshooting/out-of-memory-errors.md b/docs/sources/tempo/troubleshooting/out-of-memory-errors.md index 32ed71dfb40..6418373603c 100644 --- a/docs/sources/tempo/troubleshooting/out-of-memory-errors.md +++ b/docs/sources/tempo/troubleshooting/out-of-memory-errors.md @@ -14,7 +14,7 @@ Learn about out-of-memory (OOM) issues and how to troubleshoot them. Tempo queriers can run out of memory when fetching traces that have spans with very large attributes. This issue has been observed when trying to fetch a single trace using the [`tracebyID` endpoint](https://grafana.com/docs/tempo/latest/api_docs/#query). -To avoid these out-of-memory crashes, use `max_span_attr_byte` to limit the maximum allowable size of any individual attribute. +To avoid these out-of-memory crashes, use `max_attribute_bytes` to limit the maximum allowable size of any individual attribute. Any key or values that exceed the configured limit are truncated before storing. Use the `tempo_distributor_attributes_truncated_total` metric to track how many attributes are truncated. @@ -23,7 +23,7 @@ Use the `tempo_distributor_attributes_truncated_total` metric to track how many # Optional # Configures the max size an attribute can be. 
Any key or value that exceeds this limit will be truncated before storing # Setting this parameter to '0' would disable this check against attribute size - [max_span_attr_byte: | default = '2048'] + [max_attribute_bytes: | default = '2048'] ``` Refer to the [configuration for distributors](https://grafana.com/docs/tempo//configuration/#set-max-attribute-size-to-help-control-out-of-memory-errors) documentation for more information. @@ -95,7 +95,7 @@ When writing these attributes, they can spike the memory usage of the write comp * compactor * metrics-generator -You can [automatically limit attribute sizes](https://github.com/grafana/tempo/pull/4335) using [`max_span_attr_byte`]((https://grafana.com/docs/tempo//configuration/#set-max-attribute-size-to-help-control-out-of-memory-errors). +You can [automatically limit attribute sizes](https://github.com/grafana/tempo/pull/4335) using [`max_attribute_bytes`](https://grafana.com/docs/tempo//configuration/#set-max-attribute-size-to-help-control-out-of-memory-errors). You can also use these options: * Manually update application instrumentation to remove or limit these attributes diff --git a/modules/distributor/config.go b/modules/distributor/config.go index 2a2cb988d1b..d14ffaf7767 100644 --- a/modules/distributor/config.go +++ b/modules/distributor/config.go @@ -57,7 +57,7 @@ type Config struct { // For testing. 
factory ring_client.PoolAddrFunc `yaml:"-"` - MaxSpanAttrByte int `yaml:"max_span_attr_byte"` + MaxAttributeBytes int `yaml:"max_attribute_bytes"` } type LogSpansConfig struct { @@ -81,7 +81,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.OverrideRingKey = distributorRingKey cfg.ExtendWrites = true - cfg.MaxSpanAttrByte = 2048 // 2KB + cfg.MaxAttributeBytes = 2048 // 2KB f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.") f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.") diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go index 6935bd54b7f..b87d39c9aad 100644 --- a/modules/distributor/distributor.go +++ b/modules/distributor/distributor.go @@ -439,7 +439,9 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te d.usage.Observe(userID, batches) } - keys, rebatchedTraces, truncatedAttributeCount, err := requestsByTraceID(batches, userID, spanCount, d.cfg.MaxSpanAttrByte) + maxAttributeBytes := d.getMaxAttributeBytes(userID) + + keys, rebatchedTraces, truncatedAttributeCount, err := requestsByTraceID(batches, userID, spanCount, maxAttributeBytes) if err != nil { logDiscardedResourceSpans(batches, userID, &d.cfg.LogDiscardedSpans, d.logger) return nil, err @@ -679,18 +681,36 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma for _, b := range batches { spansByILS := make(map[uint32]*v1.ScopeSpans) - // check for large resources for large attributes + // check resource for large attributes if maxSpanAttrSize > 0 && b.Resource != nil { resourceAttrTruncatedCount := processAttributes(b.Resource.Attributes, maxSpanAttrSize) truncatedAttributeCount 
+= resourceAttrTruncatedCount } for _, ils := range b.ScopeSpans { + + // check instrumentation for large attributes + if maxSpanAttrSize > 0 && ils.Scope != nil { + scopeAttrTruncatedCount := processAttributes(ils.Scope.Attributes, maxSpanAttrSize) + truncatedAttributeCount += scopeAttrTruncatedCount + } + for _, span := range ils.Spans { - // check large spans for large attributes + // check spans for large attributes if maxSpanAttrSize > 0 { spanAttrTruncatedCount := processAttributes(span.Attributes, maxSpanAttrSize) truncatedAttributeCount += spanAttrTruncatedCount + + // check large attributes for events and links + for _, event := range span.Events { + eventAttrTruncatedCount := processAttributes(event.Attributes, maxSpanAttrSize) + truncatedAttributeCount += eventAttrTruncatedCount + } + + for _, link := range span.Links { + linkAttrTruncatedCount := processAttributes(link.Attributes, maxSpanAttrSize) + truncatedAttributeCount += linkAttrTruncatedCount + } } traceID := span.TraceId if !validation.ValidTraceID(traceID) { @@ -986,3 +1006,11 @@ func logSpan(s *v1.Span, allAttributes bool, logger log.Logger) { func startEndFromSpan(span *v1.Span) (uint32, uint32) { return uint32(span.StartTimeUnixNano / uint64(time.Second)), uint32(span.EndTimeUnixNano / uint64(time.Second)) } + +func (d *Distributor) getMaxAttributeBytes(userID string) int { + if tenantMaxAttrByte := d.overrides.IngestionMaxAttributeBytes(userID); tenantMaxAttrByte > 0 { + return tenantMaxAttrByte + } + + return d.cfg.MaxAttributeBytes +} diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index b4cdb839a11..0c2dbb5fd4c 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -763,8 +763,42 @@ func TestProcessAttributes(t *testing.T) { test.MakeAttribute(longString, "long key"), ) + // add long attributes to the event level + trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Events = 
append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Events, + &v1.Span_Event{ + TimeUnixNano: 0, + Attributes: []*v1_common.KeyValue{ + test.MakeAttribute("long value", longString), + test.MakeAttribute(longString, "long key"), + }, + }, + ) + + // add long attributes to the link level + trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Links = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Links, + &v1.Span_Link{ + TraceId: []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}, + SpanId: []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}, + Attributes: []*v1_common.KeyValue{ + test.MakeAttribute("long value", longString), + test.MakeAttribute(longString, "long key"), + }, + }, + ) + + // add long attributes to scope level + trace.ResourceSpans[0].ScopeSpans[0].Scope = &v1_common.InstrumentationScope{ + Name: "scope scope", + Version: "1.0", + Attributes: []*v1_common.KeyValue{ + test.MakeAttribute("long value", longString), + test.MakeAttribute(longString, "long key"), + }, + } + _, rebatchedTrace, truncatedCount, _ := requestsByTraceID(trace.ResourceSpans, "test", spanCount*batchCount, maxAttrByte) - assert.Equal(t, 4, truncatedCount) + // 2 at resource level, 2 at span level, 2 at event level, 2 at link level, 2 at scope level + assert.Equal(t, 10, truncatedCount) for _, rT := range rebatchedTrace { for _, resource := range rT.trace.ResourceSpans { // find large resource attributes @@ -778,6 +812,15 @@ func TestProcessAttributes(t *testing.T) { } // find large span attributes for _, scope := range resource.ScopeSpans { + for _, attr := range scope.Scope.Attributes { + if attr.Key == "long value" { + assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue()) + } + if attr.Value.GetStringValue() == "long key" { + assert.Equal(t, longString[:maxAttrByte], attr.Key) + } + } + for _, span := range scope.Spans { for _, attr := range span.Attributes { if 
attr.Key == "long value" { @@ -787,6 +830,29 @@ func TestProcessAttributes(t *testing.T) { assert.Equal(t, longString[:maxAttrByte], attr.Key) } } + // events + for _, event := range span.Events { + for _, attr := range event.Attributes { + if attr.Key == "long value" { + assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue()) + } + if attr.Value.GetStringValue() == "long key" { + assert.Equal(t, longString[:maxAttrByte], attr.Key) + } + } + } + + // links + for _, link := range span.Links { + for _, attr := range link.Attributes { + if attr.Key == "long value" { + assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue()) + } + if attr.Value.GetStringValue() == "long key" { + assert.Equal(t, longString[:maxAttrByte], attr.Key) + } + } + } } } @@ -1687,7 +1753,7 @@ func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) (*Dist }) } - distributorConfig.MaxSpanAttrByte = 1000 + distributorConfig.MaxAttributeBytes = 1000 distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int()) distributorConfig.DistributorRing.KVStore.Mock = nil diff --git a/modules/overrides/config.go b/modules/overrides/config.go index c973eb1b628..f5f851b4206 100644 --- a/modules/overrides/config.go +++ b/modules/overrides/config.go @@ -77,6 +77,8 @@ type IngestionOverrides struct { MaxGlobalTracesPerUser int `yaml:"max_global_traces_per_user,omitempty" json:"max_global_traces_per_user,omitempty"` TenantShardSize int `yaml:"tenant_shard_size,omitempty" json:"tenant_shard_size,omitempty"` + + MaxAttributeBytes int `yaml:"max_attribute_bytes,omitempty" json:"max_attribute_bytes,omitempty"` } type ForwarderOverrides struct { diff --git a/modules/overrides/config_legacy.go b/modules/overrides/config_legacy.go index 6516e2817e7..fec514d2ad5 100644 --- a/modules/overrides/config_legacy.go +++ b/modules/overrides/config_legacy.go @@ -14,12 +14,13 @@ import ( func (c 
*Overrides) toLegacy() LegacyOverrides { return LegacyOverrides{ - IngestionRateStrategy: c.Ingestion.RateStrategy, - IngestionRateLimitBytes: c.Ingestion.RateLimitBytes, - IngestionBurstSizeBytes: c.Ingestion.BurstSizeBytes, - IngestionTenantShardSize: c.Ingestion.TenantShardSize, - MaxLocalTracesPerUser: c.Ingestion.MaxLocalTracesPerUser, - MaxGlobalTracesPerUser: c.Ingestion.MaxGlobalTracesPerUser, + IngestionRateStrategy: c.Ingestion.RateStrategy, + IngestionRateLimitBytes: c.Ingestion.RateLimitBytes, + IngestionBurstSizeBytes: c.Ingestion.BurstSizeBytes, + IngestionTenantShardSize: c.Ingestion.TenantShardSize, + MaxLocalTracesPerUser: c.Ingestion.MaxLocalTracesPerUser, + MaxGlobalTracesPerUser: c.Ingestion.MaxGlobalTracesPerUser, + IngestionMaxAttributeBytes: c.Ingestion.MaxAttributeBytes, Forwarders: c.Forwarders, @@ -72,10 +73,11 @@ func (c *Overrides) toLegacy() LegacyOverrides { // limits via flags, or per-user limits via yaml config. type LegacyOverrides struct { // Distributor enforced limits. - IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` - IngestionRateLimitBytes int `yaml:"ingestion_rate_limit_bytes" json:"ingestion_rate_limit_bytes"` - IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"` - IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"` + IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"` + IngestionRateLimitBytes int `yaml:"ingestion_rate_limit_bytes" json:"ingestion_rate_limit_bytes"` + IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"` + IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"` + IngestionMaxAttributeBytes int `yaml:"ingestion_max_attribute_bytes" json:"ingestion_max_attribute_bytes"` // Ingester enforced limits. 
MaxLocalTracesPerUser int `yaml:"max_traces_per_user" json:"max_traces_per_user"` @@ -149,6 +151,7 @@ func (l *LegacyOverrides) toNewLimits() Overrides { MaxLocalTracesPerUser: l.MaxLocalTracesPerUser, MaxGlobalTracesPerUser: l.MaxGlobalTracesPerUser, TenantShardSize: l.IngestionTenantShardSize, + MaxAttributeBytes: l.IngestionMaxAttributeBytes, }, Read: ReadOverrides{ MaxBytesPerTagValuesQuery: l.MaxBytesPerTagValuesQuery, diff --git a/modules/overrides/config_test.go b/modules/overrides/config_test.go index 5aef3950206..a47b075641a 100644 --- a/modules/overrides/config_test.go +++ b/modules/overrides/config_test.go @@ -44,6 +44,7 @@ ingestion_rate_strategy: global ingestion_rate_limit_bytes: 100_000 ingestion_burst_size_bytes: 100_000 ingestion_tenant_shard_size: 3 +ingestion_max_attribute_bytes: 1_000 max_traces_per_user: 1000 max_global_traces_per_user: 1000 @@ -66,6 +67,7 @@ max_search_duration: 5m "ingestion_rate_limit_bytes": 100000, "ingestion_burst_size_bytes": 100000, "ingestion_tenant_shard_size": 3, + "ingestion_max_attribute_bytes": 1000, "max_traces_per_user": 1000, "max_global_traces_per_user": 1000, @@ -100,6 +102,7 @@ ingestion_rate_strategy: local ingestion_rate_limit_bytes: 12345 ingestion_burst_size_bytes: 67890 ingestion_tenant_shard_size: 3 +ingestion_max_attribute_bytes: 1000 max_traces_per_user: 1 max_global_traces_per_user: 2 forwarders: ['foo'] @@ -166,6 +169,7 @@ defaults: max_traces_per_user: 1 max_global_traces_per_user: 2 tenant_shard_size: 3 + max_attribute_bytes: 1000 read: max_bytes_per_tag_values_query: 15 max_blocks_per_tag_values_query: 16 diff --git a/modules/overrides/interface.go b/modules/overrides/interface.go index 79797c42076..a3752cfdbc4 100644 --- a/modules/overrides/interface.go +++ b/modules/overrides/interface.go @@ -40,6 +40,7 @@ type Interface interface { IngestionRateLimitBytes(userID string) float64 IngestionBurstSizeBytes(userID string) int IngestionTenantShardSize(userID string) int + 
IngestionMaxAttributeBytes(userID string) int MetricsGeneratorIngestionSlack(userID string) time.Duration MetricsGeneratorRingSize(userID string) int MetricsGeneratorProcessors(userID string) map[string]struct{} diff --git a/modules/overrides/runtime_config_overrides.go b/modules/overrides/runtime_config_overrides.go index b4fe43bcb9a..c2c615e1023 100644 --- a/modules/overrides/runtime_config_overrides.go +++ b/modules/overrides/runtime_config_overrides.go @@ -326,6 +326,10 @@ func (o *runtimeConfigOverridesManager) IngestionTenantShardSize(userID string) return o.getOverridesForUser(userID).Ingestion.TenantShardSize } +func (o *runtimeConfigOverridesManager) IngestionMaxAttributeBytes(userID string) int { + return o.getOverridesForUser(userID).Ingestion.MaxAttributeBytes +} + // MaxBytesPerTrace returns the maximum size of a single trace in bytes allowed for a user. func (o *runtimeConfigOverridesManager) MaxBytesPerTrace(userID string) int { return o.getOverridesForUser(userID).Global.MaxBytesPerTrace