diff --git a/CHANGELOG.md b/CHANGELOG.md index 10ec52b36d02..328a3499a90b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * [CHANGE] Azure v2 backend becomes the only and primary Azure backend [#3875](https://github.com/grafana/tempo/pull/3875) (@zalegrala) **BREAKING CHANGE** The `use_v2_sdk` configuration option has been removed. * [CHANGE] BlockMeta improvements to reduce the size [#3950](https://github.com/grafana/tempo/pull/3950) [#3951](https://github.com/grafana/tempo/pull/3951) [#3952](https://github.com/grafana/tempo/pull/3952)(@zalegrala) +* [FEATURE] Discarded span logging `log_discarded_spans` [#3957](https://github.com/grafana/tempo/issues/3957) (@dastrobu) * [FEATURE] TraceQL support for link scope and link:traceID and link:spanID [#3741](https://github.com/grafana/tempo/pull/3741) (@stoewer) * [FEATURE] TraceQL support for link attribute querying [#3814](https://github.com/grafana/tempo/pull/3814) (@ie-pham) * [FEATURE] TraceQL support for event scope and event:name intrinsic [#3708](https://github.com/grafana/tempo/pull/3708) (@stoewer) diff --git a/cmd/tempo/app/config.go b/cmd/tempo/app/config.go index 996de77c5ee2..475aeeb7a2ce 100644 --- a/cmd/tempo/app/config.go +++ b/cmd/tempo/app/config.go @@ -266,7 +266,7 @@ var ( Explain: fmt.Sprintf("default=%d", tempodb.DefaultBlocklistPollConcurrency), } warnLogReceivedTraces = ConfigWarning{ - Message: "Span logging is enabled. This is for debuging only and not recommended for production deployments.", + Message: "Span logging is enabled. This is for debugging only and not recommended for production deployments.", } warnStorageTraceBackendLocal = ConfigWarning{ Message: "Local backend will not correctly retrieve traces with a distributed deployment unless all components have access to the same disk. You should probably be using object storage as a backend.", diff --git a/cmd/tempo/app/config_test.go b/cmd/tempo/app/config_test.go index 9e3bc4e6d038..9ae597a721f1 100644 --- a/cmd/tempo/app/config_test.go +++ b/cmd/tempo/app/config_test.go @@ -45,7 +45,7 @@ func TestConfig_CheckConfig(t *testing.T) { }, }, Distributor: distributor.Config{ - LogReceivedSpans: distributor.LogReceivedSpansConfig{ + LogReceivedSpans: distributor.LogSpansConfig{ Enabled: true, }, }, diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index 4f5342a6d478..2680cde63560 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -202,6 +202,13 @@ distributor: [enabled: | default = false] [include_all_attributes: | default = false] [filter_by_status_error: | default = false] + + # Optional. + # Enable to log every discarded span to help debug ingestion or calculate span error distributions using the logs + log_discarded_spans: + [enabled: | default = false] + [include_all_attributes: | default = false] + [filter_by_status_error: | default = false] # Optional. # Enable to metric every received span to help debug ingestion diff --git a/docs/sources/tempo/troubleshooting/max-trace-limit-reached.md b/docs/sources/tempo/troubleshooting/max-trace-limit-reached.md index 8f5c84a26ebd..bb927226d7c7 100644 --- a/docs/sources/tempo/troubleshooting/max-trace-limit-reached.md +++ b/docs/sources/tempo/troubleshooting/max-trace-limit-reached.md @@ -10,6 +10,23 @@ aliases: The two most likely causes of refused spans are unhealthy ingesters or trace limits being exceeded. 
+To log spans that are discarded, add the `--distributor.log-discarded-spans.enabled` flag to the distributor or
+adjust the [distributor config]({{< relref "../configuration#distributor" >}}):
+
+```yaml
+distributor:
+  log_discarded_spans:
+    enabled: true
+    include_all_attributes: false # set to true for more verbose logs
+```
+
+This logs all discarded spans, as shown below:
+
+```
+level=info ts=2024-08-19T16:06:25.880684385Z caller=distributor.go:767 msg=discarded spanid=c2ebe710d2e2ce7a traceid=bd63605778e3dbe935b05e6afd291006
+level=info ts=2024-08-19T16:06:25.881169385Z caller=distributor.go:767 msg=discarded spanid=5352b0cb176679c8 traceid=ba41cae5089c9284e18bca08fbf10ca2
+```
+
 ## Unhealthy ingesters

 Unhealthy ingesters can be caused by failing OOMs or scale down events.
@@ -41,3 +58,17 @@ tempo_discarded_spans_total
 ```

 In this case, use available configuration options to [increase limits]({{< relref "../configuration#ingestion-limits" >}}).
+
+## Client resets connection
+
+When the client resets the connection before the distributor can consume the trace data, you see logs like the following:
+
+```
+msg="pusher failed to consume trace data" err="context canceled"
+```
+
+This issue needs to be fixed on the client side. To inspect which clients are causing the issue, logging discarded spans
+with `include_all_attributes: true` may help.
+
+Note that there may be other reasons for a closed context as well. Identifying the exact reason is
+not straightforward and may require additional debugging.
\ No newline at end of file
diff --git a/example/helm/microservices-tempo-values.yaml b/example/helm/microservices-tempo-values.yaml
index db691e89af88..83c4c32d2909 100644
--- a/example/helm/microservices-tempo-values.yaml
+++ b/example/helm/microservices-tempo-values.yaml
@@ -24,3 +24,5 @@ distributor:
   config:
     log_received_spans:
       enabled: true
+    log_discarded_spans:
+      enabled: true
diff --git a/integration/e2e/deployments/config-all-in-one-local.yaml b/integration/e2e/deployments/config-all-in-one-local.yaml
index b5180a142738..c0133165d7be 100644
--- a/integration/e2e/deployments/config-all-in-one-local.yaml
+++ b/integration/e2e/deployments/config-all-in-one-local.yaml
@@ -21,6 +21,8 @@ distributor:
     zipkin:
   log_received_spans:
     enabled: true
+  log_discarded_spans:
+    enabled: true

 ingester:
   lifecycler:
diff --git a/modules/distributor/config.go b/modules/distributor/config.go
index 43d94845c1d7..d814247a41a9 100644
--- a/modules/distributor/config.go
+++ b/modules/distributor/config.go
@@ -34,7 +34,8 @@ type Config struct {
 	// otel collector: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
 	Receivers           map[string]interface{} `yaml:"receivers"`
 	OverrideRingKey     string                 `yaml:"override_ring_key"`
-	LogReceivedSpans    LogReceivedSpansConfig `yaml:"log_received_spans,omitempty"`
+	LogReceivedSpans    LogSpansConfig         `yaml:"log_received_spans,omitempty"`
+	LogDiscardedSpans   LogSpansConfig         `yaml:"log_discarded_spans,omitempty"`
 	MetricReceivedSpans MetricReceivedSpansConfig `yaml:"metric_received_spans,omitempty"`

 	Forwarders forwarder.ConfigList `yaml:"forwarders"`
@@ -51,7 +52,7 @@ type Config struct {
 	factory ring_client.PoolAddrFunc `yaml:"-"`
 }

-type LogReceivedSpansConfig struct {
+type LogSpansConfig struct {
 	Enabled              bool `yaml:"enabled"`
 	IncludeAllAttributes bool `yaml:"include_all_attributes"`
 	FilterByStatusError  bool `yaml:"filter_by_status_error"`
@@ -75,4 +76,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
	f.BoolVar(&cfg.LogReceivedSpans.Enabled, util.PrefixConfig(prefix, "log-received-spans.enabled"), false, "Enable to log every received span to help debug ingestion or calculate span error distributions using the logs.")
 	f.BoolVar(&cfg.LogReceivedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-received-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
 	f.BoolVar(&cfg.LogReceivedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-received-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")
+
+	f.BoolVar(&cfg.LogDiscardedSpans.Enabled, util.PrefixConfig(prefix, "log-discarded-spans.enabled"), false, "Enable to log every discarded span to help debug ingestion or calculate span error distributions using the logs.")
+	f.BoolVar(&cfg.LogDiscardedSpans.IncludeAllAttributes, util.PrefixConfig(prefix, "log-discarded-spans.include-attributes"), false, "Enable to include span attributes in the logs.")
+	f.BoolVar(&cfg.LogDiscardedSpans.FilterByStatusError, util.PrefixConfig(prefix, "log-discarded-spans.filter-by-status-error"), false, "Enable to filter out spans without status error.")
 }
diff --git a/modules/distributor/distributor.go b/modules/distributor/distributor.go
index b56d2d523142..d0fa4242fa47 100644
--- a/modules/distributor/distributor.go
+++ b/modules/distributor/distributor.go
@@ -120,6 +120,11 @@ type rebatchedTrace struct {
 	spanCount int
 }

+type discardedTrace struct {
+	*rebatchedTrace
+	errorReason tempopb.PushErrorReason
+}
+
 // Distributor coordinates replicates and distribution of log streams.
 type Distributor struct {
 	services.Service
@@ -344,9 +349,7 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te

 	batches := trace.ResourceSpans

-	if d.cfg.LogReceivedSpans.Enabled {
-		logSpans(batches, &d.cfg.LogReceivedSpans, d.logger)
-	}
+	logReceivedSpans(batches, &d.cfg.LogReceivedSpans, d.logger)
 	if d.cfg.MetricReceivedSpans.Enabled {
 		metricSpans(batches, userID, &d.cfg.MetricReceivedSpans)
 	}
@@ -357,6 +360,7 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te
 	keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount)
 	if err != nil {
 		overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID)
+		logDiscardedResourceSpans(batches, &d.cfg.LogDiscardedSpans, d.logger)
 		return nil, err
 	}

@@ -435,9 +439,10 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string
 		return nil
 	}, func() {})

-	// if err != nil, we discarded everything because of an internal error
+	// if err != nil, we discarded everything because of an internal error (like "context canceled")
 	if err != nil {
 		overrides.RecordDiscardedSpans(totalSpanCount, reasonInternalError, userID)
+		logDiscardedRebatchedSpans(traces, &d.cfg.LogDiscardedSpans, d.logger)
 		return err
 	}

@@ -445,7 +450,11 @@ func (d *Distributor) sendToIngestersViaBytes(ctx context.Context, userID string
 		mu.Lock()
 		defer mu.Unlock()

-		maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount := countDiscaredSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing.ReplicationFactor())
+		discardedTraces := discardedTraces(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, writeRing.ReplicationFactor())
+		logDiscardedSpans(discardedTraces, &d.cfg.LogDiscardedSpans, d.logger)
+
+		maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount := countDiscardedSpans(discardedTraces)
+
overrides.RecordDiscardedSpans(maxLiveDiscardedCount, reasonLiveTracesExceeded, userID) overrides.RecordDiscardedSpans(traceTooLargeDiscardedCount, reasonTraceTooLarge, userID) overrides.RecordDiscardedSpans(unknownErrorCount, reasonUnknown, userID) @@ -575,16 +584,32 @@ func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int return keys, traces, nil } -func countDiscaredSpans(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, repFactor int) (maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount int) { +func discardedTraces(numSuccessByTraceIndex []int, lastErrorReasonByTraceIndex []tempopb.PushErrorReason, traces []*rebatchedTrace, repFactor int) []*discardedTrace { quorum := int(math.Floor(float64(repFactor)/2)) + 1 // min success required + discardedTraces := make([]*discardedTrace, 0) for traceIndex, numSuccess := range numSuccessByTraceIndex { // we will count anything that did not receive min success as discarded if numSuccess >= quorum { continue } - spanCount := traces[traceIndex].spanCount - switch lastErrorReasonByTraceIndex[traceIndex] { + trace := traces[traceIndex] + errorReason := lastErrorReasonByTraceIndex[traceIndex] + if errorReason != tempopb.PushErrorReason_NO_ERROR { + discardedTraces = append(discardedTraces, &discardedTrace{ + trace, + errorReason, + }) + } + } + + return discardedTraces +} + +func countDiscardedSpans(discardedTraces []*discardedTrace) (maxLiveDiscardedCount, traceTooLargeDiscardedCount, unknownErrorCount int) { + for _, discardedTrace := range discardedTraces { + spanCount := discardedTrace.spanCount + switch discardedTrace.errorReason { case tempopb.PushErrorReason_MAX_LIVE_TRACES: maxLiveDiscardedCount += spanCount case tempopb.PushErrorReason_TRACE_TOO_LARGE: @@ -654,7 +679,49 @@ func metricSpans(batches []*v1.ResourceSpans, tenantID string, cfg *MetricReceiv } } -func logSpans(batches []*v1.ResourceSpans, cfg *LogReceivedSpansConfig, logger log.Logger) { +func logDiscardedSpans(batches []*discardedTrace, cfg *LogSpansConfig, logger log.Logger) { + if !cfg.Enabled { + return + } + for _, b := range batches { + logDiscardedResourceSpans(b.trace.ResourceSpans, cfg, logger) + } +} + +func logDiscardedRebatchedSpans(batches []*rebatchedTrace, cfg *LogSpansConfig, logger log.Logger) { + if !cfg.Enabled { + return + } + for _, b := range batches { + logDiscardedResourceSpans(b.trace.ResourceSpans, cfg, logger) + } +} + +func logDiscardedResourceSpans(batches []*v1.ResourceSpans, cfg *LogSpansConfig, logger log.Logger) { + if !cfg.Enabled { + return + } + loggerWithAtts := logger + loggerWithAtts = log.With( + loggerWithAtts, + "msg", "discarded", + ) + logSpans(batches, cfg, loggerWithAtts) +} + +func logReceivedSpans(batches []*v1.ResourceSpans, cfg *LogSpansConfig, logger log.Logger) { + if !cfg.Enabled { + return + } + loggerWithAtts := logger + loggerWithAtts = log.With( + loggerWithAtts, + "msg", "received", + ) + logSpans(batches, cfg, loggerWithAtts) +} + +func logSpans(batches []*v1.ResourceSpans, cfg *LogSpansConfig, logger log.Logger) { for _, b := range batches { loggerWithAtts := logger @@ -697,7 +764,7 @@ func logSpan(s *v1.Span, allAttributes bool, logger log.Logger) { "span_status", s.GetStatus().GetCode().String()) } - level.Info(logger).Log("msg", "received", "spanid", hex.EncodeToString(s.SpanId), "traceid", hex.EncodeToString(s.TraceId)) + level.Info(logger).Log("spanid", hex.EncodeToString(s.SpanId), "traceid", 
hex.EncodeToString(s.TraceId)) } // startEndFromSpan returns a unix epoch timestamp in seconds for the start and end of a span diff --git a/modules/distributor/distributor_test.go b/modules/distributor/distributor_test.go index b0fa2aec60d6..21e22137a3c7 100644 --- a/modules/distributor/distributor_test.go +++ b/modules/distributor/distributor_test.go @@ -13,6 +13,8 @@ import ( "testing" "time" + "golang.org/x/exp/maps" + kitlog "github.com/go-kit/log" "github.com/gogo/status" "github.com/golang/protobuf/proto" // nolint: all //ProtoReflect @@ -771,7 +773,7 @@ func TestDistributor(t *testing.T) { limits.RegisterFlagsAndApplyDefaults(&flag.FlagSet{}) // todo: test limits - d := prepare(t, limits, nil) + d, _ := prepare(t, limits, nil) b := test.MakeBatch(tc.lines, []byte{}) traces := batchesToTraces(t, []*v1.ResourceSpans{b}) @@ -783,7 +785,7 @@ func TestDistributor(t *testing.T) { } } -func TestLogSpans(t *testing.T) { +func TestLogReceivedSpans(t *testing.T) { for i, tc := range []struct { LogReceivedSpansEnabled bool filterByStatusError bool @@ -947,15 +949,15 @@ func TestLogSpans(t *testing.T) { }, }, } { - t.Run(fmt.Sprintf("[%d] TestLogSpans LogReceivedSpansEnabled=%v filterByStatusError=%v includeAllAttributes=%v", i, tc.LogReceivedSpansEnabled, tc.filterByStatusError, tc.includeAllAttributes), func(t *testing.T) { + t.Run(fmt.Sprintf("[%d] TestLogReceivedSpans LogReceivedSpansEnabled=%v filterByStatusError=%v includeAllAttributes=%v", i, tc.LogReceivedSpansEnabled, tc.filterByStatusError, tc.includeAllAttributes), func(t *testing.T) { limits := overrides.Config{} limits.RegisterFlagsAndApplyDefaults(&flag.FlagSet{}) buf := &bytes.Buffer{} logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) - d := prepare(t, limits, logger) - d.cfg.LogReceivedSpans = LogReceivedSpansConfig{ + d, _ := prepare(t, limits, logger) + d.cfg.LogReceivedSpans = LogSpansConfig{ Enabled: tc.LogReceivedSpansEnabled, FilterByStatusError: tc.filterByStatusError, IncludeAllAttributes: tc.includeAllAttributes, @@ -967,21 +969,210 @@ func TestLogSpans(t *testing.T) { t.Fatal(err) } - bufJSON := "[" + strings.TrimRight(strings.ReplaceAll(buf.String(), "\n", ","), ",") + "]" - var actualLogsSpan []testLogSpan - err = json.Unmarshal([]byte(bufJSON), &actualLogsSpan) - if err != nil { - t.Fatal(err) + assert.ElementsMatch(t, tc.expectedLogsSpan, actualLogSpan(t, buf)) + }) + } +} + +func TestLogDiscardedSpansWhenContextCancelled(t *testing.T) { + for i, tc := range []struct { + LogDiscardedSpansEnabled bool + filterByStatusError bool + includeAllAttributes bool + batches []*v1.ResourceSpans + expectedLogsSpan []testLogSpan + }{ + { + LogDiscardedSpansEnabled: false, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test", []*v1.ScopeSpans{ + makeScope( + makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil)), + }), + }, + expectedLogsSpan: []testLogSpan{}, + }, + { + LogDiscardedSpansEnabled: true, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test", []*v1.ScopeSpans{ + makeScope( + makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil)), + }), + }, + expectedLogsSpan: []testLogSpan{ + { + Msg: "discarded", + Level: "info", + TraceID: "0a0102030405060708090a0b0c0d0e0f", + SpanID: "dad44adc9a83b370", + }, + }, + }, + { + LogDiscardedSpansEnabled: true, + includeAllAttributes: true, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test-service2", []*v1.ScopeSpans{ + makeScope( + makeSpan("b1c792dea27d511c145df8402bdd793a", "56afb9fe18b6c2d6", 
"Test Span", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR})), + }, makeAttribute("resource_attribute2", "value2")), + }, + expectedLogsSpan: []testLogSpan{ + { + Name: "Test Span", + Msg: "discarded", + Level: "info", + TraceID: "b1c792dea27d511c145df8402bdd793a", + SpanID: "56afb9fe18b6c2d6", + SpanServiceName: "test-service2", + SpanStatus: "STATUS_CODE_ERROR", + SpanKind: "SPAN_KIND_SERVER", + ResourceAttribute2: "value2", + }, + }, + }, + { + LogDiscardedSpansEnabled: true, + filterByStatusError: true, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test-service", []*v1.ScopeSpans{ + makeScope( + makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span1", nil), + makeSpan("e3210a2b38097332d1fe43083ea93d29", "6c21c48da4dbd1a7", "Test Span2", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR})), + makeScope( + makeSpan("bb42ec04df789ff04b10ea5274491685", "1b3a296034f4031e", "Test Span3", nil)), + }), + makeResourceSpans("test-service2", []*v1.ScopeSpans{ + makeScope( + makeSpan("b1c792dea27d511c145df8402bdd793a", "56afb9fe18b6c2d6", "Test Span", &v1.Status{Code: v1.Status_STATUS_CODE_ERROR})), + }), + }, + expectedLogsSpan: []testLogSpan{ + { + Msg: "discarded", + Level: "info", + TraceID: "e3210a2b38097332d1fe43083ea93d29", + SpanID: "6c21c48da4dbd1a7", + }, + { + Msg: "discarded", + Level: "info", + TraceID: "b1c792dea27d511c145df8402bdd793a", + SpanID: "56afb9fe18b6c2d6", + }, + }, + }, + } { + t.Run(fmt.Sprintf("[%d] TestLogDiscardedSpansWhenContextCancelled LogDiscardedSpansEnabled=%v filterByStatusError=%v includeAllAttributes=%v", i, tc.LogDiscardedSpansEnabled, tc.filterByStatusError, tc.includeAllAttributes), func(t *testing.T) { + limits := overrides.Config{} + limits.RegisterFlagsAndApplyDefaults(&flag.FlagSet{}) + + buf := &bytes.Buffer{} + logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) + + d, _ := prepare(t, limits, logger) + d.cfg.LogDiscardedSpans = LogSpansConfig{ + Enabled: tc.LogDiscardedSpansEnabled, + FilterByStatusError: tc.filterByStatusError, + IncludeAllAttributes: tc.includeAllAttributes, + } + + traces := batchesToTraces(t, tc.batches) + ctx, cancelFunc := context.WithCancel(ctx) + cancelFunc() // cancel to force all spans to be discarded + + _, err := d.PushTraces(ctx, traces) + assert.ErrorContains(t, err, "context canceled") + + assert.ElementsMatch(t, tc.expectedLogsSpan, actualLogSpan(t, buf)) + }) + } +} + +func TestLogDiscardedSpansWhenPushToIngesterFails(t *testing.T) { + for i, tc := range []struct { + LogDiscardedSpansEnabled bool + filterByStatusError bool + includeAllAttributes bool + batches []*v1.ResourceSpans + expectedLogsSpan []testLogSpan + pushErrorByTrace []tempopb.PushErrorReason + }{ + { + LogDiscardedSpansEnabled: false, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test", []*v1.ScopeSpans{ + makeScope( + makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil)), + }), + }, + pushErrorByTrace: []tempopb.PushErrorReason{traceTooLargeError}, + expectedLogsSpan: []testLogSpan{}, + }, + { + LogDiscardedSpansEnabled: true, + batches: []*v1.ResourceSpans{ + makeResourceSpans("test", []*v1.ScopeSpans{ + makeScope( + makeSpan("0a0102030405060708090a0b0c0d0e0f", "dad44adc9a83b370", "Test Span", nil)), + }), + }, + pushErrorByTrace: []tempopb.PushErrorReason{traceTooLargeError}, + expectedLogsSpan: []testLogSpan{ + { + Msg: "discarded", + Level: "info", + TraceID: "0a0102030405060708090a0b0c0d0e0f", + SpanID: "dad44adc9a83b370", + }, + }, + }, + } { + t.Run(fmt.Sprintf("[%d] 
TestLogDiscardedSpansWhenPushToIngesterFails LogDiscardedSpansEnabled=%v filterByStatusError=%v includeAllAttributes=%v", i, tc.LogDiscardedSpansEnabled, tc.filterByStatusError, tc.includeAllAttributes), func(t *testing.T) { + limits := overrides.Config{} + limits.RegisterFlagsAndApplyDefaults(&flag.FlagSet{}) + + buf := &bytes.Buffer{} + logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) + + d, ingesters := prepare(t, limits, logger) + d.cfg.LogDiscardedSpans = LogSpansConfig{ + Enabled: tc.LogDiscardedSpansEnabled, + FilterByStatusError: tc.filterByStatusError, + IncludeAllAttributes: tc.includeAllAttributes, } - assert.Equal(t, len(tc.expectedLogsSpan), len(actualLogsSpan)) - for i, expectedLogSpan := range tc.expectedLogsSpan { - assert.EqualValues(t, expectedLogSpan, actualLogsSpan[i]) + // mock ingester errors + for _, ingester := range maps.Values(ingesters) { + ingester.pushBytesV2 = func(_ context.Context, _ *tempopb.PushBytesRequest, _ ...grpc.CallOption) (*tempopb.PushResponse, error) { + return &tempopb.PushResponse{ + ErrorsByTrace: tc.pushErrorByTrace, + }, nil + } + } + + traces := batchesToTraces(t, tc.batches) + + _, err := d.PushTraces(ctx, traces) + if err != nil { + t.Fatal(err) } + assert.ElementsMatch(t, tc.expectedLogsSpan, actualLogSpan(t, buf)) }) } } +func actualLogSpan(t *testing.T, buf *bytes.Buffer) []testLogSpan { + bufJSON := "[" + strings.TrimRight(strings.ReplaceAll(buf.String(), "\n", ","), ",") + "]" + var actualLogsSpan []testLogSpan + err := json.Unmarshal([]byte(bufJSON), &actualLogsSpan) + if err != nil { + t.Fatal(err) + } + return actualLogsSpan +} + func TestRateLimitRespected(t *testing.T) { // prepare test data overridesConfig := overrides.Config{ @@ -995,7 +1186,7 @@ func TestRateLimitRespected(t *testing.T) { } buf := &bytes.Buffer{} logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) - d := prepare(t, overridesConfig, logger) + d, _ := prepare(t, overridesConfig, logger) batches := []*v1.ResourceSpans{ makeResourceSpans("test-service", []*v1.ScopeSpans{ makeScope( @@ -1173,7 +1364,8 @@ func TestDiscardCountReplicationFactor(t *testing.T) { } } - liveTraceDiscardedCount, traceTooLongDiscardedCount, _ := countDiscaredSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traceByID, tc.replicationFactor) + discardedTraces := discardedTraces(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traceByID, tc.replicationFactor) + liveTraceDiscardedCount, traceTooLongDiscardedCount, _ := countDiscardedSpans(discardedTraces) require.Equal(t, tc.expectedLiveTracesDiscardedCount, liveTraceDiscardedCount) require.Equal(t, tc.expectedTraceTooLargeDiscardedCount, traceTooLongDiscardedCount) @@ -1225,7 +1417,7 @@ func TestProcessIngesterPushByteResponse(t *testing.T) { overridesConfig := overrides.Config{} buf := &bytes.Buffer{} logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) - d := prepare(t, overridesConfig, logger) + d, _ := prepare(t, overridesConfig, logger) for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { @@ -1246,7 +1438,7 @@ func TestIngesterPushBytes(t *testing.T) { overridesConfig := overrides.Config{} buf := &bytes.Buffer{} logger := kitlog.NewJSONLogger(kitlog.NewSyncWriter(buf)) - d := prepare(t, overridesConfig, logger) + d, _ := prepare(t, overridesConfig, logger) traces := []*rebatchedTrace{ { @@ -1299,7 +1491,8 @@ func TestIngesterPushBytes(t *testing.T) { d.processPushResponse(pushResponse, numSuccessByTraceIndex, lastErrorReasonByTraceIndex, numOfTraces, indexes) } - maxLiveDiscardedCount, 
traceTooLargeDiscardedCount, _ := countDiscaredSpans(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, 3) + discardedTraces := discardedTraces(numSuccessByTraceIndex, lastErrorReasonByTraceIndex, traces, 3) + maxLiveDiscardedCount, traceTooLargeDiscardedCount, _ := countDiscardedSpans(discardedTraces) assert.Equal(t, traceTooLargeDiscardedCount, 6) assert.Equal(t, maxLiveDiscardedCount, 35) } @@ -1382,7 +1575,7 @@ func makeResourceSpans(serviceName string, ils []*v1.ScopeSpans, attributes ...* return rs } -func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) *Distributor { +func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) (*Distributor, map[string]*mockIngester) { if logger == nil { logger = kitlog.NewNopLogger() } @@ -1399,7 +1592,10 @@ func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) *Distr // Mock the ingesters ring ingesters := map[string]*mockIngester{} for i := 0; i < numIngesters; i++ { - ingesters[fmt.Sprintf("ingester%d", i)] = &mockIngester{} + ingesters[fmt.Sprintf("ingester%d", i)] = &mockIngester{ + pushBytes: pushBytesNoOp, + pushBytesV2: pushBytesNoOp, + } } ingestersRing := &mockRing{ @@ -1425,21 +1621,29 @@ func prepare(t *testing.T, limits overrides.Config, logger kitlog.Logger) *Distr d, err := New(distributorConfig, clientConfig, ingestersRing, generator_client.Config{}, nil, overrides, mw, logger, l, prometheus.NewPedanticRegistry()) require.NoError(t, err) - return d + return d, ingesters } type mockIngester struct { grpc_health_v1.HealthClient + // pushBytes mock to be overridden in test scenarios if needed + pushBytes func(ctx context.Context, in *tempopb.PushBytesRequest, opts ...grpc.CallOption) (*tempopb.PushResponse, error) + // pushBytesV2 mock to be overridden in test scenarios if needed + pushBytesV2 func(ctx context.Context, in *tempopb.PushBytesRequest, opts ...grpc.CallOption) (*tempopb.PushResponse, error) +} + +func pushBytesNoOp(context.Context, *tempopb.PushBytesRequest, ...grpc.CallOption) (*tempopb.PushResponse, error) { + return &tempopb.PushResponse{}, nil } var _ tempopb.PusherClient = (*mockIngester)(nil) -func (i *mockIngester) PushBytes(context.Context, *tempopb.PushBytesRequest, ...grpc.CallOption) (*tempopb.PushResponse, error) { - return &tempopb.PushResponse{}, nil +func (i *mockIngester) PushBytes(ctx context.Context, in *tempopb.PushBytesRequest, opts ...grpc.CallOption) (*tempopb.PushResponse, error) { + return i.pushBytes(ctx, in, opts...) } -func (i *mockIngester) PushBytesV2(context.Context, *tempopb.PushBytesRequest, ...grpc.CallOption) (*tempopb.PushResponse, error) { - return &tempopb.PushResponse{}, nil +func (i *mockIngester) PushBytesV2(ctx context.Context, in *tempopb.PushBytesRequest, opts ...grpc.CallOption) (*tempopb.PushResponse, error) { + return i.pushBytesV2(ctx, in, opts...) } func (i *mockIngester) Close() error {
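
For reference, the logging refactor in `modules/distributor/distributor.go` works because go-kit loggers compose by value: `log.With` returns a new logger with key/value pairs bound, so `logReceivedSpans` and `logDiscardedResourceSpans` can each bind their own `msg` value before delegating to the shared `logSpans`/`logSpan` helpers. A minimal standalone sketch of that pattern (the span and trace IDs are sample values taken from the troubleshooting doc above, not real output of this code path):

```go
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	logger := log.NewJSONLogger(log.NewSyncWriter(os.Stdout))

	// Bind the message once, as logDiscardedResourceSpans does, so the
	// shared logSpan helper no longer hard-codes "msg"="received".
	discardedLogger := log.With(logger, "msg", "discarded")

	// Sample IDs only; logSpan hex-encodes the real span/trace IDs.
	level.Info(discardedLogger).Log(
		"spanid", "c2ebe710d2e2ce7a",
		"traceid", "bd63605778e3dbe935b05e6afd291006",
	)
	// {"level":"info","msg":"discarded","spanid":"c2ebe710d2e2ce7a","traceid":"bd63605778e3dbe935b05e6afd291006"}
}
```

The JSON logger emits the bound pairs together with the per-call pairs, which is why the discarded-span log lines carry `msg=discarded` even though `logSpan` no longer mentions a message at all.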
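Splitting the old `countDiscaredSpans` into `discardedTraces` plus `countDiscardedSpans` keeps the quorum rule in one place: a trace counts as discarded when fewer than `floor(replicationFactor/2) + 1` ingesters acknowledged it. A simplified, self-contained sketch of that rule (the string constants stand in for `tempopb.PushErrorReason`, and the per-trace bookkeeping slices are invented sample data):

```go
package main

import "fmt"

// Simplified stand-ins for tempopb.PushErrorReason; the real discardedTrace
// also carries the *rebatchedTrace so discarded spans can be logged later.
const (
	noError       = "NO_ERROR"
	maxLiveTraces = "MAX_LIVE_TRACES"
	traceTooLarge = "TRACE_TOO_LARGE"
)

func main() {
	repFactor := 3
	// A trace is only safe once a quorum of replicas acknowledged it:
	// floor(3/2)+1 = 2 successful pushes required.
	quorum := repFactor/2 + 1

	numSuccessByTraceIndex := []int{3, 1, 0} // acks per trace
	lastErrorByTraceIndex := []string{noError, traceTooLarge, maxLiveTraces}

	for i, numSuccess := range numSuccessByTraceIndex {
		if numSuccess >= quorum {
			continue // quorum reached, trace is not discarded
		}
		fmt.Printf("trace %d discarded, reason=%s\n", i, lastErrorByTraceIndex[i])
	}
	// Output:
	// trace 1 discarded, reason=TRACE_TOO_LARGE
	// trace 2 discarded, reason=MAX_LIVE_TRACES
}
```

Carrying the `*rebatchedTrace` inside `discardedTrace` is what lets `logDiscardedSpans` print the individual span and trace IDs rather than only the aggregate counts that `countDiscardedSpans` returns.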
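The `context canceled` comment and `TestLogDiscardedSpansWhenContextCancelled` rely on standard `context` behavior: when the client resets the connection before the push completes, the request context reports `context.Canceled` and the whole batch takes the internal-error discard path. A minimal sketch of the mechanism the test forces by canceling before calling `PushTraces`:

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	// Mirrors the test setup: cancel the context up front so every span
	// pushed under it is discarded with an internal error.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	fmt.Println(ctx.Err()) // context canceled
}
```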