add a LogDiscardedSpansConfig to the distributor to log discarded spans causing issues.

```yaml
distributor:
  config:
    log_discarded_spans:
      enabled: true
    log_received_spans:
      # ...
```
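The snippet above uses the Helm values layout (`distributor.config`), matching `example/helm/microservices-tempo-values.yaml` further down. In a plain Tempo configuration file the block sits directly under `distributor`, as the documentation change in this diff shows; a minimal sketch with all three options spelled out:

```yaml
distributor:
  log_discarded_spans:
    enabled: true
    # both default to false, per docs/sources/tempo/configuration/_index.md below
    include_all_attributes: false
    filter_by_status_error: false
```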

Relates to grafana#3957.

Signed-off-by: Daniel Strobusch <[email protected]>
dastrobu committed Aug 21, 2024
1 parent 7961fb7 commit 5ed499e
Showing 10 changed files with 359 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@
* [CHANGE] Azure v2 backend becomes the only and primary Azure backend [#3875](https://github.com/grafana/tempo/pull/3875) (@zalegrala)
**BREAKING CHANGE** The `use_v2_sdk` configuration option has been removed.
* [CHANGE] BlockMeta improvements to reduce the size [#3950](https://github.com/grafana/tempo/pull/3950) [#3951](https://github.com/grafana/tempo/pull/3951) [#3952](https://github.com/grafana/tempo/pull/3952)(@zalegrala)
* [FEATURE] Discarded span logging `log_discarded_spans` [#3957](https://github.com/grafana/tempo/issues/3957) (@dastrobu)
* [FEATURE] TraceQL support for link scope and link:traceID and link:spanID [#3741](https://github.com/grafana/tempo/pull/3741) (@stoewer)
* [FEATURE] TraceQL support for link attribute querying [#3814](https://github.com/grafana/tempo/pull/3814) (@ie-pham)
* [FEATURE] TraceQL support for event scope and event:name intrinsic [#3708](https://github.com/grafana/tempo/pull/3708) (@stoewer)
2 changes: 1 addition & 1 deletion cmd/tempo/app/config.go
@@ -266,7 +266,7 @@ var (
Explain: fmt.Sprintf("default=%d", tempodb.DefaultBlocklistPollConcurrency),
}
warnLogReceivedTraces = ConfigWarning{
Message: "Span logging is enabled. This is for debuging only and not recommended for production deployments.",
Message: "Span logging is enabled. This is for debugging only and not recommended for production deployments.",
}
warnStorageTraceBackendLocal = ConfigWarning{
Message: "Local backend will not correctly retrieve traces with a distributed deployment unless all components have access to the same disk. You should probably be using object storage as a backend.",
2 changes: 1 addition & 1 deletion cmd/tempo/app/config_test.go
@@ -45,7 +45,7 @@ func TestConfig_CheckConfig(t *testing.T) {
},
},
Distributor: distributor.Config{
LogReceivedSpans: distributor.LogReceivedSpansConfig{
LogReceivedSpans: distributor.LogSpansConfig{
Enabled: true,
},
},
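A minimal usage sketch of the renamed shared type (not part of this diff), assuming the `modules/distributor` package builds standalone at this commit — the received- and discarded-span options are now both plain `LogSpansConfig` values:

```go
package main

import (
	"fmt"

	"github.com/grafana/tempo/modules/distributor"
)

func main() {
	// Both fields use the LogSpansConfig type introduced by this commit.
	cfg := distributor.Config{
		LogReceivedSpans:  distributor.LogSpansConfig{Enabled: true, FilterByStatusError: true},
		LogDiscardedSpans: distributor.LogSpansConfig{Enabled: true, IncludeAllAttributes: true},
	}
	fmt.Printf("received=%+v discarded=%+v\n", cfg.LogReceivedSpans, cfg.LogDiscardedSpans)
}
```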
9 changes: 8 additions & 1 deletion docs/sources/tempo/configuration/_index.md
@@ -196,12 +196,19 @@ distributor:


# Optional.
# Enable to log every received span to help debug ingestion or calculate span error distributions using the logs
# Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.
# This is not recommended for production environments
log_received_spans:
[enabled: <boolean> | default = false]
[include_all_attributes: <boolean> | default = false]
[filter_by_status_error: <boolean> | default = false]

# Optional.
# Enable to log every discarded span to help debug ingestion or calculate span error distributions using the logs.
log_discarded_spans:
[enabled: <boolean> | default = false]
[include_all_attributes: <boolean> | default = false]
[filter_by_status_error: <boolean> | default = false]

# Optional.
# Enable to metric every received span to help debug ingestion
31 changes: 31 additions & 0 deletions docs/sources/tempo/troubleshooting/max-trace-limit-reached.md
@@ -10,6 +10,23 @@ aliases:

The two most likely causes of refused spans are unhealthy ingesters or trace limits being exceeded.

To log spans that are discarded, add the `--distributor.log_discarded_spans.enabled` flag to the distributor or
adjust the [distributor config]({{< relref "../configuration#distributor" >}}):

```yaml
distributor:
log_discarded_spans:
enabled: true
include_all_attributes: false # set to true for more verbose logs
```
Adding the flag logs all discarded spans, as shown below:
```
level=info ts=2024-08-19T16:06:25.880684385Z caller=distributor.go:767 msg=discarded spanid=c2ebe710d2e2ce7a traceid=bd63605778e3dbe935b05e6afd291006
level=info ts=2024-08-19T16:06:25.881169385Z caller=distributor.go:767 msg=discarded spanid=5352b0cb176679c8 traceid=ba41cae5089c9284e18bca08fbf10ca2
```

## Unhealthy ingesters

Unhealthy ingesters can be caused by failing OOMs or scale down events.
@@ -41,3 +58,17 @@ tempo_discarded_spans_total
```

In this case, use available configuration options to [increase limits]({{< relref "../configuration#ingestion-limits" >}}).

## Client resets connection

When the client resets the connection before the distributor can consume the trace data, you see logs like this:

```
msg="pusher failed to consume trace data" err="context canceled"
```

This issue needs to be fixed on the client side. To inspect which clients are causing the issue, logging discarded spans
with `include_all_attributes: true` may help.

Note that there may be other reasons for a closed context as well. Identifying the reason for a closed context is
not straightforward and may require additional debugging.
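A sketch of the `include_all_attributes: true` setup mentioned above for narrowing down misbehaving clients; which attributes (for example a client's service name) actually show up in the log lines depends on what the client sends and is not defined by this commit:

```yaml
distributor:
  log_discarded_spans:
    enabled: true
    include_all_attributes: true
```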
2 changes: 2 additions & 0 deletions example/helm/microservices-tempo-values.yaml
@@ -24,3 +24,5 @@ distributor:
config:
log_received_spans:
enabled: true
log_discarded_spans:
enabled: true
2 changes: 2 additions & 0 deletions integration/e2e/deployments/config-all-in-one-local.yaml
@@ -21,6 +21,8 @@ distributor:
zipkin:
log_received_spans:
enabled: true
log_discarded_spans:
enabled: true

ingester:
lifecycler:
9 changes: 7 additions & 2 deletions modules/distributor/config.go
@@ -34,7 +34,8 @@ type Config struct {
// otel collector: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
Receivers map[string]interface{} `yaml:"receivers"`
OverrideRingKey string `yaml:"override_ring_key"`
LogReceivedSpans LogReceivedSpansConfig `yaml:"log_received_spans,omitempty"`
LogReceivedSpans LogSpansConfig `yaml:"log_received_spans,omitempty"`
LogDiscardedSpans LogSpansConfig `yaml:"log_discarded_spans,omitempty"`
MetricReceivedSpans MetricReceivedSpansConfig `yaml:"metric_received_spans,omitempty"`

Forwarders forwarder.ConfigList `yaml:"forwarders"`
@@ -51,7 +52,7 @@ type Config struct {
factory ring_client.PoolAddrFunc `yaml:"-"`
}

type LogReceivedSpansConfig struct {
type LogSpansConfig struct {
Enabled bool `yaml:"enabled"`
IncludeAllAttributes bool `yaml:"include_all_attributes"`
FilterByStatusError bool `yaml:"filter_by_status_error"`
@@ -75,4 +76,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")

f.BoolVar(&cfg.LogDiscardedSpans.Enabled, util.PrefixConfig(prefix, "log-discarded-spans.enabled"), false, "Enable to log every discarded span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogDiscardedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-discarded-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
f.BoolVar(&cfg.LogDiscardedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-discarded-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")
}
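Going by these registrations, and assuming the distributor passes its usual `distributor` prefix to `RegisterFlagsAndApplyDefaults` (not shown in this diff), the new settings should also be reachable as command-line flags, roughly:

```
-distributor.log-discarded-spans.enabled=true
-distributor.log-discarded-spans.include-attributes=true
-distributor.log-discarded-spans.filter-by-status-error=true
```

The `include-attributes` and `filter-by-status-error` suffixes mirror the existing `log-received-spans` flags above.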
87 changes: 77 additions & 10 deletions modules/distributor/distributor.go
@@ -120,6 +120,11 @@ type rebatchedTrace struct {
spanCount int
}

type discardedTrace struct {
*rebatchedTrace
errorReason tempopb.PushErrorReason
}

// Distributor coordinates replicates and distribution of log streams.
type Distributor struct {
services.Service
@@ -344,9 +349,7 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te

batches := trace.ResourceSpans

if d.cfg.LogReceivedSpans.Enabled {
logSpans(batches, &d.cfg.LogReceivedSpans, d.logger)
}
logReceivedSpans(batches, &d.cfg.LogReceivedSpans, d.logger)
if d.cfg.MetricReceivedSpans.Enabled {
metricSpans(batches, userID, &d.cfg.MetricReceivedSpans)
}
@@ -357,6 +360,7 @@
keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
if err != nil {
overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
logDiscardedResourceSpans(batches, &d.cfg.LogDiscardedSpans, d.logger)
return nil, err
}

@@ -435,17 +439,22 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string

return nil
}, func() {})
// if err != nil, we discarded everything because of an internal error
// if err != nil, we discarded everything because of an internal error (like "context cancelled")
if err != nil {
overrides.RecordDiscardedSpans(totalSpanCount, reasonInternalError, userID)
logDiscardedRebatchedSpans(traces, &d.cfg.LogDiscardedSpans, d.logger)
return err
}

// count discarded span count
mu.Lock()
defer mu.Unlock()

maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount := countDiscaredSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing.ReplicationFactor())
discardedTraces := discardedTraces(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing.ReplicationFactor())
logDiscardedSpans(discardedTraces, &d.cfg.LogDiscardedSpans, d.logger)

maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount := countDiscardedSpans(discardedTraces)

overrides.RecordDiscardedSpans(maxLiveDiscardedCount, reasonLiveTracesExceeded, userID)
overrides.RecordDiscardedSpans(traceTooLargeDiscardedCount, reasonTraceTooLarge, userID)
overrides.RecordDiscardedSpans(unknownErrorCount, reasonUnknown, userID)
@@ -575,16 +584,32 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int
return keys, traces, nil
}

func countDiscaredSpans(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, repFactor int) (maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount int) {
func discardedTraces(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, repFactor int) []*discardedTrace {
quorum := int(math.Floor(float64(repFactor)/2)) + 1 // min success required
discardedTraces := make([]*discardedTrace, 0)

for traceIndex, numSuccess := range numSuccessByTraceIndex {
// we will count anything that did not receive min success as discarded
if numSuccess >= quorum {
continue
}
spanCount := traces[traceIndex].spanCount
switch lastErrorReasonByTraceIndex[traceIndex] {
trace := traces[traceIndex]
errorReason := lastErrorReasonByTraceIndex[traceIndex]
if errorReason != tempopb.PushErrorReason_NO_ERROR {
discardedTraces = append(discardedTraces, &discardedTrace{
trace,
errorReason,
})
}
}

return discardedTraces
}

func countDiscardedSpans(discardedTraces []*discardedTrace) (maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount int) {
for _, discardedTrace := range discardedTraces {
spanCount := discardedTrace.spanCount
switch discardedTrace.errorReason {
case tempopb.PushErrorReason_MAX_LIVE_TRACES:
maxLiveDiscardedCount += spanCount
case tempopb.PushErrorReason_TRACE_TOO_LARGE:
@@ -654,7 +679,49 @@ func metricSpans(batches []*v1.ResourceSpans, tenantID string, cfg *MetricReceiv
}
}

func logSpans(batches []*v1.ResourceSpans, cfg *LogReceivedSpansConfig, logger log.Logger) {
func logDiscardedSpans(batches []*discardedTrace, cfg *LogSpansConfig, logger log.Logger) {
if !cfg.Enabled {
return
}
for _, b := range batches {
logDiscardedResourceSpans(b.trace.ResourceSpans, cfg, logger)
}
}

func logDiscardedRebatchedSpans(batches []*rebatchedTrace, cfg *LogSpansConfig, logger log.Logger) {
if !cfg.Enabled {
return
}
for _, b := range batches {
logDiscardedResourceSpans(b.trace.ResourceSpans, cfg, logger)
}
}

func logDiscardedResourceSpans(batches []*v1.ResourceSpans, cfg *LogSpansConfig, logger log.Logger) {
if !cfg.Enabled {
return
}
loggerWithAtts := logger
loggerWithAtts = log.With(
loggerWithAtts,
"msg", "discarded",
)
logSpans(batches, cfg, loggerWithAtts)
}

func logReceivedSpans(batches []*v1.ResourceSpans, cfg *LogSpansConfig, logger log.Logger) {
if !cfg.Enabled {
return
}
loggerWithAtts := logger
loggerWithAtts = log.With(
loggerWithAtts,
"msg", "received",
)
logSpans(batches, cfg, loggerWithAtts)
}

func logSpans(batches []*v1.ResourceSpans, cfg *LogSpansConfig, logger log.Logger) {
for _, b := range batches {
loggerWithAtts := logger

@@ -697,7 +764,7 @@ func logSpan(s *v1.Span, allAttributes bool, logger log.Logger) {
"span_status", s.GetStatus().GetCode().String())
}

level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))
level.Info(logger).Log("spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))
}

// startEndFromSpan returns a unix epoch timestamp in seconds for the start and end of a span
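A standalone sketch (not part of this diff) of the quorum half of the rule in `discardedTraces` above: a trace can only be counted as discarded when fewer than `floor(replicationFactor/2) + 1` ingester pushes succeeded, and it is then attributed to the last recorded push error reason. The replication factor of 3 below is an illustrative value, not something this commit sets.

```go
package main

import (
	"fmt"
	"math"
)

// quorum mirrors the computation in discardedTraces: the minimum number of
// successful ingester pushes required before a trace counts as accepted.
func quorum(replicationFactor int) int {
	return int(math.Floor(float64(replicationFactor)/2)) + 1
}

func main() {
	const replicationFactor = 3   // illustrative value
	q := quorum(replicationFactor) // 2
	for numSuccess := 0; numSuccess <= replicationFactor; numSuccess++ {
		fmt.Printf("successes=%d, below quorum (candidate for discard)=%v\n", numSuccess, numSuccess < q)
	}
}
```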