diff --git a/receiver/sqlqueryreceiver/internal/observability/observability.go b/receiver/sqlqueryreceiver/internal/observability/observability.go index 2750f30893ec..ea366eaf4416 100644 --- a/receiver/sqlqueryreceiver/internal/observability/observability.go +++ b/receiver/sqlqueryreceiver/internal/observability/observability.go @@ -25,16 +25,25 @@ import ( func init() { err := view.Register( viewAcceptedLogs, + viewErrorCount, ) if err != nil { fmt.Printf("Failed to register sqlquery receiver's views: %v\n", err) } } +const ( + StartError = "start_error" + CollectError = "collect_error" +) + var ( - mAcceptedLogs = stats.Int64("receiver/accepted/log/records", "Number of log record pushed into the pipeline.", "") - receiverKey, _ = tag.NewKey("receiver") // nolint:errcheck - queryKey, _ = tag.NewKey("query") // nolint:errcheck + mAcceptedLogs = stats.Int64("receiver/accepted/log/records", "Number of log record pushed into the pipeline.", "") + mErrorCount = stats.Int64("receiver/errors", "Number of errors", "") + + receiverKey, _ = tag.NewKey("receiver") // nolint:errcheck + queryKey, _ = tag.NewKey("query") // nolint:errcheck + errorTypeKey, _ = tag.NewKey("error_type") // nolint:errcheck ) var viewAcceptedLogs = &view.View{ @@ -45,6 +54,14 @@ var viewAcceptedLogs = &view.View{ Aggregation: view.Sum(), } +var viewErrorCount = &view.View{ + Name: mErrorCount.Name(), + Description: mErrorCount.Description(), + Measure: mErrorCount, + TagKeys: []tag.Key{receiverKey, queryKey, errorTypeKey}, + Aggregation: view.Sum(), +} + func RecordAcceptedLogs(acceptedLogs int64, receiver string, query string) error { return stats.RecordWithTags( context.Background(), @@ -55,3 +72,27 @@ func RecordAcceptedLogs(acceptedLogs int64, receiver string, query string) error mAcceptedLogs.M(acceptedLogs), ) } + +func RecordErrors(errorType string, receiver string, query string) error { + return stats.RecordWithTags( + context.Background(), + []tag.Mutator{ + tag.Insert(receiverKey, receiver), + tag.Insert(queryKey, query), + tag.Insert(errorTypeKey, errorType), + }, + mErrorCount.M(int64(1)), + ) +} + +func RecordNoErrors(errorType string, receiver string, query string) error { + return stats.RecordWithTags( + context.Background(), + []tag.Mutator{ + tag.Insert(receiverKey, receiver), + tag.Insert(queryKey, query), + tag.Insert(errorTypeKey, errorType), + }, + mErrorCount.M(int64(0)), + ) +} diff --git a/receiver/sqlqueryreceiver/internal/observability/observability_test.go b/receiver/sqlqueryreceiver/internal/observability/observability_test.go index 97925bcc1bf0..de97241febe9 100644 --- a/receiver/sqlqueryreceiver/internal/observability/observability_test.go +++ b/receiver/sqlqueryreceiver/internal/observability/observability_test.go @@ -86,20 +86,34 @@ func metricReader(chData chan []*metricdata.Metric, fail chan struct{}, count in } func TestMetrics(t *testing.T) { - const ( - receiver = "sqlquery/my-name" - queryReceiver = "query-0: select * from simple_logs" - ) type testCase struct { - name string - recordFunc string - value int64 + name string + recordFunc string + value int64 + labels map[string]string + labelsOrder []string // order of labels in timeseries, label values have the same order as keys in the metric descriptor } tests := []testCase{ { name: "receiver/accepted/log/records", recordFunc: "RecordAcceptedLogs", value: 10, + labels: map[string]string{ + "receiver": "sqlquery/my-name", + "query": "query-0: select * from simple_logs", + }, + labelsOrder: []string{"query", "receiver"}, + }, + { + name: "receiver/errors", + recordFunc: "RecordErrors", + value: 1, + labels: map[string]string{ + "receiver": "sqlquery/my-logs", + "query": "query-1: select * from simple_logs", + "error_type": StartError, + }, + labelsOrder: []string{"error_type", "query", "receiver"}, }, } @@ -113,7 +127,11 @@ func TestMetrics(t *testing.T) { for _, tt := range tests { switch tt.recordFunc { case "RecordAcceptedLogs": - require.NoError(t, RecordAcceptedLogs(tt.value, receiver, queryReceiver)) + require.NoError(t, RecordAcceptedLogs(tt.value, tt.labels["receiver"], tt.labels["query"])) + case "RecordErrors": + require.NoError(t, RecordErrors(tt.labels["error_type"], tt.labels["receiver"], tt.labels["query"])) + case "RecordNoErrors": + require.NoError(t, RecordNoErrors(tt.labels["error_type"], tt.labels["receiver"], tt.labels["query"])) } } @@ -141,11 +159,14 @@ func TestMetrics(t *testing.T) { require.Len(t, d.TimeSeries, 1) require.Len(t, d.TimeSeries[0].Points, 1) assert.Equal(t, d.TimeSeries[0].Points[0].Value, tt.value) - require.Len(t, d.TimeSeries[0].LabelValues, 2) - require.True(t, d.TimeSeries[0].LabelValues[0].Present) - assert.Equal(t, d.TimeSeries[0].LabelValues[0].Value, queryReceiver) - require.True(t, d.TimeSeries[0].LabelValues[1].Present) - assert.Equal(t, d.TimeSeries[0].LabelValues[1].Value, receiver) + require.Len(t, d.TimeSeries[0].LabelValues, len(tt.labels)) + + for i, label := range d.TimeSeries[0].LabelValues { + val, ok := tt.labels[tt.labelsOrder[i]] + assert.True(t, ok) + assert.True(t, label.Present) + assert.Equal(t, label.Value, val) + } }) } } diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go index a628a23d77bb..1a3d4536707e 100644 --- a/receiver/sqlqueryreceiver/logs_receiver.go +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -97,9 +97,17 @@ func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) er for _, queryReceiver := range receiver.queryReceivers { err := queryReceiver.start() if err != nil { + if err := observability.RecordErrors(observability.StartError, receiver.id.String(), queryReceiver.ID()); err != nil { + receiver.settings.Logger.Debug("error recording metric for errors count", zap.Error(err)) + } return err } + + if err := observability.RecordNoErrors(observability.StartError, receiver.id.String(), queryReceiver.ID()); err != nil { + receiver.settings.Logger.Debug("error recording metric for errors count", zap.Error(err)) + } } + receiver.startCollecting() receiver.settings.Logger.Debug("started.") return nil @@ -127,6 +135,13 @@ func (receiver *logsReceiver) collect() { logs, err := queryReceiver.collect(context.Background()) if err != nil { receiver.settings.Logger.Error("Error collecting logs", zap.Error(err), zap.String("query", queryReceiver.ID())) + if err := observability.RecordErrors(observability.CollectError, receiver.id.String(), queryReceiver.ID()); err != nil { + receiver.settings.Logger.Debug("error for recording metric for errors count", zap.Error(err)) + } + } + + if err := observability.RecordNoErrors(observability.CollectError, receiver.id.String(), queryReceiver.ID()); err != nil { + receiver.settings.Logger.Debug("error for recording metric for errors count", zap.Error(err)) } if err := observability.RecordAcceptedLogs(int64(logs.LogRecordCount()), receiver.id.String(), queryReceiver.id); err != nil {