switch max attribute bytes to per tenant and also check event links and instrumentation scopes (#4633)

* switch max attribute bytes to per tenant and also check event links and instrumentation scopes

* changelog and lint

* remove test log lines

* add breaking change label

* lint

* add test, make func part of struct
ie-pham authored Jan 29, 2025
1 parent 27a98c4 commit d544ec1
Showing 12 changed files with 138 additions and 23 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,9 @@
Tempo serverless related metrics `tempo_querier_external_endpoint_duration_seconds`, `tempo_querier_external_endpoint_hedged_roundtrips_total`, and `tempo_feature_enabled` are being removed.

* [CHANGE] **BREAKING CHANGE** Removed `internal_error` as a reason from `tempo_discarded_spans_total`. [#4554](https://github.com/grafana/tempo/pull/4554) (@joe-elliott)
* [CHANGE] **BREAKING CHANGE** Enforce the max attribute size limit at the event, link, and instrumentation-scope levels as well, and make the limit configurable per tenant. Renamed `max_span_attr_byte` to `max_attribute_bytes`. [#4633](https://github.com/grafana/tempo/pull/4633) (@ie-pham)
* [ENHANCEMENT] Update minio to version [#4568](https://github.com/grafana/tempo/pull/4568) (@javiermolinar)
* [ENHANCEMENT] Prevent queries in the ingester from blocking flushing traces to disk and memory spikes. [#4483](https://github.com/grafana/tempo/pull/4483) (@joe-elliott)
* [ENHANCEMENT] Update tempo operational dashboard for new block-builder and v2 traces api [#4559](https://github.com/grafana/tempo/pull/4559) (@mdisibio)
@@ -27,6 +30,7 @@
* [BUGFIX] TraceQL incorrect results for additional spanset filters after a select operation [#4600](https://github.com/grafana/tempo/pull/4600) (@mdisibio)
* [BUGFIX] TraceQL results caching bug for floats ending in .0 [#4539](https://github.com/grafana/tempo/pull/4539) (@carles-grafana)
* [BUGFIX] Fix starting consuming log [#4629](https://github.com/grafana/tempo/pull/4629) (@javiermolinar)

# v2.7.0

* [CHANGE] Disable gRPC compression in the querier and distributor for performance reasons [#4429](https://github.com/grafana/tempo/pull/4429) (@carles-grafana)
7 changes: 5 additions & 2 deletions docs/sources/tempo/configuration/_index.md
@@ -237,7 +237,7 @@ distributor:
# Optional
# Configures the maximum size an attribute can be. Any key or value that exceeds this limit is truncated before storing.
# Setting this parameter to '0' disables this check on attribute size.
[max_span_attr_byte: <int> | default = '2048']
[max_attribute_bytes: <int> | default = '2048']

# Optional.
# Configures usage trackers in the distributor which expose metrics of ingested traffic grouped by configurable
@@ -260,7 +260,7 @@ This issue has been observed when trying to fetch a single trace using the [`tra
While a trace might not have a lot of spans (roughly 500), it can have a larger size (approximately 250KB).
Some of the spans in that trace had attributes whose values were very large in size.

To avoid these out-of-memory crashes, use `max_span_attr_byte` to limit the maximum allowable size of any individual attribute.
To avoid these out-of-memory crashes, use `max_attribute_bytes` to limit the maximum allowable size of any individual attribute.
Any key or value that exceeds the configured limit is truncated before storing.
The default value is `2048`.
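Putting the global default and the new per-tenant override together, the resulting configuration might look like the sketch below. The tenant value of `1024` is illustrative only, and the exact nesting is inferred from the config snippets elsewhere in this diff:

```yaml
distributor:
  # Global default: truncate any attribute key or value larger than 2 KB.
  max_attribute_bytes: 2048

overrides:
  defaults:
    ingestion:
      # Per-tenant override introduced by this PR; when greater than 0 it
      # takes precedence over the distributor-level default.
      max_attribute_bytes: 1024
```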

@@ -1620,6 +1620,9 @@ overrides:
# Should not be lower than RF.
[tenant_shard_size: <int> | default = 0]
# Maximum bytes any attribute can be for both keys and values.
[max_attribute_bytes: <int> | default = 0]
# Read related overrides
read:
# Maximum size in bytes of a tag-values query. Tag-values query is used mainly
2 changes: 1 addition & 1 deletion docs/sources/tempo/configuration/manifest.md
@@ -203,7 +203,7 @@ distributor:
max_consumer_lag_at_startup: 0s
extend_writes: true
retry_after_on_resource_exhausted: 0s
max_span_attr_byte: 2048
max_attribute_bytes: 2048
ingester_client:
pool_config:
checkinterval: 15s
6 changes: 3 additions & 3 deletions docs/sources/tempo/troubleshooting/out-of-memory-errors.md
@@ -14,7 +14,7 @@ Learn about out-of-memory (OOM) issues and how to troubleshoot them.
Tempo queriers can run out of memory when fetching traces that have spans with very large attributes.
This issue has been observed when trying to fetch a single trace using the [`tracebyID` endpoint](https://grafana.com/docs/tempo/latest/api_docs/#query).

To avoid these out-of-memory crashes, use `max_span_attr_byte` to limit the maximum allowable size of any individual attribute.
To avoid these out-of-memory crashes, use `max_attribute_bytes` to limit the maximum allowable size of any individual attribute.
Any key or value that exceeds the configured limit is truncated before storing.

Use the `tempo_distributor_attributes_truncated_total` metric to track how many attributes are truncated.
@@ -23,7 +23,7 @@ Use the `tempo_distributor_attributes_truncated_total` metric to track how many
# Optional
# Configures the maximum size an attribute can be. Any key or value that exceeds this limit is truncated before storing.
# Setting this parameter to '0' disables this check on attribute size.
[max_span_attr_byte: <int> | default = '2048']
[max_attribute_bytes: <int> | default = '2048']
```
Refer to the [configuration for distributors](https://grafana.com/docs/tempo/<TEMPO_VERSION>/configuration/#set-max-attribute-size-to-help-control-out-of-memory-errors) documentation for more information.
@@ -95,7 +95,7 @@ When writing these attributes, they can spike the memory usage of the write comp
* compactor
* metrics-generator
You can [automatically limit attribute sizes](https://github.com/grafana/tempo/pull/4335) using [`max_span_attr_byte`](https://grafana.com/docs/tempo/<TEMPO_VERSION>/configuration/#set-max-attribute-size-to-help-control-out-of-memory-errors).
You can [automatically limit attribute sizes](https://github.com/grafana/tempo/pull/4335) using [`max_attribute_bytes`](https://grafana.com/docs/tempo/<TEMPO_VERSION>/configuration/#set-max-attribute-size-to-help-control-out-of-memory-errors).
You can also use these options:

* Manually update application instrumentation to remove or limit these attributes
4 changes: 2 additions & 2 deletions modules/distributor/config.go
@@ -57,7 +57,7 @@ type Config struct {
// For testing.
factory ring_client.PoolAddrFunc `yaml:"-"`

MaxSpanAttrByte int `yaml:"max_span_attr_byte"`
MaxAttributeBytes int `yaml:"max_attribute_bytes"`
}

type LogSpansConfig struct {
@@ -81,7 +81,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.OverrideRingKey = distributorRingKey
cfg.ExtendWrites = true

cfg.MaxSpanAttrByte = 2048 // 2KB
cfg.MaxAttributeBytes = 2048 // 2KB

f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
34 changes: 31 additions & 3 deletions modules/distributor/distributor.go
@@ -439,7 +439,9 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
d.usage.Observe(userID, batches)
}

keys, rebatchedTraces, truncatedAttributeCount, err := requestsByTraceID(batches, userID, spanCount, d.cfg.MaxSpanAttrByte)
maxAttributeBytes := d.getMaxAttributeBytes(userID)

keys, rebatchedTraces, truncatedAttributeCount, err := requestsByTraceID(batches, userID, spanCount, maxAttributeBytes)
if err != nil {
logDiscardedResourceSpans(batches, userID, &d.cfg.LogDiscardedSpans, d.logger)
return nil, err
@@ -679,18 +681,36 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount, ma

for _, b := range batches {
spansByILS := make(map[uint32]*v1.ScopeSpans)
// check for large resources for large attributes
// check resource for large attributes
if maxSpanAttrSize > 0 && b.Resource != nil {
resourceAttrTruncatedCount := processAttributes(b.Resource.Attributes, maxSpanAttrSize)
truncatedAttributeCount += resourceAttrTruncatedCount
}

for _, ils := range b.ScopeSpans {

// check instrumentation for large attributes
if maxSpanAttrSize > 0 && ils.Scope != nil {
scopeAttrTruncatedCount := processAttributes(ils.Scope.Attributes, maxSpanAttrSize)
truncatedAttributeCount += scopeAttrTruncatedCount
}

for _, span := range ils.Spans {
// check large spans for large attributes
// check spans for large attributes
if maxSpanAttrSize > 0 {
spanAttrTruncatedCount := processAttributes(span.Attributes, maxSpanAttrSize)
truncatedAttributeCount += spanAttrTruncatedCount

// check large attributes for events and links
for _, event := range span.Events {
eventAttrTruncatedCount := processAttributes(event.Attributes, maxSpanAttrSize)
truncatedAttributeCount += eventAttrTruncatedCount
}

for _, link := range span.Links {
linkAttrTruncatedCount := processAttributes(link.Attributes, maxSpanAttrSize)
truncatedAttributeCount += linkAttrTruncatedCount
}
}
traceID := span.TraceId
if !validation.ValidTraceID(traceID) {
@@ -986,3 +1006,11 @@ func logSpan(s *v1.Span, allAttributes bool, logger log.Logger) {
func startEndFromSpan(span *v1.Span) (uint32, uint32) {
return uint32(span.StartTimeUnixNano / uint64(time.Second)), uint32(span.EndTimeUnixNano / uint64(time.Second))
}

func (d *Distributor) getMaxAttributeBytes(userID string) int {
if tenantMaxAttrByte := d.overrides.IngestionMaxAttributeBytes(userID); tenantMaxAttrByte > 0 {
return tenantMaxAttrByte
}

return d.cfg.MaxAttributeBytes
}
70 changes: 68 additions & 2 deletions modules/distributor/distributor_test.go
@@ -763,8 +763,42 @@ func TestProcessAttributes(t *testing.T) {
test.MakeAttribute(longString, "long key"),
)

// add long attributes to the event level
trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Events = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Events,
&v1.Span_Event{
TimeUnixNano: 0,
Attributes: []*v1_common.KeyValue{
test.MakeAttribute("long value", longString),
test.MakeAttribute(longString, "long key"),
},
},
)

// add long attributes to the link level
trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Links = append(trace.ResourceSpans[0].ScopeSpans[0].Spans[0].Links,
&v1.Span_Link{
TraceId: []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F},
SpanId: []byte{0x0A, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F},
Attributes: []*v1_common.KeyValue{
test.MakeAttribute("long value", longString),
test.MakeAttribute(longString, "long key"),
},
},
)

// add long attributes to scope level
trace.ResourceSpans[0].ScopeSpans[0].Scope = &v1_common.InstrumentationScope{
Name: "scope scope",
Version: "1.0",
Attributes: []*v1_common.KeyValue{
test.MakeAttribute("long value", longString),
test.MakeAttribute(longString, "long key"),
},
}

_, rebatchedTrace, truncatedCount, _ := requestsByTraceID(trace.ResourceSpans, "test", spanCount*batchCount, maxAttrByte)
assert.Equal(t, 4, truncatedCount)
// 2 at resource level, 2 at span level, 2 at event level, 2 at link level, 2 at scope level
assert.Equal(t, 10, truncatedCount)
for _, rT := range rebatchedTrace {
for _, resource := range rT.trace.ResourceSpans {
// find large resource attributes
@@ -778,6 +812,15 @@
}
// find large span attributes
for _, scope := range resource.ScopeSpans {
for _, attr := range scope.Scope.Attributes {
if attr.Key == "long value" {
assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue())
}
if attr.Value.GetStringValue() == "long key" {
assert.Equal(t, longString[:maxAttrByte], attr.Key)
}
}

for _, span := range scope.Spans {
for _, attr := range span.Attributes {
if attr.Key == "long value" {
@@ -787,6 +830,29 @@
assert.Equal(t, longString[:maxAttrByte], attr.Key)
}
}
// events
for _, event := range span.Events {
for _, attr := range event.Attributes {
if attr.Key == "long value" {
assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue())
}
if attr.Value.GetStringValue() == "long key" {
assert.Equal(t, longString[:maxAttrByte], attr.Key)
}
}
}

// links
for _, link := range span.Links {
for _, attr := range link.Attributes {
if attr.Key == "long value" {
assert.Equal(t, longString[:maxAttrByte], attr.Value.GetStringValue())
}
if attr.Value.GetStringValue() == "long key" {
assert.Equal(t, longString[:maxAttrByte], attr.Key)
}
}
}
}
}

@@ -1687,7 +1753,7 @@ func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) (*Dist
})
}

distributorConfig.MaxSpanAttrByte = 1000
distributorConfig.MaxAttributeBytes = 1000
distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond
distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int())
distributorConfig.DistributorRing.KVStore.Mock = nil
2 changes: 2 additions & 0 deletions modules/overrides/config.go
@@ -77,6 +77,8 @@ type IngestionOverrides struct {
MaxGlobalTracesPerUser int `yaml:"max_global_traces_per_user,omitempty" json:"max_global_traces_per_user,omitempty"`

TenantShardSize int `yaml:"tenant_shard_size,omitempty" json:"tenant_shard_size,omitempty"`

MaxAttributeBytes int `yaml:"max_attribute_bytes,omitempty" json:"max_attribute_bytes,omitempty"`
}

type ForwarderOverrides struct {
23 changes: 13 additions & 10 deletions modules/overrides/config_legacy.go
@@ -14,12 +14,13 @@ import (

func (c *Overrides) toLegacy() LegacyOverrides {
return LegacyOverrides{
IngestionRateStrategy: c.Ingestion.RateStrategy,
IngestionRateLimitBytes: c.Ingestion.RateLimitBytes,
IngestionBurstSizeBytes: c.Ingestion.BurstSizeBytes,
IngestionTenantShardSize: c.Ingestion.TenantShardSize,
MaxLocalTracesPerUser: c.Ingestion.MaxLocalTracesPerUser,
MaxGlobalTracesPerUser: c.Ingestion.MaxGlobalTracesPerUser,
IngestionRateStrategy: c.Ingestion.RateStrategy,
IngestionRateLimitBytes: c.Ingestion.RateLimitBytes,
IngestionBurstSizeBytes: c.Ingestion.BurstSizeBytes,
IngestionTenantShardSize: c.Ingestion.TenantShardSize,
MaxLocalTracesPerUser: c.Ingestion.MaxLocalTracesPerUser,
MaxGlobalTracesPerUser: c.Ingestion.MaxGlobalTracesPerUser,
IngestionMaxAttributeBytes: c.Ingestion.MaxAttributeBytes,

Forwarders: c.Forwarders,

@@ -72,10 +73,11 @@
// limits via flags, or per-user limits via yaml config.
type LegacyOverrides struct {
// Distributor enforced limits.
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateLimitBytes int `yaml:"ingestion_rate_limit_bytes" json:"ingestion_rate_limit_bytes"`
IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"`
IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"`
IngestionRateStrategy string `yaml:"ingestion_rate_strategy" json:"ingestion_rate_strategy"`
IngestionRateLimitBytes int `yaml:"ingestion_rate_limit_bytes" json:"ingestion_rate_limit_bytes"`
IngestionBurstSizeBytes int `yaml:"ingestion_burst_size_bytes" json:"ingestion_burst_size_bytes"`
IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"`
IngestionMaxAttributeBytes int `yaml:"ingestion_max_attribute_bytes" json:"ingestion_max_attribute_bytes"`

// Ingester enforced limits.
MaxLocalTracesPerUser int `yaml:"max_traces_per_user" json:"max_traces_per_user"`
@@ -149,6 +151,7 @@ func (l *LegacyOverrides) toNewLimits() Overrides {
MaxLocalTracesPerUser: l.MaxLocalTracesPerUser,
MaxGlobalTracesPerUser: l.MaxGlobalTracesPerUser,
TenantShardSize: l.IngestionTenantShardSize,
MaxAttributeBytes: l.IngestionMaxAttributeBytes,
},
Read: ReadOverrides{
MaxBytesPerTagValuesQuery: l.MaxBytesPerTagValuesQuery,
4 changes: 4 additions & 0 deletions modules/overrides/config_test.go
@@ -44,6 +44,7 @@ ingestion_rate_strategy: global
ingestion_rate_limit_bytes: 100_000
ingestion_burst_size_bytes: 100_000
ingestion_tenant_shard_size: 3
ingestion_max_attribute_bytes: 1_000
max_traces_per_user: 1000
max_global_traces_per_user: 1000
@@ -66,6 +67,7 @@ max_search_duration: 5m
"ingestion_rate_limit_bytes": 100000,
"ingestion_burst_size_bytes": 100000,
"ingestion_tenant_shard_size": 3,
"ingestion_max_attribute_bytes": 1000,
"max_traces_per_user": 1000,
"max_global_traces_per_user": 1000,
@@ -100,6 +102,7 @@ ingestion_rate_strategy: local
ingestion_rate_limit_bytes: 12345
ingestion_burst_size_bytes: 67890
ingestion_tenant_shard_size: 3
ingestion_max_attribute_bytes: 1000
max_traces_per_user: 1
max_global_traces_per_user: 2
forwarders: ['foo']
@@ -166,6 +169,7 @@ defaults:
max_traces_per_user: 1
max_global_traces_per_user: 2
tenant_shard_size: 3
max_attribute_bytes: 1000
read:
max_bytes_per_tag_values_query: 15
max_blocks_per_tag_values_query: 16
1 change: 1 addition & 0 deletions modules/overrides/interface.go
@@ -40,6 +40,7 @@ type Interface interface {
IngestionRateLimitBytes(userID string) float64
IngestionBurstSizeBytes(userID string) int
IngestionTenantShardSize(userID string) int
IngestionMaxAttributeBytes(userID string) int
MetricsGeneratorIngestionSlack(userID string) time.Duration
MetricsGeneratorRingSize(userID string) int
MetricsGeneratorProcessors(userID string) map[string]struct{}
4 changes: 4 additions & 0 deletions modules/overrides/runtime_config_overrides.go
@@ -326,6 +326,10 @@ func (o *runtimeConfigOverridesManager) IngestionTenantShardSize(userID string)
return o.getOverridesForUser(userID).Ingestion.TenantShardSize
}

func (o *runtimeConfigOverridesManager) IngestionMaxAttributeBytes(userID string) int {
return o.getOverridesForUser(userID).Ingestion.MaxAttributeBytes
}

// MaxBytesPerTrace returns the maximum size of a single trace in bytes allowed for a user.
func (o *runtimeConfigOverridesManager) MaxBytesPerTrace(userID string) int {
return o.getOverridesForUser(userID).Global.MaxBytesPerTrace