Skip to content

Commit

Permalink
Send correct batch stats when SendBatchMaxSize is set (#5385)
Browse files Browse the repository at this point in the history
The stat was getting sent before the max batch size was
taken into account.
  • Loading branch information
njvrzm authored Jun 2, 2022
1 parent 04e92f2 commit 65b7b1b
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
### 🧰 Bug fixes 🧰

- Fixes the "service.version" label value for internal metrics, which was always "latest" in core/contrib distros. (#5449).
- Send correct batch stats when SendBatchMaxSize is set (#5385)

## v0.52.0 Beta

Expand Down
52 changes: 37 additions & 15 deletions processor/batchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type batchProcessor struct {

type batch interface {
// export the current batch
export(ctx context.Context, sendBatchMaxSize int) error
export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, sentBatchBytes int, err error)

// itemCount returns the size of the current batch
itemCount() int
Expand Down Expand Up @@ -175,15 +175,16 @@ func (bp *batchProcessor) resetTimer() {
}

func (bp *batchProcessor) sendItems(triggerMeasure *stats.Int64Measure) {
// Add that it came form the trace pipeline?
stats.Record(bp.exportCtx, triggerMeasure.M(1), statBatchSendSize.M(int64(bp.batch.itemCount())))

if bp.telemetryLevel == configtelemetry.LevelDetailed {
stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bp.batch.size())))
}

if err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize); err != nil {
detailed := bp.telemetryLevel == configtelemetry.LevelDetailed
sent, bytes, err := bp.batch.export(bp.exportCtx, bp.sendBatchMaxSize, detailed)
if err != nil {
bp.logger.Warn("Sender failed", zap.Error(err))
} else {
// Add that it came form the trace pipeline?
stats.Record(bp.exportCtx, triggerMeasure.M(1), statBatchSendSize.M(int64(sent)))
if detailed {
stats.Record(bp.exportCtx, statBatchSendSizeBytes.M(int64(bytes)))
}
}
}

Expand Down Expand Up @@ -244,17 +245,24 @@ func (bt *batchTraces) add(item interface{}) {
td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans())
}

func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int) error {
func (bt *batchTraces) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
var req ptrace.Traces
var sent int
var bytes int
if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize {
req = splitTraces(sendBatchMaxSize, bt.traceData)
bt.spanCount -= sendBatchMaxSize
sent = sendBatchMaxSize
} else {
req = bt.traceData
sent = bt.spanCount
bt.traceData = ptrace.NewTraces()
bt.spanCount = 0
}
return bt.nextConsumer.ConsumeTraces(ctx, req)
if returnBytes {
bytes = bt.sizer.TracesSize(req)
}
return sent, bytes, bt.nextConsumer.ConsumeTraces(ctx, req)
}

func (bt *batchTraces) itemCount() int {
Expand All @@ -276,17 +284,24 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics {
return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: pmetric.NewProtoMarshaler().(pmetric.Sizer)}
}

func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int) error {
func (bm *batchMetrics) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
var req pmetric.Metrics
var sent int
var bytes int
if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize {
req = splitMetrics(sendBatchMaxSize, bm.metricData)
bm.dataPointCount -= sendBatchMaxSize
sent = sendBatchMaxSize
} else {
req = bm.metricData
sent = bm.dataPointCount
bm.metricData = pmetric.NewMetrics()
bm.dataPointCount = 0
}
return bm.nextConsumer.ConsumeMetrics(ctx, req)
if returnBytes {
bytes = bm.sizer.MetricsSize(req)
}
return sent, bytes, bm.nextConsumer.ConsumeMetrics(ctx, req)
}

func (bm *batchMetrics) itemCount() int {
Expand Down Expand Up @@ -319,17 +334,24 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs {
return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: plog.NewProtoMarshaler().(plog.Sizer)}
}

