diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go index 04c37a67cade..1fed44d6818e 100644 --- a/receiver/sqlqueryreceiver/logs_receiver.go +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -12,12 +12,14 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/extension/experimental/storage" + "go.opentelemetry.io/collector/obsreport" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver" "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/sqlqueryreceiver/internal/observability" ) @@ -35,6 +37,7 @@ type logsReceiver struct { id component.ID storageClient storage.Client + obsrecv *obsreport.Receiver } func newLogsReceiver( @@ -43,7 +46,16 @@ func newLogsReceiver( sqlOpenerFunc sqlOpenerFunc, createClient clientProviderFunc, nextConsumer consumer.Logs, -) *logsReceiver { +) (*logsReceiver, error) { + + obsr, err := obsreport.NewReceiver(obsreport.ReceiverSettings{ + ReceiverID: settings.ID, + ReceiverCreateSettings: settings, + }) + if err != nil { + return nil, err + } + receiver := &logsReceiver{ config: config, settings: settings, @@ -54,9 +66,10 @@ func newLogsReceiver( nextConsumer: nextConsumer, shutdownRequested: make(chan struct{}), id: settings.ID, + obsrecv: obsr, } - return receiver + return receiver, nil } func (receiver *logsReceiver) Start(ctx context.Context, host component.Host) error { @@ -135,6 +148,7 @@ func (receiver *logsReceiver) collect() { logsChannel := make(chan plog.Logs) for _, queryReceiver := range receiver.queryReceivers { go func(queryReceiver *logsQueryReceiver) { + logs, err := queryReceiver.collect(context.Background()) if err != nil { receiver.settings.Logger.Error("error collecting logs", zap.Error(err), zap.String("query", queryReceiver.ID())) @@ -160,8 +174,12 @@ func (receiver *logsReceiver) collect() { logs := <-logsChannel logs.ResourceLogs().MoveAndAppendTo(allLogs.ResourceLogs()) } - if allLogs.LogRecordCount() > 0 { + + logRecordCount := allLogs.LogRecordCount() + if logRecordCount > 0 { + ctx := receiver.obsrecv.StartLogsOp(context.Background()) err := receiver.nextConsumer.ConsumeLogs(context.Background(), allLogs) + receiver.obsrecv.EndLogsOp(ctx, metadata.Type, logRecordCount, err) if err != nil { receiver.settings.Logger.Error("failed to send logs: %w", zap.Error(err)) } diff --git a/receiver/sqlqueryreceiver/receiver.go b/receiver/sqlqueryreceiver/receiver.go index 973243566b69..ef28b3c0092f 100644 --- a/receiver/sqlqueryreceiver/receiver.go +++ b/receiver/sqlqueryreceiver/receiver.go @@ -29,7 +29,7 @@ func createLogsReceiverFunc(sqlOpenerFunc sqlOpenerFunc, clientProviderFunc clie consumer consumer.Logs, ) (receiver.Logs, error) { sqlQueryConfig := config.(*Config) - return newLogsReceiver(sqlQueryConfig, settings, sqlOpenerFunc, clientProviderFunc, consumer), nil + return newLogsReceiver(sqlQueryConfig, settings, sqlOpenerFunc, clientProviderFunc, consumer) } }