From 97f39ade64d742a47ac9dc2eacae19870c318ea2 Mon Sep 17 00:00:00 2001 From: Ridwan Sharif <18472685+ridwanmsharif@users.noreply.github.com> Date: Tue, 11 Feb 2025 11:19:22 -0500 Subject: [PATCH] receiver/prometheusreceiver: allow cumulative resets when using the adjuster (#37718) Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37717 Prior to this change, when the start time metric adjuster was used, all points used the same start timestamp, even after a reset, which makes no sense for a counter that is not supposed to go down. Instead, this change makes it so that when a reset is detected, the reset point's timestamp is used as the next start time. Signed-off-by: Ridwan Sharif --- .chloggen/metricadjuster-reset.yaml | 27 +++ .../prometheusreceiver/internal/appendable.go | 2 +- .../internal/metrics_adjuster.go | 21 ++ .../internal/starttimemetricadjuster.go | 13 +- .../internal/starttimemetricadjuster_test.go | 225 +++++++++++++++++- 5 files changed, 283 insertions(+), 5 deletions(-) create mode 100644 .chloggen/metricadjuster-reset.yaml diff --git a/.chloggen/metricadjuster-reset.yaml b/.chloggen/metricadjuster-reset.yaml new file mode 100644 index 000000000000..5e7ef48ef293 --- /dev/null +++ b/.chloggen/metricadjuster-reset.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "bug_fix" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Start time metric adjuster now handles reset points correctly + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [37717] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/prometheusreceiver/internal/appendable.go b/receiver/prometheusreceiver/internal/appendable.go index 086d2d639a29..5aa061f07c32 100644 --- a/receiver/prometheusreceiver/internal/appendable.go +++ b/receiver/prometheusreceiver/internal/appendable.go @@ -45,7 +45,7 @@ func NewAppendable( if !useStartTimeMetric { metricAdjuster = NewInitialPointAdjuster(set.Logger, gcInterval, useCreatedMetric) } else { - metricAdjuster = NewStartTimeMetricAdjuster(set.Logger, startTimeMetricRegex) + metricAdjuster = NewStartTimeMetricAdjuster(set.Logger, startTimeMetricRegex, gcInterval) } obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverID: set.ID, Transport: transport, ReceiverCreateSettings: set}) diff --git a/receiver/prometheusreceiver/internal/metrics_adjuster.go b/receiver/prometheusreceiver/internal/metrics_adjuster.go index d6e9a36e083b..70dd6b6a411f 100644 --- a/receiver/prometheusreceiver/internal/metrics_adjuster.go +++ b/receiver/prometheusreceiver/internal/metrics_adjuster.go @@ -241,6 +241,11 @@ type initialPointAdjuster struct { jobsMap *JobsMap logger *zap.Logger useCreatedMetric bool + // usePointTimeForReset forces the adjuster to use the timestamp of the + // point instead of the start timestamp when it detects resets. 
This is + // useful when this adjuster is used after another adjuster that + // pre-populated start times. + usePointTimeForReset bool } // NewInitialPointAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on the initial received points. @@ -347,6 +352,10 @@ func (a *initialPointAdjuster) adjustMetricHistogram(tsm *timeseriesMap, current if currentDist.Count() < tsi.histogram.previousCount || currentDist.Sum() < tsi.histogram.previousSum { // reset re-initialize everything. tsi.histogram.startTime = currentDist.StartTimestamp() + if a.usePointTimeForReset { + tsi.histogram.startTime = currentDist.Timestamp() + currentDist.SetStartTimestamp(tsi.histogram.startTime) + } tsi.histogram.previousCount = currentDist.Count() tsi.histogram.previousSum = currentDist.Sum() continue @@ -395,6 +404,10 @@ func (a *initialPointAdjuster) adjustMetricExponentialHistogram(tsm *timeseriesM if currentDist.Count() < tsi.histogram.previousCount || currentDist.Sum() < tsi.histogram.previousSum { // reset re-initialize everything. tsi.histogram.startTime = currentDist.StartTimestamp() + if a.usePointTimeForReset { + tsi.histogram.startTime = currentDist.Timestamp() + currentDist.SetStartTimestamp(tsi.histogram.startTime) + } tsi.histogram.previousCount = currentDist.Count() tsi.histogram.previousSum = currentDist.Sum() continue @@ -436,6 +449,10 @@ func (a *initialPointAdjuster) adjustMetricSum(tsm *timeseriesMap, current pmetr if currentSum.DoubleValue() < tsi.number.previousValue { // reset re-initialize everything. tsi.number.startTime = currentSum.StartTimestamp() + if a.usePointTimeForReset { + tsi.number.startTime = currentSum.Timestamp() + currentSum.SetStartTimestamp(tsi.number.startTime) + } tsi.number.previousValue = currentSum.DoubleValue() continue } @@ -482,6 +499,10 @@ func (a *initialPointAdjuster) adjustMetricSummary(tsm *timeseriesMap, current p currentSummary.Sum() < tsi.summary.previousSum) { // reset re-initialize everything. 
tsi.summary.startTime = currentSummary.StartTimestamp() + if a.usePointTimeForReset { + tsi.summary.startTime = currentSummary.Timestamp() + currentSummary.SetStartTimestamp(tsi.summary.startTime) + } tsi.summary.previousCount = currentSummary.Count() tsi.summary.previousSum = currentSummary.Sum() continue diff --git a/receiver/prometheusreceiver/internal/starttimemetricadjuster.go b/receiver/prometheusreceiver/internal/starttimemetricadjuster.go index e740c891a495..dde13880950d 100644 --- a/receiver/prometheusreceiver/internal/starttimemetricadjuster.go +++ b/receiver/prometheusreceiver/internal/starttimemetricadjuster.go @@ -40,13 +40,21 @@ func init() { type startTimeMetricAdjuster struct { startTimeMetricRegex *regexp.Regexp + resetPointAdjuster *initialPointAdjuster logger *zap.Logger } // NewStartTimeMetricAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on a start time metric. -func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp) MetricsAdjuster { +func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp, gcInterval time.Duration) MetricsAdjuster { + resetPointAdjuster := &initialPointAdjuster{ + jobsMap: NewJobsMap(gcInterval), + logger: logger, + useCreatedMetric: false, + usePointTimeForReset: true, + } return &startTimeMetricAdjuster{ startTimeMetricRegex: startTimeMetricRegex, + resetPointAdjuster: resetPointAdjuster, logger: logger, } } @@ -110,7 +118,8 @@ func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) erro } } - return nil + // Handle resets. 
+ return stma.resetPointAdjuster.AdjustMetrics(metrics) } func (stma *startTimeMetricAdjuster) getStartTime(metrics pmetric.Metrics) (float64, error) { diff --git a/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go b/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go index 4bfc6abc2237..07b3c8a56524 100644 --- a/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go +++ b/receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" + semconv "go.opentelemetry.io/collector/semconv/v1.27.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" @@ -116,11 +117,17 @@ func TestStartTimeMetricMatch(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex) + gcInterval := 10 * time.Millisecond + stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex, gcInterval) if tt.expectedErr != nil { assert.ErrorIs(t, stma.AdjustMetrics(tt.inputs), tt.expectedErr) return } + + // We need to make sure the job and instance labels are set before the adjuster is used. 
+ pmetrics := tt.inputs + pmetrics.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, "0") + pmetrics.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceName, "job") assert.NoError(t, stma.AdjustMetrics(tt.inputs)) for i := 0; i < tt.inputs.ResourceMetrics().Len(); i++ { rm := tt.inputs.ResourceMetrics().At(i) @@ -210,7 +217,8 @@ func TestStartTimeMetricFallback(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { testutil.SetFeatureGateForTest(t, useCollectorStartTimeFallbackGate, true) - stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex) + gcInterval := 10 * time.Millisecond + stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex, gcInterval) if tt.expectedErr != nil { assert.ErrorIs(t, stma.AdjustMetrics(tt.inputs), tt.expectedErr) return @@ -220,6 +228,10 @@ func TestStartTimeMetricFallback(t *testing.T) { // directly. approximateCollectorStartTime = mockStartTime + // We need to make sure the job and instance labels are set before the adjuster is used. 
+ pmetrics := tt.inputs + pmetrics.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, "0") + pmetrics.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceName, "job") assert.NoError(t, stma.AdjustMetrics(tt.inputs)) for i := 0; i < tt.inputs.ResourceMetrics().Len(); i++ { rm := tt.inputs.ResourceMetrics().At(i) @@ -250,3 +262,212 @@ func TestStartTimeMetricFallback(t *testing.T) { }) } } + +func TestFallbackAndReset(t *testing.T) { + mockStartTime := time.Now().Add(-10 * time.Hour).Truncate(time.Second) + mockTimestamp := pcommon.NewTimestampFromTime(mockStartTime) + t1 := pcommon.Timestamp(126 * 1e9) + t2 := pcommon.NewTimestampFromTime(t1.AsTime().Add(1 * time.Hour)) + t3 := pcommon.NewTimestampFromTime(t2.AsTime().Add(1 * time.Hour)) + t4 := pcommon.NewTimestampFromTime(t3.AsTime().Add(1 * time.Hour)) + t5 := pcommon.NewTimestampFromTime(t4.AsTime().Add(1 * time.Hour)) + tests := []struct { + name string + useFallback bool + scenario []*metricsAdjusterTest + }{ + { + name: "sum no fallback and reset", + useFallback: false, + scenario: []*metricsAdjusterTest{ + { + description: "Sum: round 1 - initial instance, start time is established", + metrics: metrics( + sumMetric("test_sum", doublePoint(nil, t1, t1, 44)), + gaugeMetric("process_start_time_seconds", doublePoint(nil, t1, t1, float64(mockTimestamp.AsTime().Unix()))), + ), + adjusted: metrics( + sumMetric("test_sum", doublePoint(nil, mockTimestamp, t1, 44)), + gaugeMetric("process_start_time_seconds", doublePoint(nil, t1, t1, float64(mockTimestamp.AsTime().Unix()))), + ), + }, + { + description: "Sum: round 2 - instance reset (value less than previous value), start time is reset", + metrics: metrics( + sumMetric("test_sum", doublePoint(nil, t2, t2, 33)), + gaugeMetric("process_start_time_seconds", doublePoint(nil, t2, t2, float64(mockTimestamp.AsTime().Unix()))), + ), + adjusted: metrics( + sumMetric("test_sum", doublePoint(nil, t2, t2, 33)), + 
gaugeMetric("process_start_time_seconds", doublePoint(nil, t2, t2, float64(mockTimestamp.AsTime().Unix()))), + ), + }, + { + description: "Sum: round 3 - instance adjusted based on round 2", + metrics: metrics( + sumMetric("test_sum", doublePoint(nil, t3, t3, 55)), + gaugeMetric("process_start_time_seconds", doublePoint(nil, t2, t2, float64(mockTimestamp.AsTime().Unix()))), + ), + adjusted: metrics( + sumMetric("test_sum", doublePoint(nil, t2, t3, 55)), + gaugeMetric("process_start_time_seconds", doublePoint(nil, t2, t2, float64(mockTimestamp.AsTime().Unix()))), + ), + }, + }, + }, + { + name: "sum fallback and reset", + useFallback: true, + scenario: []*metricsAdjusterTest{ + { + description: "Sum: round 1 - initial instance, start time is established", + metrics: metrics(sumMetric("test_sum", doublePoint(nil, t1, t1, 44))), + adjusted: metrics(sumMetric("test_sum", doublePoint(nil, mockTimestamp, t1, 44))), + }, + { + description: "Sum: round 2 - instance adjusted based on round 1", + metrics: metrics(sumMetric("test_sum", doublePoint(nil, t2, t2, 66))), + adjusted: metrics(sumMetric("test_sum", doublePoint(nil, mockTimestamp, t2, 66))), + }, + { + description: "Sum: round 3 - instance reset (value less than previous value), start time is reset", + metrics: metrics(sumMetric("test_sum", doublePoint(nil, t3, t3, 55))), + adjusted: metrics(sumMetric("test_sum", doublePoint(nil, t3, t3, 55))), + }, + { + description: "Sum: round 4 - instance adjusted based on round 3", + metrics: metrics(sumMetric("test_sum", doublePoint(nil, t4, t4, 72))), + adjusted: metrics(sumMetric("test_sum", doublePoint(nil, t3, t4, 72))), + }, + { + description: "Sum: round 5 - instance adjusted based on round 4", + metrics: metrics(sumMetric("test_sum", doublePoint(nil, t5, t5, 72))), + adjusted: metrics(sumMetric("test_sum", doublePoint(nil, t3, t5, 72))), + }, + }, + }, + { + name: "gauge fallback and reset", + useFallback: true, + scenario: []*metricsAdjusterTest{ + { + description: 
"Gauge: round 1 - gauge not adjusted", + metrics: metrics(gaugeMetric("test_gauge", doublePoint(nil, t1, t1, 44))), + adjusted: metrics(gaugeMetric("test_gauge", doublePoint(nil, t1, t1, 44))), + }, + { + description: "Gauge: round 2 - gauge not adjusted", + metrics: metrics(gaugeMetric("test_gauge", doublePoint(nil, t2, t2, 66))), + adjusted: metrics(gaugeMetric("test_gauge", doublePoint(nil, t2, t2, 66))), + }, + { + description: "Gauge: round 3 - value less than previous value - gauge is not adjusted", + metrics: metrics(gaugeMetric("test_gauge", doublePoint(nil, t3, t3, 55))), + adjusted: metrics(gaugeMetric("test_gauge", doublePoint(nil, t3, t3, 55))), + }, + }, + }, + { + name: "histogram fallback and reset", + useFallback: true, + scenario: []*metricsAdjusterTest{ + { + description: "Histogram: round 1 - initial instance, start time is established", + metrics: metrics(histogramMetric("test_histogram", histogramPoint(nil, t1, t1, bounds0, []uint64{4, 2, 3, 7}))), + adjusted: metrics(histogramMetric("test_histogram", histogramPoint(nil, mockTimestamp, t1, bounds0, []uint64{4, 2, 3, 7}))), + }, { + description: "Histogram: round 2 - instance adjusted based on round 1", + metrics: metrics(histogramMetric("test_histogram", histogramPoint(nil, t2, t2, bounds0, []uint64{6, 3, 4, 8}))), + adjusted: metrics(histogramMetric("test_histogram", histogramPoint(nil, mockTimestamp, t2, bounds0, []uint64{6, 3, 4, 8}))), + }, { + description: "Histogram: round 3 - instance reset (value less than previous value), start time is reset", + metrics: metrics(histogramMetric("test_histogram", histogramPoint(nil, t3, t3, bounds0, []uint64{5, 3, 2, 7}))), + adjusted: metrics(histogramMetric("test_histogram", histogramPoint(nil, t3, t3, bounds0, []uint64{5, 3, 2, 7}))), + }, { + description: "Histogram: round 4 - instance adjusted based on round 3", + metrics: metrics(histogramMetric("test_histogram", histogramPoint(nil, t4, t4, bounds0, []uint64{7, 4, 2, 12}))), + adjusted: 
metrics(histogramMetric("test_histogram", histogramPoint(nil, t3, t4, bounds0, []uint64{7, 4, 2, 12}))), + }, + }, + }, + { + name: "exponential histogram fallback and reset", + useFallback: true, + scenario: []*metricsAdjusterTest{ + { + description: "Exponential Histogram: round 1 - initial instance, start time is established", + metrics: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t1, t1, 3, 1, 0, []uint64{}, -2, []uint64{4, 2, 3, 7}))), + adjusted: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, mockTimestamp, t1, 3, 1, 0, []uint64{}, -2, []uint64{4, 2, 3, 7}))), + }, + { + description: "Exponential Histogram: round 2 - instance adjusted based on round 1", + metrics: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t2, t2, 3, 1, 0, []uint64{}, -2, []uint64{6, 2, 3, 7}))), + adjusted: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, mockTimestamp, t2, 3, 1, 0, []uint64{}, -2, []uint64{6, 2, 3, 7}))), + }, + { + description: "Exponential Histogram: round 3 - instance reset (value less than previous value), start time is reset", + metrics: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t3, t3, 3, 1, 0, []uint64{}, -2, []uint64{5, 3, 2, 7}))), + adjusted: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t3, t3, 3, 1, 0, []uint64{}, -2, []uint64{5, 3, 2, 7}))), + }, + { + description: "Exponential Histogram: round 4 - instance adjusted based on round 3", + metrics: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t4, t4, 3, 1, 0, []uint64{}, -2, []uint64{7, 4, 2, 12}))), + adjusted: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t3, t4, 3, 1, 0, []uint64{}, -2, []uint64{7, 4, 2, 12}))), + }, + }, + }, + { + name: "summary fallback and reset", 
+ useFallback: true, + scenario: []*metricsAdjusterTest{ + { + description: "Summary: round 1 - initial instance, start time is established", + metrics: metrics( + summaryMetric("test_summary", summaryPoint(nil, t1, t1, 10, 40, percent0, []float64{1, 5, 8})), + ), + adjusted: metrics( + summaryMetric("test_summary", summaryPoint(nil, mockTimestamp, t1, 10, 40, percent0, []float64{1, 5, 8})), + ), + }, + { + description: "Summary: round 2 - instance adjusted based on round 1", + metrics: metrics( + summaryMetric("test_summary", summaryPoint(nil, t2, t2, 15, 70, percent0, []float64{7, 44, 9})), + ), + adjusted: metrics( + summaryMetric("test_summary", summaryPoint(nil, mockTimestamp, t2, 15, 70, percent0, []float64{7, 44, 9})), + ), + }, + { + description: "Summary: round 3 - instance reset (count less than previous), start time is reset", + metrics: metrics( + summaryMetric("test_summary", summaryPoint(nil, t3, t3, 12, 66, percent0, []float64{3, 22, 5})), + ), + adjusted: metrics( + summaryMetric("test_summary", summaryPoint(nil, t3, t3, 12, 66, percent0, []float64{3, 22, 5})), + ), + }, + { + description: "Summary: round 4 - instance adjusted based on round 3", + metrics: metrics( + summaryMetric("test_summary", summaryPoint(nil, t4, t4, 14, 96, percent0, []float64{9, 47, 8})), + ), + adjusted: metrics( + summaryMetric("test_summary", summaryPoint(nil, t3, t4, 14, 96, percent0, []float64{9, 47, 8})), + ), + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testutil.SetFeatureGateForTest(t, useCollectorStartTimeFallbackGate, tt.useFallback) + gcInterval := 10 * time.Millisecond + stma := NewStartTimeMetricAdjuster(zap.NewNop(), nil, gcInterval) + // To test that the adjuster is using the fallback correctly, override the fallback time to use + // directly. + approximateCollectorStartTime = mockStartTime + runScript(t, stma, "job", "0", tt.scenario) + }) + } +}