diff --git a/internal/telemetry/region_pusher.go b/internal/telemetry/region_pusher.go
index d0aba49c5..4963bbe08 100644
--- a/internal/telemetry/region_pusher.go
+++ b/internal/telemetry/region_pusher.go
@@ -2,6 +2,7 @@ package telemetry
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"time"
 
@@ -173,7 +174,7 @@ func (p *RegionPusher) push(m sm.RegionTelemetry) {
 	)
 
 	start := p.metrics.start()
-	defer p.metrics.end(err, start)
+	defer func() { p.metrics.end(err, start) }()
 
 	// We don't want to cancel a possibly ongoing request even if the agent
 	// context is done, therefore use background context
@@ -184,7 +185,9 @@ func (p *RegionPusher) push(m sm.RegionTelemetry) {
 		return
 	}
 	if r.Status.Code != sm.StatusCode_OK {
-		p.logger.Error().
+		// create an error so it's handled by metrics.end() on defer
+		err = fmt.Errorf("unexpected status code")
+		p.logger.Err(err).
 			Int32("statusCode", int32(r.Status.Code)).
 			Str("statusMessage", r.Status.Message).
 			Msg("error pushing telemetry")
diff --git a/internal/telemetry/region_pusher_test.go b/internal/telemetry/region_pusher_test.go
index 23c8a8799..a32590ad1 100644
--- a/internal/telemetry/region_pusher_test.go
+++ b/internal/telemetry/region_pusher_test.go
@@ -2,6 +2,7 @@ package telemetry
 
 import (
 	"context"
+	"errors"
 	"reflect"
 	"sync"
 	"testing"
@@ -10,6 +11,7 @@
 
 	sm "github.com/grafana/synthetic-monitoring-agent/pkg/pb/synthetic_monitoring"
 	prom "github.com/prometheus/client_golang/prometheus"
+	prommodel "github.com/prometheus/client_model/go"
 	"github.com/rs/zerolog"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
@@ -331,6 +333,87 @@ func TestTenantPusher(t *testing.T) {
 		// Verify sent data on exit
 		testClient.assert(t, []sm.RegionTelemetry{mm[1]})
 	})
+
+	t.Run("should report push error", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+
+		t.Cleanup(func() {
+			shutdownAndWait(cancel)
+			resetTestClient()
+		})
+
+		ticker := &testTicker{
+			c: make(chan time.Time),
+		}
+		var opt withTicker = ticker
+
+		metrics := RegionMetrics{
+			pushRequestsActive:   prom.NewGauge(prom.GaugeOpts{}),
+			pushRequestsDuration: prom.NewHistogram(prom.HistogramOpts{}),
+			pushRequestsTotal:    prom.NewCounter(prom.CounterOpts{}),
+			pushRequestsError:    prom.NewCounter(prom.CounterOpts{}),
+			addExecutionDuration: prom.NewHistogram(prom.HistogramOpts{}),
+		}
+
+		// Setup test client to return err on push
+		testClient.rr.err = errors.New("test error")
+
+		pusher := NewRegionPusher(ctx, timeSpan, testClient, logger, instance, regionID, metrics, opt)
+
+		// Add some executions
+		addExecutions(pusher, 0, 4)
+
+		// Tick once, which should make the push fail
+		tickAndWait(ticker)
+
+		// Verify sent data
+		testClient.assert(t, []sm.RegionTelemetry{mm[0]})
+
+		// Verify error metric
+		errsMetric := getMetricFromCollector(t, metrics.pushRequestsError)
+		require.Equal(t, 1, int(*errsMetric.Counter.Value))
+	})
+
+	t.Run("should report push error on unexpected status", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+
+		t.Cleanup(func() {
+			shutdownAndWait(cancel)
+			resetTestClient()
+		})
+
+		ticker := &testTicker{
+			c: make(chan time.Time),
+		}
+		var opt withTicker = ticker
+
+		metrics := RegionMetrics{
+			pushRequestsActive:   prom.NewGauge(prom.GaugeOpts{}),
+			pushRequestsDuration: prom.NewHistogram(prom.HistogramOpts{}),
+			pushRequestsTotal:    prom.NewCounter(prom.CounterOpts{}),
+			pushRequestsError:    prom.NewCounter(prom.CounterOpts{}),
+			addExecutionDuration: prom.NewHistogram(prom.HistogramOpts{}),
+		}
+
+		pusher := NewRegionPusher(ctx, timeSpan, testClient, logger, instance, regionID, metrics, opt)
+
+		// Add some executions
+		addExecutions(pusher, 0, 4)
+
+		// Set mock response for client
+		// with unexpected status code
+		testClient.rr = testPushRespKO
+
+		// Tick once, which should make the push fail
+		tickAndWait(ticker)
+
+		// Verify sent data
+		testClient.assert(t, []sm.RegionTelemetry{mm[0]})
+
+		// Verify error metric
+		errsMetric := getMetricFromCollector(t, metrics.pushRequestsError)
+		require.Equal(t, 1, int(*errsMetric.Counter.Value))
+	})
 }
 
 type testTicker struct {
@@ -422,3 +505,17 @@ LOOP:
 		t.Fatalf("telemetry not found: %v", expTele)
 	}
 }
+
+func getMetricFromCollector(t *testing.T, c prom.Collector) *prommodel.Metric {
+	t.Helper()
+
+	metricCh := make(chan prom.Metric)
+	defer close(metricCh)
+	go c.Collect(metricCh)
+	metric := <-metricCh
+
+	m := &prommodel.Metric{}
+	require.NoError(t, metric.Write(m))
+
+	return m
+}
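
Note (illustration only, not part of the patch): the switch from "defer p.metrics.end(err, start)" to the closure form is what allows the later err assignment to be observed, because Go evaluates a deferred call's arguments at the defer statement, while a closure reads err only when it runs. A minimal standalone sketch of the difference, where record() is a hypothetical stand-in for metrics.end():

package main

import "fmt"

// record stands in for metrics bookkeeping such as metrics.end();
// it is a hypothetical helper used only for this illustration.
func record(err error) {
	fmt.Println("recorded error:", err)
}

func direct() {
	var err error
	defer record(err) // err is evaluated here, while it is still nil
	err = fmt.Errorf("unexpected status code")
}

func wrapped() {
	var err error
	defer func() { record(err) }() // err is read when the closure runs
	err = fmt.Errorf("unexpected status code")
}

func main() {
	direct()  // prints "recorded error: <nil>"
	wrapped() // prints "recorded error: unexpected status code"
}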