func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int) error {
func (bl *batchLogs) export(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, error) {
var req plog.Logs
var sent int
var bytes int
if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize {
req = splitLogs(sendBatchMaxSize, bl.logData)
bl.logCount -= sendBatchMaxSize
sent = sendBatchMaxSize
} else {
req = bl.logData
sent = bl.logCount
bl.logData = plog.NewLogs()
bl.logCount = 0
}
return bl.nextConsumer.ConsumeLogs(ctx, req)
if returnBytes {
bytes = bl.sizer.LogsSize(req)
}
return sent, bytes, bl.nextConsumer.ConsumeLogs(ctx, req)
}

func (bl *batchLogs) itemCount() int {
Expand Down
54 changes: 53 additions & 1 deletion processor/batchprocessor/batch_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package batchprocessor
import (
"context"
"fmt"
"math"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -183,6 +184,55 @@ func TestBatchProcessorSentBySize(t *testing.T) {
assert.Equal(t, sizeSum, int(distData.Sum()))
}

// Verifies that the batch-send-size stats reflect the sizes of the batches
// actually exported when SendBatchMaxSize caps each send, rather than the
// accumulated pre-split size.
func TestBatchProcessorSentBySize_withMaxSize(t *testing.T) {
	views := MetricViews()
	require.NoError(t, view.Register(views...))
	defer view.Unregister(views...)

	sink := new(consumertest.TracesSink)
	cfg := createDefaultConfig().(*Config)
	const (
		sendBatchSize    = 20
		sendBatchMaxSize = 37
		requestCount     = 1
		spansPerRequest  = 500
	)
	cfg.SendBatchSize = uint32(sendBatchSize)
	cfg.SendBatchMaxSize = uint32(sendBatchMaxSize)
	cfg.Timeout = 500 * time.Millisecond
	creationSet := componenttest.NewNopProcessorCreateSettings()
	batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, configtelemetry.LevelDetailed)
	require.NoError(t, err)
	require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost()))

	totalSpans := requestCount * spansPerRequest

	start := time.Now()
	for i := 0; i < requestCount; i++ {
		traces := testdata.GenerateTracesManySpansSameResource(spansPerRequest)
		assert.NoError(t, batcher.ConsumeTraces(context.Background(), traces))
	}

	require.NoError(t, batcher.Shutdown(context.Background()))

	// All sends should have been triggered by size, well before the timeout.
	elapsed := time.Since(start)
	require.LessOrEqual(t, elapsed.Nanoseconds(), cfg.Timeout.Nanoseconds())

	// sendBatchMaxSize does not evenly divide totalSpans, so the final batch
	// carries the remainder.
	expectedBatchesNum := int(math.Ceil(float64(totalSpans) / float64(sendBatchMaxSize)))

	require.Equal(t, totalSpans, sink.SpanCount())
	require.EqualValues(t, expectedBatchesNum, len(sink.AllTraces()))

	viewData, err := view.RetrieveData("processor/batch/" + statBatchSendSize.Name())
	require.NoError(t, err)
	assert.Equal(t, 1, len(viewData))
	dist := viewData[0].Data.(*view.DistributionData)
	assert.Equal(t, int64(expectedBatchesNum), dist.Count)
	assert.Equal(t, sink.SpanCount(), int(dist.Sum()))
	assert.Equal(t, totalSpans%sendBatchMaxSize, int(dist.Min))
	assert.Equal(t, sendBatchMaxSize, int(dist.Max))
}

func TestBatchProcessorSentByTimeout(t *testing.T) {
sink := new(consumertest.TracesSink)
cfg := createDefaultConfig().(*Config)
Expand Down Expand Up @@ -387,7 +437,9 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {

batchMetrics.add(md)
require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount)
require.NoError(t, batchMetrics.export(ctx, sendBatchMaxSize))
sent, _, sendErr := batchMetrics.export(ctx, sendBatchMaxSize, false)
require.NoError(t, sendErr)
require.Equal(t, sendBatchMaxSize, sent)
remainingDataPointCount := metricsCount*dataPointsPerMetric - sendBatchMaxSize
require.Equal(t, remainingDataPointCount, batchMetrics.dataPointCount)
}
Expand Down

0 comments on commit 65b7b1b

Please sign in to comment.