add a LogDiscardedSpansConfig to the distributor to log discarded spans causing issues.

```yaml
distributor:
  config:
    log_discarded_spans:
      enabled: true
    log_received_spans:
      # ...

```
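The same behavior can also be switched on with the CLI flags registered in `modules/distributor/config.go` (a sketch; the flag names come from this diff, but the `distributor.` prefix is assumed and depends on how the flag set is registered):

```
-distributor.log-discarded-spans.enabled=true
-distributor.log-discarded-spans.include-attributes=false
-distributor.log-discarded-spans.filter-by-status-error=false
```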

Relates to grafana#3957.

Signed-off-by: Daniel Strobusch <[email protected]>
dastrobu committed Aug 23, 2024
1 parent fbf249a commit cb42bbc
Showing 10 changed files with 381 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## main / unreleased

* [FEATURE] Discarded span logging `log_discarded_spans` [#3957](https://github.com/grafana/tempo/issues/3957) (@dastrobu)
* [ENHANCEMENT] TraceQL: Attribute iterators collect matched array values [#3867](https://github.com/grafana/tempo/pull/3867) (@electron0zero, @stoewer)
* [ENHANCEMENT] Add bytes and spans received to usage stats [#3983](https://github.com/grafana/tempo/pull/3983) (@joe-elliott)

9 changes: 8 additions & 1 deletion cmd/tempo/app/config.go
@@ -166,6 +166,10 @@ func (c *Config) CheckConfig() []ConfigWarning {
warnings = append(warnings, warnLogReceivedTraces)
}

if c.Distributor.LogDiscardedSpans.Enabled {
warnings = append(warnings, warnLogDiscardedTraces)
}

if c.StorageConfig.Trace.Backend == backend.Local && c.Target != SingleBinary {
warnings = append(warnings, warnStorageTraceBackendLocal)
}
@@ -266,7 +270,10 @@ var (
Explain: fmt.Sprintf("default=%d", tempodb.DefaultBlocklistPollConcurrency),
}
warnLogReceivedTraces = ConfigWarning{
Message: "Span logging is enabled. This is for debuging only and not recommended for production deployments.",
Message: "Span logging is enabled. This is for debugging only and not recommended for production deployments.",
}
warnLogDiscardedTraces = ConfigWarning{
Message: "Span logging for discarded traces is enabled. This is for debugging only and not recommended for production deployments.",
}
warnStorageTraceBackendLocal = ConfigWarning{
Message: "Local backend will not correctly retrieve traces with a distributed deployment unless all components have access to the same disk. You should probably be using object storage as a backend.",
6 changes: 5 additions & 1 deletion cmd/tempo/app/config_test.go
@@ -45,7 +45,10 @@ func TestConfig_CheckConfig(t *testing.T) {
},
},
Distributor: distributor.Config{
LogReceivedSpans: distributor.LogReceivedSpansConfig{
LogReceivedSpans: distributor.LogSpansConfig{
Enabled: true,
},
LogDiscardedSpans: distributor.LogSpansConfig{
Enabled: true,
},
},
@@ -57,6 +60,7 @@
warnStorageTraceBackendS3,
warnBlocklistPollConcurrency,
warnLogReceivedTraces,
warnLogDiscardedTraces,
warnNativeAWSAuthEnabled,
warnConfiguredLegacyCache,
},
9 changes: 8 additions & 1 deletion docs/sources/tempo/configuration/_index.md
@@ -196,12 +196,19 @@ distributor:


# Optional.
# Enable to log every received span to help debug ingestion or calculate span error distributions using the logs
# Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.
# This is not recommended for production environments
log_received_spans:
[enabled: <boolean> | default = false]
[include_all_attributes: <boolean> | default = false]
[filter_by_status_error: <boolean> | default = false]

# Optional.
# Enable to log every discarded span to help debug ingestion or calculate span error distributions using the logs.
log_discarded_spans:
[enabled: <boolean> | default = false]
[include_all_attributes: <boolean> | default = false]
[filter_by_status_error: <boolean> | default = false]

# Optional.
# Enable to metric every received span to help debug ingestion
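For example, to log only discarded spans whose status is an error, a minimal override based on the options documented above might look like this:

```yaml
distributor:
  log_discarded_spans:
    enabled: true
    filter_by_status_error: true  # skip spans that completed without a status error
```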
31 changes: 31 additions & 0 deletions docs/sources/tempo/troubleshooting/max-trace-limit-reached.md
@@ -10,6 +10,23 @@ aliases:

The two most likely causes of refused spans are unhealthy ingesters or trace limits being exceeded.

To log spans that are discarded, add the `--distributor.log_discarded_spans.enabled` flag to the distributor or
adjust the [distributor config]({{< relref "../configuration#distributor" >}}):

```yaml
distributor:
log_discarded_spans:
enabled: true
include_all_attributes: false # set to true for more verbose logs
```
Adding the flag logs all discarded spans, as shown below:
```
level=info ts=2024-08-19T16:06:25.880684385Z caller=distributor.go:767 msg=discarded spanid=c2ebe710d2e2ce7a traceid=bd63605778e3dbe935b05e6afd291006
level=info ts=2024-08-19T16:06:25.881169385Z caller=distributor.go:767 msg=discarded spanid=5352b0cb176679c8 traceid=ba41cae5089c9284e18bca08fbf10ca2
```

## Unhealthy ingesters

Unhealthy ingesters can be caused by OOM kills or scale-down events.
@@ -41,3 +58,17 @@ tempo_discarded_spans_total
```

In this case, use available configuration options to [increase limits]({{< relref "../configuration#ingestion-limits" >}}).
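Before raising limits, it can help to quantify how many spans are being dropped and why. A PromQL sketch against the metric above (assuming it carries `tenant` and `reason` labels):

```
sum by (tenant, reason) (rate(tempo_discarded_spans_total[5m]))
```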

## Client resets connection

When the client resets the connection before the distributor can consume the trace data, you see logs like this:

```
msg="pusher failed to consume trace data" err="context canceled"
```

This issue needs to be fixed on the client side. To inspect which clients are causing the issue, logging discarded spans
with `include_all_attributes: true` may help.

Note that there may be other reasons for a closed context as well. Identifying the reason for a closed context is
not straightforward and may require additional debugging.
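With `include_all_attributes: true`, each discarded-span log line also carries the span's resource and span attributes plus its name, kind, status, and duration, which usually makes the originating client easy to spot. An illustrative line (attribute names, values, and field order are examples only):

```
level=info ts=2024-08-19T16:06:25.880684385Z caller=distributor.go:767 msg=discarded tenant=single-tenant service.name=checkout span_name="HTTP POST /api/orders" span_kind=SPAN_KIND_CLIENT span_status=STATUS_CODE_ERROR span_duration_seconds=0.042 spanid=c2ebe710d2e2ce7a traceid=bd63605778e3dbe935b05e6afd291006
```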
2 changes: 2 additions & 0 deletions example/helm/microservices-tempo-values.yaml
@@ -24,3 +24,5 @@ distributor:
config:
log_received_spans:
enabled: true
log_discarded_spans:
enabled: true
2 changes: 2 additions & 0 deletions integration/e2e/deployments/config-all-in-one-local.yaml
@@ -21,6 +21,8 @@ distributor:
zipkin:
log_received_spans:
enabled: true
log_discarded_spans:
enabled: true

ingester:
lifecycler:
9 changes: 7 additions & 2 deletions modules/distributor/config.go
@@ -34,7 +34,8 @@ type Config struct {
// otel collector: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
Receivers map[string]interface{} `yaml:"receivers"`
OverrideRingKey string `yaml:"override_ring_key"`
LogReceivedSpans LogReceivedSpansConfig `yaml:"log_received_spans,omitempty"`
LogReceivedSpans LogSpansConfig `yaml:"log_received_spans,omitempty"`
LogDiscardedSpans LogSpansConfig `yaml:"log_discarded_spans,omitempty"`
MetricReceivedSpans MetricReceivedSpansConfig `yaml:"metric_received_spans,omitempty"`

Forwarders forwarder.ConfigList `yaml:"forwarders"`
@@ -51,7 +52,7 @@ type Config struct {
factory ring_client.PoolAddrFunc `yaml:"-"`
}

type LogReceivedSpansConfig struct {
type LogSpansConfig struct {
Enabled bool `yaml:"enabled"`
IncludeAllAttributes bool `yaml:"include_all_attributes"`
FilterByStatusError bool `yaml:"filter_by_status_error"`
@@ -75,4 +76,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")

f.BoolVar(&cfg.LogDiscardedSpans.Enabled, util.PrefixConfig(prefix, "log-discarded-spans.enabled"), false, "Enable to log every discarded span to help debug ingestion or calculate span error distributions using the logs.")
f.BoolVar(&cfg.LogDiscardedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-discarded-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
f.BoolVar(&cfg.LogDiscardedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-discarded-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")
}
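The renamed `LogSpansConfig` is now shared by both YAML keys. A minimal, self-contained sketch of how the two blocks decode into that struct (using `gopkg.in/yaml.v2` for illustration; Tempo's own config loader may differ):

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// LogSpansConfig mirrors the struct defined in the diff above.
type LogSpansConfig struct {
	Enabled              bool `yaml:"enabled"`
	IncludeAllAttributes bool `yaml:"include_all_attributes"`
	FilterByStatusError  bool `yaml:"filter_by_status_error"`
}

// distributorConfig holds only the two logging blocks relevant here.
type distributorConfig struct {
	LogReceivedSpans  LogSpansConfig `yaml:"log_received_spans,omitempty"`
	LogDiscardedSpans LogSpansConfig `yaml:"log_discarded_spans,omitempty"`
}

func main() {
	raw := []byte(`
log_received_spans:
  enabled: true
log_discarded_spans:
  enabled: true
  include_all_attributes: true
`)
	var cfg distributorConfig
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	// Both blocks decode into the same LogSpansConfig type.
	fmt.Printf("%+v\n", cfg)
}
```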
96 changes: 82 additions & 14 deletions modules/distributor/distributor.go
@@ -348,9 +348,7 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te

batches := trace.ResourceSpans

if d.cfg.LogReceivedSpans.Enabled {
logSpans(batches, &d.cfg.LogReceivedSpans, d.logger)
}
logReceivedSpans(batches, &d.cfg.LogReceivedSpans, d.logger)
if d.cfg.MetricReceivedSpans.Enabled {
metricSpans(batches, userID, &d.cfg.MetricReceivedSpans)
}
@@ -363,6 +361,7 @@
keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
if err != nil {
overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
logDiscardedResourceSpans(batches, userID, &d.cfg.LogDiscardedSpans, d.logger)
return nil, err
}

@@ -441,20 +440,18 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string

return nil
}, func() {})
// if err != nil, we discarded everything because of an internal error
// if err != nil, we discarded everything because of an internal error (like "context cancelled")
if err != nil {
overrides.RecordDiscardedSpans(totalSpanCount, reasonInternalError, userID)
logDiscardedRebatchedSpans(traces, userID, &d.cfg.LogDiscardedSpans, d.logger)
return err
}

// count discarded span count
mu.Lock()
defer mu.Unlock()

maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount := countDiscaredSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing.ReplicationFactor())
overrides.RecordDiscardedSpans(maxLiveDiscardedCount, reasonLiveTracesExceeded, userID)
overrides.RecordDiscardedSpans(traceTooLargeDiscardedCount, reasonTraceTooLarge, userID)
overrides.RecordDiscardedSpans(unknownErrorCount, reasonUnknown, userID)
recordDiscardedSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing, userID)
logDiscardedSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing, userID, &d.cfg.LogDiscardedSpans, d.logger)

return nil
}
@@ -581,12 +578,21 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int
return keys, traces, nil
}

func countDiscaredSpans(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, repFactor int) (maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount int) {
// discardedPredicate determines if a trace is discarded based on the number of successful replications.
type discardedPredicate func(int) bool

func newDiscardedPredicate(repFactor int) discardedPredicate {
quorum := int(math.Floor(float64(repFactor)/2)) + 1 // min success required
return func(numSuccess int) bool {
return numSuccess < quorum
}
}

func countDiscardedSpans(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, repFactor int) (maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount int) {
discarded := newDiscardedPredicate(repFactor)

for traceIndex, numSuccess := range numSuccessByTraceIndex {
// we will count anything that did not receive min success as discarded
if numSuccess >= quorum {
if !discarded(numSuccess) {
continue
}
spanCount := traces[traceIndex].spanCount
@@ -660,7 +666,69 @@ func metricSpans(batches []*v1.ResourceSpans, tenantID string, cfg *MetricReceiv
}
}

func logSpans(batches []*v1.ResourceSpans, cfg *LogReceivedSpansConfig, logger log.Logger) {
func recordDiscardedSpans(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, writeRing ring.ReadRing, userID string) {
maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount := countDiscardedSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing.ReplicationFactor())
overrides.RecordDiscardedSpans(maxLiveDiscardedCount, reasonLiveTracesExceeded, userID)
overrides.RecordDiscardedSpans(traceTooLargeDiscardedCount, reasonTraceTooLarge, userID)
overrides.RecordDiscardedSpans(unknownErrorCount, reasonUnknown, userID)
}

func logDiscardedSpans(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, writeRing ring.ReadRing, userID string, cfg *LogSpansConfig, logger log.Logger) {
if !cfg.Enabled {
return
}
discarded := newDiscardedPredicate(writeRing.ReplicationFactor())
for traceIndex, numSuccess := range numSuccessByTraceIndex {
if !discarded(numSuccess) {
continue
}
errorReason := lastErrorReasonByTraceIndex[traceIndex]
if errorReason != tempopb.PushErrorReason_NO_ERROR {
loggerWithAtts := logger
loggerWithAtts = log.With(
loggerWithAtts,
"push_error_reason", fmt.Sprintf("%v", errorReason),
)
logDiscardedResourceSpans(traces[traceIndex].trace.ResourceSpans, userID, cfg, loggerWithAtts)
}
}
}

func logDiscardedRebatchedSpans(batches []*rebatchedTrace, userID string, cfg *LogSpansConfig, logger log.Logger) {
if !cfg.Enabled {
return
}
for _, b := range batches {
logDiscardedResourceSpans(b.trace.ResourceSpans, userID, cfg, logger)
}
}

func logDiscardedResourceSpans(batches []*v1.ResourceSpans, userID string, cfg *LogSpansConfig, logger log.Logger) {
if !cfg.Enabled {
return
}
loggerWithAtts := logger
loggerWithAtts = log.With(
loggerWithAtts,
"msg", "discarded",
"tenant", userID,
)
logSpans(batches, cfg, loggerWithAtts)
}

func logReceivedSpans(batches []*v1.ResourceSpans, cfg *LogSpansConfig, logger log.Logger) {
if !cfg.Enabled {
return
}
loggerWithAtts := logger
loggerWithAtts = log.With(
loggerWithAtts,
"msg", "received",
)
logSpans(batches, cfg, loggerWithAtts)
}

func logSpans(batches []*v1.ResourceSpans, cfg *LogSpansConfig, logger log.Logger) {
for _, b := range batches {
loggerWithAtts := logger

@@ -703,7 +771,7 @@ func logSpan(s *v1.Span, allAttributes bool, logger log.Logger) {
"span_status", s.GetStatus().GetCode().String())
}

level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))
level.Info(logger).Log("spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId))
}

// startEndFromSpan returns a unix epoch timestamp in seconds for the start and end of a span
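The discard decision above hinges on the write quorum. A small standalone sketch of the same predicate, with a worked replication-factor-3 case:

```go
package main

import (
	"fmt"
	"math"
)

// newDiscardedPredicate mirrors the helper added in distributor.go: a trace counts
// as discarded when it reached fewer ingesters than the write quorum.
func newDiscardedPredicate(repFactor int) func(numSuccess int) bool {
	quorum := int(math.Floor(float64(repFactor)/2)) + 1 // minimum successful pushes required
	return func(numSuccess int) bool { return numSuccess < quorum }
}

func main() {
	discarded := newDiscardedPredicate(3) // replication factor 3 => quorum of 2
	fmt.Println(discarded(1))             // true: only one ingester accepted the trace
	fmt.Println(discarded(2))             // false: quorum reached, not discarded
	fmt.Println(discarded(3))             // false: all replicas succeeded
}
```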