diff --git a/processor/processorhelper/documentation.md b/processor/processorhelper/documentation.md index 1c1b10f9f46..ba97bbb9e6d 100644 --- a/processor/processorhelper/documentation.md +++ b/processor/processorhelper/documentation.md @@ -6,6 +6,14 @@ The following telemetry is emitted by this component. +### otelcol_processor_errors + +Number of errors emitted from the processor [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {errors} | Sum | Int | true | + ### otelcol_processor_incoming_items Number of items passed to the processor. [alpha] @@ -21,3 +29,11 @@ Number of items emitted from the processor. [alpha] | Unit | Metric Type | Value Type | Monotonic | | ---- | ----------- | ---------- | --------- | | {items} | Sum | Int | true | + +### otelcol_processor_skips + +Number of skips by processor [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {errors} | Sum | Int | true | diff --git a/processor/processorhelper/internal/metadata/generated_telemetry.go b/processor/processorhelper/internal/metadata/generated_telemetry.go index cb5d9fb7ae6..a2eae06780f 100644 --- a/processor/processorhelper/internal/metadata/generated_telemetry.go +++ b/processor/processorhelper/internal/metadata/generated_telemetry.go @@ -29,8 +29,10 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // as defined in metadata and user config. type TelemetryBuilder struct { meter metric.Meter + ProcessorErrors metric.Int64Counter ProcessorIncomingItems metric.Int64Counter ProcessorOutgoingItems metric.Int64Counter + ProcessorSkips metric.Int64Counter meters map[configtelemetry.Level]metric.Meter } @@ -54,6 +56,12 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme } builder.meters[configtelemetry.LevelBasic] = LeveledMeter(settings, configtelemetry.LevelBasic) var err, errs error + builder.ProcessorErrors, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_errors", + metric.WithDescription("Number of errors emitted from the processor [alpha]"), + metric.WithUnit("{errors}"), + ) + errs = errors.Join(errs, err) builder.ProcessorIncomingItems, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( "otelcol_processor_incoming_items", metric.WithDescription("Number of items passed to the processor. [alpha]"), @@ -66,5 +74,11 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{items}"), ) errs = errors.Join(errs, err) + builder.ProcessorSkips, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_processor_skips", + metric.WithDescription("Number of skips by processor [alpha]"), + metric.WithUnit("{errors}"), + ) + errs = errors.Join(errs, err) return &builder, errs } diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index a434fe8fb93..36ce22f4f69 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -51,17 +51,20 @@ func NewLogs( span.AddEvent("Start processing.", eventOptions) recordsIn := ld.LogRecordCount() + obs.recordIn(ctx, recordsIn) var errFunc error ld, errFunc = logsFunc(ctx, ld) span.AddEvent("End processing.", eventOptions) if errFunc != nil { if errors.Is(errFunc, ErrSkipProcessingData) { + obs.processorSkipped(ctx) return nil } + obs.processorError(ctx) return errFunc } recordsOut := ld.LogRecordCount() - obs.recordInOut(ctx, recordsIn, recordsOut) + obs.recordOut(ctx, recordsOut) return nextConsumer.ConsumeLogs(ctx, ld) }, bs.consumerOptions...) if err != nil { diff --git a/processor/processorhelper/metadata.yaml b/processor/processorhelper/metadata.yaml index e93c1ac5ee5..81e1eaf8ce3 100644 --- a/processor/processorhelper/metadata.yaml +++ b/processor/processorhelper/metadata.yaml @@ -28,3 +28,22 @@ telemetry: sum: value_type: int monotonic: true + + processor_errors: + enabled: true + stability: + level: alpha + description: Number of errors emitted from the processor + unit: "{errors}" + sum: + value_type: int + monotonic: true + processor_skips: + enabled: true + stability: + level: alpha + description: Number of skips by processor + unit: "{errors}" + sum: + value_type: int + monotonic: true diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index c8381fd9589..dace3b41a4b 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -50,18 +50,21 @@ func NewMetrics( span := trace.SpanFromContext(ctx) span.AddEvent("Start processing.", eventOptions) pointsIn := md.DataPointCount() + obs.recordIn(ctx, pointsIn) var errFunc error md, errFunc = metricsFunc(ctx, md) span.AddEvent("End processing.", eventOptions) if errFunc != nil { if errors.Is(errFunc, ErrSkipProcessingData) { + obs.processorSkipped(ctx) return nil } + obs.processorError(ctx) return errFunc } pointsOut := md.DataPointCount() - obs.recordInOut(ctx, pointsIn, pointsOut) + obs.recordOut(ctx, pointsOut) return nextConsumer.ConsumeMetrics(ctx, md) }, bs.consumerOptions...) if err != nil { diff --git a/processor/processorhelper/obsreport.go b/processor/processorhelper/obsreport.go index fd95f51fb9d..0fcd949a50f 100644 --- a/processor/processorhelper/obsreport.go +++ b/processor/processorhelper/obsreport.go @@ -62,7 +62,18 @@ func newObsReport(set processor.Settings, signal pipeline.Signal) (*obsReport, e }, nil } -func (or *obsReport) recordInOut(ctx context.Context, incoming, outgoing int) { +func (or *obsReport) recordIn(ctx context.Context, incoming int) { or.telemetryBuilder.ProcessorIncomingItems.Add(ctx, int64(incoming), metric.WithAttributes(or.otelAttrs...)) +} + +func (or *obsReport) recordOut(ctx context.Context, outgoing int) { or.telemetryBuilder.ProcessorOutgoingItems.Add(ctx, int64(outgoing), metric.WithAttributes(or.otelAttrs...)) } + +func (or *obsReport) processorError(ctx context.Context) { + or.telemetryBuilder.ProcessorErrors.Add(ctx, 1, metric.WithAttributes(or.otelAttrs...)) +} + +func (or *obsReport) processorSkipped(ctx context.Context) { + or.telemetryBuilder.ProcessorSkips.Add(ctx, 1, metric.WithAttributes(or.otelAttrs...)) +} diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index 492634541cc..0e10231c2d5 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -51,20 +51,22 @@ func NewTraces( span.AddEvent("Start processing.", eventOptions) spansIn := td.SpanCount() + obs.recordIn(ctx, spansIn) var errFunc error td, errFunc = tracesFunc(ctx, td) span.AddEvent("End processing.", eventOptions) if errFunc != nil { if errors.Is(errFunc, ErrSkipProcessingData) { + obs.processorSkipped(ctx) return nil } + obs.processorError(ctx) return errFunc } spansOut := td.SpanCount() - obs.recordInOut(ctx, spansIn, spansOut) + obs.recordOut(ctx, spansOut) return nextConsumer.ConsumeTraces(ctx, td) }, bs.consumerOptions...) - if err != nil { return nil, err }