diff --git a/CHANGELOG.md b/CHANGELOG.md index bd528f8cf1ab..e41bcbf1e457 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ ### 🧰 Bug fixes 🧰 +- `prometheusreceiver`: Fix segfault that can occur after receiving stale metrics (#8056) + ### 🚀 New components 🚀 ## v0.45.1 diff --git a/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go b/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go index da8f095ad2cd..10c8533ecae1 100644 --- a/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go +++ b/receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go @@ -300,14 +300,22 @@ func (ma *MetricsAdjusterPdata) adjustMetricGauge(current *pdata.Metric) (resets for i := 0; i < currentPoints.Len(); i++ { currentGauge := currentPoints.At(i) tsi := ma.tsm.get(current, currentGauge.Attributes()) + previous := tsi.previous + if previous == nil { + // no previous data point with values + // use the initial data point + previous = tsi.initial + } tsi.previous = current + if tsi.initial == nil { // initial || reset timeseries. tsi.initial = current resets++ continue } + initialPoints := tsi.initial.Gauge().DataPoints() previousPoints := previous.Gauge().DataPoints() if i >= initialPoints.Len() || i >= previousPoints.Len() { @@ -351,16 +359,25 @@ func (ma *MetricsAdjusterPdata) adjustMetricHistogram(current *pdata.Metric) (re for i := 0; i < currentPoints.Len(); i++ { currentDist := currentPoints.At(i) tsi := ma.tsm.get(current, currentDist.Attributes()) + previous := tsi.previous + if previous == nil { + // no previous data point with values + // use the initial data point + previous = tsi.initial + } + if !currentDist.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { tsi.previous = current } + if tsi.initial == nil { // initial || reset timeseries. tsi.initial = current resets++ continue } + initialPoints := tsi.initial.Histogram().DataPoints() previousPoints := previous.Histogram().DataPoints() if i >= initialPoints.Len() || i >= previousPoints.Len() { @@ -396,16 +413,25 @@ func (ma *MetricsAdjusterPdata) adjustMetricSum(current *pdata.Metric) (resets i for i := 0; i < currentPoints.Len(); i++ { currentSum := currentPoints.At(i) tsi := ma.tsm.get(current, currentSum.Attributes()) + previous := tsi.previous + if previous == nil { + // no previous data point with values + // use the initial data point + previous = tsi.initial + } + if !currentSum.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { tsi.previous = current } + if tsi.initial == nil { // initial || reset timeseries. tsi.initial = current resets++ continue } + initialPoints := tsi.initial.Sum().DataPoints() previousPoints := previous.Sum().DataPoints() if i >= initialPoints.Len() || i >= previousPoints.Len() { @@ -441,16 +467,25 @@ func (ma *MetricsAdjusterPdata) adjustMetricSummary(current *pdata.Metric) (rese for i := 0; i < currentPoints.Len(); i++ { currentSummary := currentPoints.At(i) tsi := ma.tsm.get(current, currentSummary.Attributes()) + previous := tsi.previous + if previous == nil { + // no previous data point with values + // use the initial data point + previous = tsi.initial + } + if !currentSummary.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) { tsi.previous = current } + if tsi.initial == nil { // initial || reset timeseries. tsi.initial = current resets++ continue } + initialPoints := tsi.initial.Summary().DataPoints() previousPoints := previous.Summary().DataPoints() if i >= initialPoints.Len() || i >= previousPoints.Len() { diff --git a/receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go b/receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go index be8c2d970eb3..e6365658de8b 100644 --- a/receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go +++ b/receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go @@ -660,6 +660,174 @@ func Test_histogram_flag_norecordedvalue(t *testing.T) { runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) } +func Test_histogram_flag_norecordedvalue_first_observation(t *testing.T) { + m1 := func() *pdata.MetricSlice { + metric := pdata.NewMetric() + metric.SetName(cd1) + metric.SetDataType(pdata.MetricDataTypeHistogram) + histogram := metric.Histogram() + histogram.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + destPointL := histogram.DataPoints() + dp := destPointL.AppendEmpty() + dp.SetTimestamp(pdt1Ms) + dp.SetFlags(1) + return metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, &dp)) + }() + m2 := func() *pdata.MetricSlice { + metric := pdata.NewMetric() + metric.SetName(cd1) + metric.SetDataType(pdata.MetricDataTypeHistogram) + histogram := metric.Histogram() + histogram.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + destPointL := histogram.DataPoints() + dp := destPointL.AppendEmpty() + dp.SetTimestamp(pdt2Ms) + dp.SetFlags(1) + return metricSlice(histogramMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) + }() + script := []*metricsAdjusterTestPdata{ + { + "Histogram: round 1 - initial instance, start time is unknown", + m1, + m1, + 1, + }, + { + "Histogram: round 2 - instance unchanged", + m2, + m2, + 0, + }, + } + + runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) +} + +func Test_summary_flag_norecordedvalue_first_observation(t *testing.T) { + m1 := func() *pdata.MetricSlice { + metric := pdata.NewMetric() + metric.SetName(cd1) + metric.SetDataType(pdata.MetricDataTypeSummary) + summary := metric.Summary() + destPointL := summary.DataPoints() + dp := destPointL.AppendEmpty() + dp.SetTimestamp(pdt1Ms) + dp.SetFlags(1) + return metricSlice(summaryMetric(cd1, k1v1k2v2, pdt1Ms, &dp)) + }() + m2 := func() *pdata.MetricSlice { + metric := pdata.NewMetric() + metric.SetName(cd1) + metric.SetDataType(pdata.MetricDataTypeSummary) + summary := metric.Summary() + destPointL := summary.DataPoints() + dp := destPointL.AppendEmpty() + dp.SetTimestamp(pdt2Ms) + dp.SetFlags(1) + return metricSlice(summaryMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) + }() + script := []*metricsAdjusterTestPdata{ + { + "Summary: round 1 - initial instance, start time is unknown", + m1, + m1, + 1, + }, + { + "Summary: round 2 - instance unchanged", + m2, + m2, + 0, + }, + } + + runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) +} + +func Test_gauge_flag_norecordedvalue_first_observation(t *testing.T) { + m1 := func() *pdata.MetricSlice { + metric := pdata.NewMetric() + metric.SetName(cd1) + metric.SetDataType(pdata.MetricDataTypeGauge) + gauge := metric.Gauge() + destPointL := gauge.DataPoints() + dp := destPointL.AppendEmpty() + dp.SetTimestamp(pdt1Ms) + dp.SetFlags(1) + return metricSlice(gaugeMetric(cd1, k1v1k2v2, pdt1Ms, &dp)) + }() + m2 := func() *pdata.MetricSlice { + metric := pdata.NewMetric() + metric.SetName(cd1) + metric.SetDataType(pdata.MetricDataTypeGauge) + gauge := metric.Gauge() + destPointL := gauge.DataPoints() + dp := destPointL.AppendEmpty() + dp.SetTimestamp(pdt2Ms) + dp.SetFlags(1) + return metricSlice(gaugeMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) + }() + script := []*metricsAdjusterTestPdata{ + { + "Gauge: round 1 - initial instance, start time is unknown", + m1, + m1, + 0, + }, + { + "Gauge: round 2 - instance unchanged", + m2, + m2, + 0, + }, + } + + runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) +} + +func Test_sum_flag_norecordedvalue_first_observation(t *testing.T) { + m1 := func() *pdata.MetricSlice { + metric := pdata.NewMetric() + metric.SetName(cd1) + metric.SetDataType(pdata.MetricDataTypeSum) + sum := metric.Sum() + sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + destPointL := sum.DataPoints() + dp := destPointL.AppendEmpty() + dp.SetTimestamp(pdt1Ms) + dp.SetFlags(1) + return metricSlice(sumMetric(cd1, k1v1k2v2, pdt1Ms, &dp)) + }() + m2 := func() *pdata.MetricSlice { + metric := pdata.NewMetric() + metric.SetName(cd1) + metric.SetDataType(pdata.MetricDataTypeSum) + sum := metric.Sum() + sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative) + destPointL := sum.DataPoints() + dp := destPointL.AppendEmpty() + dp.SetTimestamp(pdt2Ms) + dp.SetFlags(1) + return metricSlice(sumMetric(cd1, k1v1k2v2, pdt2Ms, &dp)) + }() + script := []*metricsAdjusterTestPdata{ + { + "Sum: round 1 - initial instance, start time is unknown", + m1, + m1, + 1, + }, + { + "Sum: round 2 - instance unchanged", + m2, + m2, + 0, + }, + } + + runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script) +} + func Test_multiMetrics_pdata(t *testing.T) { g1 := "gauge1" script := []*metricsAdjusterTestPdata{