Skip to content

Commit

Permalink
Expose drop span hook as an option in Collector SpanProcessor (jaeger…
Browse files Browse the repository at this point in the history
…tracing#4387)

## Which problem is this PR solving?
Resolves jaegertracing#4375

## Short description of the changes
Expose drop span hook as an option when creating SpanProcessor in
collector so that users have the flexibility to get more insights on
dropped spans in addition to the dropped count.

Signed-off-by: Chen Xu <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
ChenX1993 and yurishkuro authored Apr 16, 2023
1 parent 6d06780 commit 8948ce4
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 1 deletion.
8 changes: 8 additions & 0 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type options struct {
extraFormatTypes []processor.SpanFormat
collectorTags map[string]string
spanSizeMetricsEnabled bool
onDroppedSpan func(span *model.Span)
}

// Option is a function that sets some option on StorageBuilder.
Expand Down Expand Up @@ -164,6 +165,13 @@ func (options) SpanSizeMetricsEnabled(spanSizeMetrics bool) Option {
}
}

// OnDroppedSpan creates an Option that initializes the onDroppedSpan function
func (options) OnDroppedSpan(onDroppedSpan func(span *model.Span)) Option {
return func(b *options) {
b.onDroppedSpan = onDroppedSpan
}
}

func (o options) apply(opts ...Option) options {
ret := options{}
for _, opt := range opts {
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ func TestAllOptionSet(t *testing.T) {
Options.PreSave(func(span *model.Span, tenant string) {}),
Options.CollectorTags(map[string]string{"extra": "tags"}),
Options.SpanSizeMetricsEnabled(true),
Options.OnDroppedSpan(func(span *model.Span) {}),
)
assert.EqualValues(t, 5, opts.numWorkers)
assert.EqualValues(t, 10, opts.queueSize)
assert.EqualValues(t, map[string]string{"extra": "tags"}, opts.collectorTags)
assert.EqualValues(t, 1000, opts.dynQueueSizeWarmup)
assert.EqualValues(t, 1024, opts.dynQueueSizeMemory)
assert.True(t, opts.spanSizeMetricsEnabled)
assert.NotNil(t, opts.onDroppedSpan)
}

func TestNoOptionsSet(t *testing.T) {
Expand All @@ -69,4 +71,5 @@ func TestNoOptionsSet(t *testing.T) {
assert.EqualValues(t, &span, opts.sanitizer(&span))
assert.EqualValues(t, 0, opts.dynQueueSizeWarmup)
assert.False(t, opts.spanSizeMetricsEnabled)
assert.Nil(t, opts.onDroppedSpan)
}
3 changes: 3 additions & 0 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt
options.extraFormatTypes)
droppedItemHandler := func(item interface{}) {
handlerMetrics.SpansDropped.Inc(1)
if options.onDroppedSpan != nil {
options.onDroppedSpan(item.(*queueItem).span)
}
}
boundedQueue := queue.NewBoundedQueue(options.queueSize, droppedItemHandler)

Expand Down
30 changes: 29 additions & 1 deletion cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestSpanProcessorBusy(t *testing.T) {
},
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})

assert.Error(t, err, "expcting busy error")
assert.Error(t, err, "expecting busy error")
assert.Nil(t, res)
}

Expand Down Expand Up @@ -653,3 +653,31 @@ func TestSpanProcessorContextPropagation(t *testing.T) {
// Verify no other tenantKey context values made it to writer
assert.True(t, reflect.DeepEqual(w.tenants, map[string]bool{dummyTenant: true}))
}

func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) {
var droppedOperations []string
customOnDroppedSpan := func(span *model.Span) {
droppedOperations = append(droppedOperations, span.OperationName)
}

w := &blockingWriter{}
p := NewSpanProcessor(w,
nil,
Options.NumWorkers(1),
Options.QueueSize(1),
Options.OnDroppedSpan(customOnDroppedSpan),
).(*spanProcessor)
defer p.Close()
// block the writer so that the first span is read from the queue and blocks the processor, and followings are dropped.
w.Lock()
defer w.Unlock()

_, err := p.ProcessSpans([]*model.Span{
{OperationName: "op1"},
{OperationName: "op2"},
{OperationName: "op3"},
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})

assert.NoError(t, err)
assert.Equal(t, []string{"op2", "op3"}, droppedOperations)
}

0 comments on commit 8948ce4

Please sign in to comment.