From 8948ce44759adfa471572c6d1a5dec5ef7e80c92 Mon Sep 17 00:00:00 2001 From: Chen Xu Date: Sat, 15 Apr 2023 19:46:55 -0700 Subject: [PATCH] Expose drop span hook as an option in Collector SpanProcessor (#4387) ## Which problem is this PR solving? Resolves #4375 ## Short description of the changes Expose drop span hook as an option when creating SpanProcessor in collector so that users have the flexibility to get more insights on dropped spans in addition to the dropped count. Signed-off-by: Chen Xu Co-authored-by: Yuri Shkuro --- cmd/collector/app/options.go | 8 +++++++ cmd/collector/app/options_test.go | 3 +++ cmd/collector/app/span_processor.go | 3 +++ cmd/collector/app/span_processor_test.go | 30 +++++++++++++++++++++++- 4 files changed, 43 insertions(+), 1 deletion(-) diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 4f2f6134d09..c25140a4090 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -42,6 +42,7 @@ type options struct { extraFormatTypes []processor.SpanFormat collectorTags map[string]string spanSizeMetricsEnabled bool + onDroppedSpan func(span *model.Span) } // Option is a function that sets some option on StorageBuilder. @@ -164,6 +165,13 @@ func (options) SpanSizeMetricsEnabled(spanSizeMetrics bool) Option { } } +// OnDroppedSpan creates an Option that initializes the onDroppedSpan function +func (options) OnDroppedSpan(onDroppedSpan func(span *model.Span)) Option { + return func(b *options) { + b.onDroppedSpan = onDroppedSpan + } +} + func (o options) apply(opts ...Option) options { ret := options{} for _, opt := range opts { diff --git a/cmd/collector/app/options_test.go b/cmd/collector/app/options_test.go index ecbf4d10bef..e416166829f 100644 --- a/cmd/collector/app/options_test.go +++ b/cmd/collector/app/options_test.go @@ -46,6 +46,7 @@ func TestAllOptionSet(t *testing.T) { Options.PreSave(func(span *model.Span, tenant string) {}), Options.CollectorTags(map[string]string{"extra": "tags"}), Options.SpanSizeMetricsEnabled(true), + Options.OnDroppedSpan(func(span *model.Span) {}), ) assert.EqualValues(t, 5, opts.numWorkers) assert.EqualValues(t, 10, opts.queueSize) @@ -53,6 +54,7 @@ func TestAllOptionSet(t *testing.T) { assert.EqualValues(t, 1000, opts.dynQueueSizeWarmup) assert.EqualValues(t, 1024, opts.dynQueueSizeMemory) assert.True(t, opts.spanSizeMetricsEnabled) + assert.NotNil(t, opts.onDroppedSpan) } func TestNoOptionsSet(t *testing.T) { @@ -69,4 +71,5 @@ func TestNoOptionsSet(t *testing.T) { assert.EqualValues(t, &span, opts.sanitizer(&span)) assert.EqualValues(t, 0, opts.dynQueueSizeWarmup) assert.False(t, opts.spanSizeMetricsEnabled) + assert.Nil(t, opts.onDroppedSpan) } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 140acbd3325..04e8d43ab36 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -95,6 +95,9 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt options.extraFormatTypes) droppedItemHandler := func(item interface{}) { handlerMetrics.SpansDropped.Inc(1) + if options.onDroppedSpan != nil { + options.onDroppedSpan(item.(*queueItem).span) + } } boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler) diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 00c87f871ff..d66383919a0 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -332,7 +332,7 @@ func TestSpanProcessorBusy(t *testing.T) { }, }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) - assert.Error(t, err, "expcting busy error") + assert.Error(t, err, "expecting busy error") assert.Nil(t, res) } @@ -653,3 +653,31 @@ func TestSpanProcessorContextPropagation(t *testing.T) { // Verify no other tenantKey context values made it to writer assert.True(t, reflect.DeepEqual(w.tenants, map[string]bool{dummyTenant: true})) } + +func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { + var droppedOperations []string + customOnDroppedSpan := func(span *model.Span) { + droppedOperations = append(droppedOperations, span.OperationName) + } + + w := &blockingWriter{} + p := NewSpanProcessor(w, + nil, + Options.NumWorkers(1), + Options.QueueSize(1), + Options.OnDroppedSpan(customOnDroppedSpan), + ).(*spanProcessor) + defer p.Close() + // block the writer so that the first span is read from the queue and blocks the processor, and followings are dropped. + w.Lock() + defer w.Unlock() + + _, err := p.ProcessSpans([]*model.Span{ + {OperationName: "op1"}, + {OperationName: "op2"}, + {OperationName: "op3"}, + }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + + assert.NoError(t, err) + assert.Equal(t, []string{"op2", "op3"}, droppedOperations) +}