
fix: Send correct batch stats when SendBatchMaxSize is set #5385

Merged (1 commit, Jun 2, 2022)
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@
 ### 🧰 Bug fixes 🧰

 - Fixes the "service.version" label value for internal metrics, always was "latest" in core/contrib distros. (#5449).
+- Send correct batch stats when SendBatchMaxSize is set (#5385)

 ## v0.52.0 Beta

52 changes: 37 additions & 15 deletions processor/batchprocessor/batch_processor.go
@@ -59,7 +59,7 @@ type batchProcessor struct {

 type batch interface {
     // export the current batch
-    export(ctx context.Context, sendBatchMaxSize int) error
+    export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, sentBatchBytes int, err error)

     // itemCount returns the size of the current batch
     itemCount() int
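For context, a reconstructed sketch of the batch interface after this change: export and itemCount appear in the diff, add is inferred from the surrounding hunks, and any other methods are omitted since they are not visible here.

```go
// Sketch only, not verbatim from the repository.
type batch interface {
    // export sends up to sendBatchMaxSize items (0 means no cap) and reports
    // how many items were sent; bytes are computed only when returnBytes is true.
    export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, sentBatchBytes int, err error)

    // itemCount returns the size of the current batch
    itemCount() int

    // add merges incoming data (traces, metrics, or logs) into the pending batch.
    add(item interface{})
}
```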
@@ -175,15 +175,16 @@ func (bp *batchProcessor) resetTimer() {
 }

 func (bp *batchProcessor) sendItems(triggerMeasure *stats.Int64Measure) {
-    // Add that it came from the trace pipeline?
-    stats.Record(bp.exportCtx, triggerMeasure.M(1), statBatchSendSize.M(int64(bp.batch.itemCount())))
-
-    if bp.telemetryLevel == configtelemetry.LevelDetailed {
-        stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bp.batch.size())))
-    }
-
-    if err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize); err != nil {
+    detailed := bp.telemetryLevel == configtelemetry.LevelDetailed
Contributor: question: should we call out that we may want to remove this given it's barely used and almost always desired in a future PR?

Contributor (author): It looks like the default for this setting is LevelBasic, so eliminating the check here would be a behavior change.
+    sent, bytes, err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize, detailed)
+    if err != nil {
         bp.logger.Warn("Sender failed", zap.Error(err))
+    } else {
+        // Add that it came from the trace pipeline?
+        stats.Record(bp.exportCtx, triggerMeasure.M(1), statBatchSendSize.M(int64(sent)))
+        if detailed {
+            stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bytes)))
+        }
     }
 }
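For intuition, a minimal standalone sketch (illustrative only, with hypothetical numbers) of the miscounting this PR fixes: before the change, sendItems recorded the pre-split item count even when export only sent sendBatchMaxSize items.

```go
package main

import "fmt"

func main() {
    // Hypothetical numbers: 500 spans queued, SendBatchMaxSize = 37.
    queued, maxSize := 500, 37

    oldStat := queued // before: itemCount() was recorded ahead of the split
    sent := queued    // after: the count that export() actually sent
    if maxSize > 0 && sent > maxSize {
        sent = maxSize
    }
    fmt.Println(oldStat, sent) // 500 37: the old stat overstated the first flush
}
```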

@@ -244,17 +245,24 @@ func (bt *batchTraces) add(item interface{}) {
     td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
 }

-func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int) error {
+func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
Contributor: question: should we do the stats.Record call within export rather than returning it? That way we don't need to make any function signature changes?

Contributor: i can see why that would be annoying because then instead of a single stats record call, you have to make one in each batch processor. What do you think?

Contributor (author): Yeah, I'd rather change the signature, especially since it's only used in one place, than duplicate the stats recording code.
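To make that trade-off concrete: a rough sketch (illustrative, not from the PR) of what the rejected alternative would require. Each typed export would end with its own recording block, roughly the helper below, instead of the single call site in sendItems.

```go
// Illustrative only: assumes the package-level statBatchSendSize and
// statBatchSendSizeBytes measures defined elsewhere in this file, and the
// go.opencensus.io/stats import already used by the processor.
func recordBatchStats(exportCtx context.Context, sent, bytes int, detailed bool) {
    stats.Record(exportCtx, statBatchSendSize.M(int64(sent)))
    if detailed {
        stats.Record(exportCtx, statBatchSendSizeBytes.M(int64(bytes)))
    }
}
```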

     var req ptrace.Traces
+    var sent int
+    var bytes int
     if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize {
         req = splitTraces(sendBatchMaxSize, bt.traceData)
         bt.spanCount -= sendBatchMaxSize
+        sent = sendBatchMaxSize
     } else {
         req = bt.traceData
+        sent = bt.spanCount
         bt.traceData = ptrace.NewTraces()
         bt.spanCount = 0
     }
-    return bt.nextConsumer.ConsumeTraces(ctx, req)
+    if returnBytes {
+        bytes = bt.sizer.TracesSize(req)
+    }
+    return sent, bytes, bt.nextConsumer.ConsumeTraces(ctx, req)
 }

 func (bt *batchTraces) itemCount() int {

@@ -276,17 +284,24 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
     return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: pmetric.NewProtoMarshaler().(pmetric.Sizer)}
 }

-func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
+func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
     var req pmetric.Metrics
+    var sent int
+    var bytes int
     if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
         req = splitMetrics(sendBatchMaxSize, bm.metricData)
         bm.dataPointCount -= sendBatchMaxSize
+        sent = sendBatchMaxSize
     } else {
         req = bm.metricData
+        sent = bm.dataPointCount
         bm.metricData = pmetric.NewMetrics()
         bm.dataPointCount = 0
     }
-    return bm.nextConsumer.ConsumeMetrics(ctx, req)
+    if returnBytes {
+        bytes = bm.sizer.MetricsSize(req)
+    }
+    return sent, bytes, bm.nextConsumer.ConsumeMetrics(ctx, req)
 }

 func (bm *batchMetrics) itemCount() int {

@@ -319,17 +334,24 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
     return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: plog.NewProtoMarshaler().(plog.Sizer)}
 }

-func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int) error {
+func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
     var req plog.Logs
+    var sent int
+    var bytes int
     if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize {
         req = splitLogs(sendBatchMaxSize, bl.logData)
         bl.logCount -= sendBatchMaxSize
+        sent = sendBatchMaxSize
     } else {
         req = bl.logData
+        sent = bl.logCount
         bl.logData = plog.NewLogs()
         bl.logCount = 0
     }
-    return bl.nextConsumer.ConsumeLogs(ctx, req)
+    if returnBytes {
+        bytes = bl.sizer.LogsSize(req)
+    }
+    return sent, bytes, bl.nextConsumer.ConsumeLogs(ctx, req)
 }

 func (bl *batchLogs) itemCount() int {
54 changes: 53 additions & 1 deletion processor/batchprocessor/batch_processor_test.go
@@ -17,6 +17,7 @@ package batchprocessor
 import (
     "context"
     "fmt"
+    "math"
     "sync"
     "testing"
     "time"
@@ -183,6 +184,55 @@ func TestBatchProcessorSentBySize(t *testing.T) {
     assert.Equal(t, sizeSum, int(distData.Sum()))
 }

+func TestBatchProcessorSentBySize_withMaxSize(t *testing.T) {
+    views := MetricViews()
+    require.NoError(t, view.Register(views...))
+    defer view.Unregister(views...)
+
+    sink := new(consumertest.TracesSink)
+    cfg := createDefaultConfig().(*Config)
+    sendBatchSize := 20
+    sendBatchMaxSize := 37
+    cfg.SendBatchSize = uint32(sendBatchSize)
+    cfg.SendBatchMaxSize = uint32(sendBatchMaxSize)
+    cfg.Timeout = 500 * time.Millisecond
+    creationSet := componenttest.NewNopProcessorCreateSettings()
+    batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed)
+    require.NoError(t, err)
+    require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))
+
+    requestCount := 1
+    spansPerRequest := 500
+    totalSpans := requestCount * spansPerRequest
+
+    start := time.Now()
+    for requestNum := 0; requestNum < requestCount; requestNum++ {
+        td := testdata.GenerateTracesManySpansSameResource(spansPerRequest)
+        assert.NoError(t, batcher.ConsumeTraces(context.Background(), td))
+    }
+
+    require.NoError(t, batcher.Shutdown(context.Background()))
+
+    elapsed := time.Since(start)
+    require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())
+
+    // The max batch size is not a divisor of the total number of spans:
+    // 500 spans with a cap of 37 yield ceil(500/37) = 14 batches, the last
+    // holding the 500%37 = 19 remaining spans.
+    expectedBatchesNum := int(math.Ceil(float64(totalSpans) / float64(sendBatchMaxSize)))
+
+    require.Equal(t, totalSpans, sink.SpanCount())
+    receivedTraces := sink.AllTraces()
+    require.EqualValues(t, expectedBatchesNum, len(receivedTraces))
+
+    viewData, err := view.RetrieveData("processor/batch/" + statBatchSendSize.Name())
+    require.NoError(t, err)
+    assert.Equal(t, 1, len(viewData))
+    distData := viewData[0].Data.(*view.DistributionData)
+    assert.Equal(t, int64(expectedBatchesNum), distData.Count)
+    assert.Equal(t, sink.SpanCount(), int(distData.Sum()))
+    assert.Equal(t, totalSpans%sendBatchMaxSize, int(distData.Min))
+    assert.Equal(t, sendBatchMaxSize, int(distData.Max))
+}
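As a sanity check on the expected values above, the same arithmetic in plain Go (no collector dependencies): 500 spans with a cap of 37 produce 13 full batches plus a 19-span remainder, so the distribution's Max is 37 and its Min is 19.

```go
package main

import (
    "fmt"
    "math"
)

func main() {
    totalSpans, sendBatchMaxSize := 500, 37
    expectedBatchesNum := int(math.Ceil(float64(totalSpans) / float64(sendBatchMaxSize)))
    lastBatchSize := totalSpans % sendBatchMaxSize
    fmt.Println(expectedBatchesNum, lastBatchSize) // 14 19
}
```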

 func TestBatchProcessorSentByTimeout(t *testing.T) {
     sink := new(consumertest.TracesSink)
     cfg := createDefaultConfig().(*Config)

@@ -387,7 +437,9 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {

     batchMetrics.add(md)
     require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
-    require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize))
+    sent, _, sendErr := batchMetrics.export(ctx, sendBatchMaxSize, false)
+    require.NoError(t, sendErr)
+    require.Equal(t, sendBatchMaxSize, sent)
     remainingDataPointCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize
     require.Equal(t, remainingDataPointCount, batchMetrics.dataPointCount)
 }