Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose drop span hook as an option in Collector SpanProcessor #4387

Merged
merged 2 commits into from
Apr 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type options struct {
extraFormatTypes []processor.SpanFormat
collectorTags map[string]string
spanSizeMetricsEnabled bool
onDroppedSpan func(span *model.Span)
}

// Option is a function that sets some option on StorageBuilder.
Expand Down Expand Up @@ -164,6 +165,13 @@ func (options) SpanSizeMetricsEnabled(spanSizeMetrics bool) Option {
}
}

// OnDroppedSpan creates an Option that initializes the onDroppedSpan function
func (options) OnDroppedSpan(onDroppedSpan func(span *model.Span)) Option {
return func(b *options) {
b.onDroppedSpan = onDroppedSpan
}
}

func (o options) apply(opts ...Option) options {
ret := options{}
for _, opt := range opts {
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ func TestAllOptionSet(t *testing.T) {
Options.PreSave(func(span *model.Span, tenant string) {}),
Options.CollectorTags(map[string]string{"extra": "tags"}),
Options.SpanSizeMetricsEnabled(true),
Options.OnDroppedSpan(func(span *model.Span) {}),
)
assert.EqualValues(t, 5, opts.numWorkers)
assert.EqualValues(t, 10, opts.queueSize)
assert.EqualValues(t, map[string]string{"extra": "tags"}, opts.collectorTags)
assert.EqualValues(t, 1000, opts.dynQueueSizeWarmup)
assert.EqualValues(t, 1024, opts.dynQueueSizeMemory)
assert.True(t, opts.spanSizeMetricsEnabled)
assert.NotNil(t, opts.onDroppedSpan)
}

func TestNoOptionsSet(t *testing.T) {
Expand All @@ -69,4 +71,5 @@ func TestNoOptionsSet(t *testing.T) {
assert.EqualValues(t, &span, opts.sanitizer(&span))
assert.EqualValues(t, 0, opts.dynQueueSizeWarmup)
assert.False(t, opts.spanSizeMetricsEnabled)
assert.Nil(t, opts.onDroppedSpan)
}
3 changes: 3 additions & 0 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt
options.extraFormatTypes)
droppedItemHandler := func(item interface{}) {
handlerMetrics.SpansDropped.Inc(1)
if options.onDroppedSpan != nil {
options.onDroppedSpan(item.(*queueItem).span)
}
}
boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler)

Expand Down
30 changes: 29 additions & 1 deletion cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestSpanProcessorBusy(t *testing.T) {
},
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})

assert.Error(t, err, "expcting busy error")
assert.Error(t, err, "expecting busy error")
assert.Nil(t, res)
}

Expand Down Expand Up @@ -653,3 +653,31 @@ func TestSpanProcessorContextPropagation(t *testing.T) {
// Verify no other tenantKey context values made it to writer
assert.True(t, reflect.DeepEqual(w.tenants, map[string]bool{dummyTenant: true}))
}

func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) {
var droppedOperations []string
customOnDroppedSpan := func(span *model.Span) {
droppedOperations = append(droppedOperations, span.OperationName)
}

w := &blockingWriter{}
p := NewSpanProcessor(w,
nil,
Options.NumWorkers(1),
Options.QueueSize(1),
Options.OnDroppedSpan(customOnDroppedSpan),
).(*spanProcessor)
defer p.Close()
// block the writer so that the first span is read from the queue and blocks the processor, and followings are dropped.
w.Lock()
defer w.Unlock()

_, err := p.ProcessSpans([]*model.Span{
{OperationName: "op1"},
{OperationName: "op2"},
{OperationName: "op3"},
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})

assert.NoError(t, err)
assert.Equal(t, []string{"op2", "op3"}, droppedOperations)
}