diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 4f2f6134d09..c25140a4090 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -42,6 +42,7 @@ type options struct { extraFormatTypes []processor.SpanFormat collectorTags map[string]string spanSizeMetricsEnabled bool + onDroppedSpan func(span *model.Span) } // Option is a function that sets some option on StorageBuilder. @@ -164,6 +165,13 @@ func (options) SpanSizeMetricsEnabled(spanSizeMetrics bool) Option { } } +// OnDroppedSpan creates an Option that initializes the onDroppedSpan function +func (options) OnDroppedSpan(onDroppedSpan func(span *model.Span)) Option { + return func(b *options) { + b.onDroppedSpan = onDroppedSpan + } +} + func (o options) apply(opts ...Option) options { ret := options{} for _, opt := range opts { diff --git a/cmd/collector/app/options_test.go b/cmd/collector/app/options_test.go index ecbf4d10bef..e416166829f 100644 --- a/cmd/collector/app/options_test.go +++ b/cmd/collector/app/options_test.go @@ -46,6 +46,7 @@ func TestAllOptionSet(t *testing.T) { Options.PreSave(func(span *model.Span, tenant string) {}), Options.CollectorTags(map[string]string{"extra": "tags"}), Options.SpanSizeMetricsEnabled(true), + Options.OnDroppedSpan(func(span *model.Span) {}), ) assert.EqualValues(t, 5, opts.numWorkers) assert.EqualValues(t, 10, opts.queueSize) @@ -53,6 +54,7 @@ func TestAllOptionSet(t *testing.T) { assert.EqualValues(t, 1000, opts.dynQueueSizeWarmup) assert.EqualValues(t, 1024, opts.dynQueueSizeMemory) assert.True(t, opts.spanSizeMetricsEnabled) + assert.NotNil(t, opts.onDroppedSpan) } func TestNoOptionsSet(t *testing.T) { @@ -69,4 +71,5 @@ func TestNoOptionsSet(t *testing.T) { assert.EqualValues(t, &span, opts.sanitizer(&span)) assert.EqualValues(t, 0, opts.dynQueueSizeWarmup) assert.False(t, opts.spanSizeMetricsEnabled) + assert.Nil(t, opts.onDroppedSpan) } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 140acbd3325..04e8d43ab36 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -95,6 +95,9 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt options.extraFormatTypes) droppedItemHandler := func(item interface{}) { handlerMetrics.SpansDropped.Inc(1) + if options.onDroppedSpan != nil { + options.onDroppedSpan(item.(*queueItem).span) + } } boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler) diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 00c87f871ff..d66383919a0 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -332,7 +332,7 @@ func TestSpanProcessorBusy(t *testing.T) { }, }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) - assert.Error(t, err, "expcting busy error") + assert.Error(t, err, "expecting busy error") assert.Nil(t, res) } @@ -653,3 +653,31 @@ func TestSpanProcessorContextPropagation(t *testing.T) { // Verify no other tenantKey context values made it to writer assert.True(t, reflect.DeepEqual(w.tenants, map[string]bool{dummyTenant: true})) } + +func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { + var droppedOperations []string + customOnDroppedSpan := func(span *model.Span) { + droppedOperations = append(droppedOperations, span.OperationName) + } + + w := &blockingWriter{} + p := NewSpanProcessor(w, + nil, + Options.NumWorkers(1), + Options.QueueSize(1), + Options.OnDroppedSpan(customOnDroppedSpan), + ).(*spanProcessor) + defer p.Close() + // block the writer so that the first span is read from the queue and blocks the processor, and followings are dropped. + w.Lock() + defer w.Unlock() + + _, err := p.ProcessSpans([]*model.Span{ + {OperationName: "op1"}, + {OperationName: "op2"}, + {OperationName: "op3"}, + }, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat}) + + assert.NoError(t, err) + assert.Equal(t, []string{"op2", "op3"}, droppedOperations) +}