Skip to content

Commit

Permalink
[receiver/prometheus]: Fix segfault that can happen when adjusting st…
Browse files Browse the repository at this point in the history
…ale metrics (open-telemetry#8056)

Signed-off-by: Anthony J Mirabella <[email protected]>
  • Loading branch information
Aneurysm9 authored and tomsanbear committed Mar 2, 2022
1 parent 440e047 commit d7572dc
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

### 🧰 Bug fixes 🧰

- `prometheusreceiver`: Fix segfault that can occur after receiving stale metrics (#8056)

### 🚀 New components 🚀

## v0.45.1
Expand Down
35 changes: 35 additions & 0 deletions receiver/prometheusreceiver/internal/otlp_metrics_adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,22 @@ func (ma *MetricsAdjusterPdata) adjustMetricGauge(current *pdata.Metric) (resets
for i := 0; i < currentPoints.Len(); i++ {
currentGauge := currentPoints.At(i)
tsi := ma.tsm.get(current, currentGauge.Attributes())

previous := tsi.previous
if previous == nil {
// no previous data point with values
// use the initial data point
previous = tsi.initial
}
tsi.previous = current

if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
resets++
continue
}

initialPoints := tsi.initial.Gauge().DataPoints()
previousPoints := previous.Gauge().DataPoints()
if i >= initialPoints.Len() || i >= previousPoints.Len() {
Expand Down Expand Up @@ -351,16 +359,25 @@ func (ma *MetricsAdjusterPdata) adjustMetricHistogram(current *pdata.Metric) (re
for i := 0; i < currentPoints.Len(); i++ {
currentDist := currentPoints.At(i)
tsi := ma.tsm.get(current, currentDist.Attributes())

previous := tsi.previous
if previous == nil {
// no previous data point with values
// use the initial data point
previous = tsi.initial
}

if !currentDist.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
tsi.previous = current
}

if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
resets++
continue
}

initialPoints := tsi.initial.Histogram().DataPoints()
previousPoints := previous.Histogram().DataPoints()
if i >= initialPoints.Len() || i >= previousPoints.Len() {
Expand Down Expand Up @@ -396,16 +413,25 @@ func (ma *MetricsAdjusterPdata) adjustMetricSum(current *pdata.Metric) (resets i
for i := 0; i < currentPoints.Len(); i++ {
currentSum := currentPoints.At(i)
tsi := ma.tsm.get(current, currentSum.Attributes())

previous := tsi.previous
if previous == nil {
// no previous data point with values
// use the initial data point
previous = tsi.initial
}

if !currentSum.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
tsi.previous = current
}

if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
resets++
continue
}

initialPoints := tsi.initial.Sum().DataPoints()
previousPoints := previous.Sum().DataPoints()
if i >= initialPoints.Len() || i >= previousPoints.Len() {
Expand Down Expand Up @@ -441,16 +467,25 @@ func (ma *MetricsAdjusterPdata) adjustMetricSummary(current *pdata.Metric) (rese
for i := 0; i < currentPoints.Len(); i++ {
currentSummary := currentPoints.At(i)
tsi := ma.tsm.get(current, currentSummary.Attributes())

previous := tsi.previous
if previous == nil {
// no previous data point with values
// use the initial data point
previous = tsi.initial
}

if !currentSummary.Flags().HasFlag(pdata.MetricDataPointFlagNoRecordedValue) {
tsi.previous = current
}

if tsi.initial == nil {
// initial || reset timeseries.
tsi.initial = current
resets++
continue
}

initialPoints := tsi.initial.Summary().DataPoints()
previousPoints := previous.Summary().DataPoints()
if i >= initialPoints.Len() || i >= previousPoints.Len() {
Expand Down
168 changes: 168 additions & 0 deletions receiver/prometheusreceiver/internal/otlp_metrics_adjuster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,174 @@ func Test_histogram_flag_norecordedvalue(t *testing.T) {
runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}

func Test_histogram_flag_norecordedvalue_first_observation(t *testing.T) {
m1 := func() *pdata.MetricSlice {
metric := pdata.NewMetric()
metric.SetName(cd1)
metric.SetDataType(pdata.MetricDataTypeHistogram)
histogram := metric.Histogram()
histogram.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
destPointL := histogram.DataPoints()
dp := destPointL.AppendEmpty()
dp.SetTimestamp(pdt1Ms)
dp.SetFlags(1)
return metricSlice(histogramMetric(cd1, k1v1k2v2, pdt1Ms, &dp))
}()
m2 := func() *pdata.MetricSlice {
metric := pdata.NewMetric()
metric.SetName(cd1)
metric.SetDataType(pdata.MetricDataTypeHistogram)
histogram := metric.Histogram()
histogram.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
destPointL := histogram.DataPoints()
dp := destPointL.AppendEmpty()
dp.SetTimestamp(pdt2Ms)
dp.SetFlags(1)
return metricSlice(histogramMetric(cd1, k1v1k2v2, pdt2Ms, &dp))
}()
script := []*metricsAdjusterTestPdata{
{
"Histogram: round 1 - initial instance, start time is unknown",
m1,
m1,
1,
},
{
"Histogram: round 2 - instance unchanged",
m2,
m2,
0,
},
}

runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}

func Test_summary_flag_norecordedvalue_first_observation(t *testing.T) {
m1 := func() *pdata.MetricSlice {
metric := pdata.NewMetric()
metric.SetName(cd1)
metric.SetDataType(pdata.MetricDataTypeSummary)
summary := metric.Summary()
destPointL := summary.DataPoints()
dp := destPointL.AppendEmpty()
dp.SetTimestamp(pdt1Ms)
dp.SetFlags(1)
return metricSlice(summaryMetric(cd1, k1v1k2v2, pdt1Ms, &dp))
}()
m2 := func() *pdata.MetricSlice {
metric := pdata.NewMetric()
metric.SetName(cd1)
metric.SetDataType(pdata.MetricDataTypeSummary)
summary := metric.Summary()
destPointL := summary.DataPoints()
dp := destPointL.AppendEmpty()
dp.SetTimestamp(pdt2Ms)
dp.SetFlags(1)
return metricSlice(summaryMetric(cd1, k1v1k2v2, pdt2Ms, &dp))
}()
script := []*metricsAdjusterTestPdata{
{
"Summary: round 1 - initial instance, start time is unknown",
m1,
m1,
1,
},
{
"Summary: round 2 - instance unchanged",
m2,
m2,
0,
},
}

runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}

func Test_gauge_flag_norecordedvalue_first_observation(t *testing.T) {
m1 := func() *pdata.MetricSlice {
metric := pdata.NewMetric()
metric.SetName(cd1)
metric.SetDataType(pdata.MetricDataTypeGauge)
gauge := metric.Gauge()
destPointL := gauge.DataPoints()
dp := destPointL.AppendEmpty()
dp.SetTimestamp(pdt1Ms)
dp.SetFlags(1)
return metricSlice(gaugeMetric(cd1, k1v1k2v2, pdt1Ms, &dp))
}()
m2 := func() *pdata.MetricSlice {
metric := pdata.NewMetric()
metric.SetName(cd1)
metric.SetDataType(pdata.MetricDataTypeGauge)
gauge := metric.Gauge()
destPointL := gauge.DataPoints()
dp := destPointL.AppendEmpty()
dp.SetTimestamp(pdt2Ms)
dp.SetFlags(1)
return metricSlice(gaugeMetric(cd1, k1v1k2v2, pdt2Ms, &dp))
}()
script := []*metricsAdjusterTestPdata{
{
"Gauge: round 1 - initial instance, start time is unknown",
m1,
m1,
0,
},
{
"Gauge: round 2 - instance unchanged",
m2,
m2,
0,
},
}

runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}

func Test_sum_flag_norecordedvalue_first_observation(t *testing.T) {
m1 := func() *pdata.MetricSlice {
metric := pdata.NewMetric()
metric.SetName(cd1)
metric.SetDataType(pdata.MetricDataTypeSum)
sum := metric.Sum()
sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
destPointL := sum.DataPoints()
dp := destPointL.AppendEmpty()
dp.SetTimestamp(pdt1Ms)
dp.SetFlags(1)
return metricSlice(sumMetric(cd1, k1v1k2v2, pdt1Ms, &dp))
}()
m2 := func() *pdata.MetricSlice {
metric := pdata.NewMetric()
metric.SetName(cd1)
metric.SetDataType(pdata.MetricDataTypeSum)
sum := metric.Sum()
sum.SetAggregationTemporality(pdata.MetricAggregationTemporalityCumulative)
destPointL := sum.DataPoints()
dp := destPointL.AppendEmpty()
dp.SetTimestamp(pdt2Ms)
dp.SetFlags(1)
return metricSlice(sumMetric(cd1, k1v1k2v2, pdt2Ms, &dp))
}()
script := []*metricsAdjusterTestPdata{
{
"Sum: round 1 - initial instance, start time is unknown",
m1,
m1,
1,
},
{
"Sum: round 2 - instance unchanged",
m2,
m2,
0,
},
}

runScriptPdata(t, NewJobsMapPdata(time.Minute).get("job", "0"), script)
}

func Test_multiMetrics_pdata(t *testing.T) {
g1 := "gauge1"
script := []*metricsAdjusterTestPdata{
Expand Down

0 comments on commit d7572dc

Please sign in to comment.