Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
shivanthzen committed Oct 4, 2024
1 parent 080b765 commit 27b87bf
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 5 deletions.
16 changes: 16 additions & 0 deletions processor/processorhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@

The following telemetry is emitted by this component.

### otelcol_processor_errors

Number of errors emitted from the processor [alpha]

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {errors} | Sum | Int | true |

### otelcol_processor_incoming_items

Number of items passed to the processor. [alpha]
Expand All @@ -21,3 +29,11 @@ Number of items emitted from the processor. [alpha]
| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {items} | Sum | Int | true |

### otelcol_processor_skips

Number of skips by processor [alpha]

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {errors} | Sum | Int | true |
14 changes: 14 additions & 0 deletions processor/processorhelper/internal/metadata/generated_telemetry.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion processor/processorhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,20 @@ func NewLogs(
span.AddEvent("Start processing.", eventOptions)
recordsIn := ld.LogRecordCount()

obs.recordIn(ctx, recordsIn)
var errFunc error
ld, errFunc = logsFunc(ctx, ld)
span.AddEvent("End processing.", eventOptions)
if errFunc != nil {
if errors.Is(errFunc, ErrSkipProcessingData) {
obs.processorSkipped(ctx)
return nil
}
obs.processorError(ctx)
return errFunc
}
recordsOut := ld.LogRecordCount()
obs.recordInOut(ctx, recordsIn, recordsOut)
obs.recordOut(ctx, recordsOut)
return nextConsumer.ConsumeLogs(ctx, ld)
}, bs.consumerOptions...)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions processor/processorhelper/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,22 @@ telemetry:
sum:
value_type: int
monotonic: true

processor_errors:
enabled: true
stability:
level: alpha
description: Number of errors emitted from the processor
unit: "{errors}"
sum:
value_type: int
monotonic: true
processor_skips:
enabled: true
stability:
level: alpha
description: Number of skips by processor
unit: "{errors}"
sum:
value_type: int
monotonic: true
5 changes: 4 additions & 1 deletion processor/processorhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,21 @@ func NewMetrics(
span := trace.SpanFromContext(ctx)
span.AddEvent("Start processing.", eventOptions)
pointsIn := md.DataPointCount()
obs.recordIn(ctx, pointsIn)

var errFunc error
md, errFunc = metricsFunc(ctx, md)
span.AddEvent("End processing.", eventOptions)
if errFunc != nil {
if errors.Is(errFunc, ErrSkipProcessingData) {
obs.processorSkipped(ctx)
return nil
}
obs.processorError(ctx)
return errFunc
}
pointsOut := md.DataPointCount()
obs.recordInOut(ctx, pointsIn, pointsOut)
obs.recordOut(ctx, pointsOut)
return nextConsumer.ConsumeMetrics(ctx, md)
}, bs.consumerOptions...)
if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion processor/processorhelper/obsreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,18 @@ func newObsReport(set processor.Settings, signal pipeline.Signal) (*obsReport, e
}, nil
}

func (or *obsReport) recordInOut(ctx context.Context, incoming, outgoing int) {
func (or *obsReport) recordIn(ctx context.Context, incoming int) {
or.telemetryBuilder.ProcessorIncomingItems.Add(ctx, int64(incoming), metric.WithAttributes(or.otelAttrs...))
}

func (or *obsReport) recordOut(ctx context.Context, outgoing int) {
or.telemetryBuilder.ProcessorOutgoingItems.Add(ctx, int64(outgoing), metric.WithAttributes(or.otelAttrs...))
}

func (or *obsReport) processorError(ctx context.Context) {
or.telemetryBuilder.ProcessorErrors.Add(ctx, 1, metric.WithAttributes(or.otelAttrs...))
}

func (or *obsReport) processorSkipped(ctx context.Context) {
or.telemetryBuilder.ProcessorSkips.Add(ctx, 1, metric.WithAttributes(or.otelAttrs...))
}
6 changes: 4 additions & 2 deletions processor/processorhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,22 @@ func NewTraces(
span.AddEvent("Start processing.", eventOptions)
spansIn := td.SpanCount()

obs.recordIn(ctx, spansIn)
var errFunc error
td, errFunc = tracesFunc(ctx, td)
span.AddEvent("End processing.", eventOptions)
if errFunc != nil {
if errors.Is(errFunc, ErrSkipProcessingData) {
obs.processorSkipped(ctx)
return nil
}
obs.processorError(ctx)
return errFunc
}
spansOut := td.SpanCount()
obs.recordInOut(ctx, spansIn, spansOut)
obs.recordOut(ctx, spansOut)
return nextConsumer.ConsumeTraces(ctx, td)
}, bs.consumerOptions...)

if err != nil {
return nil, err
}
Expand Down

0 comments on commit 27b87bf

Please sign in to comment.