From 4cfcb12040de479eb71483c9a73060baf9df14f6 Mon Sep 17 00:00:00 2001 From: James Ryans Date: Fri, 22 Mar 2024 22:32:27 +0700 Subject: [PATCH 1/4] fix JaegerV2 storage receiver consumeTraces query params Signed-off-by: James Ryans --- .../datareceivers/jaegerstorage.go | 21 +++++++++++++++---- .../internal/integration/integration.go | 18 +++++++++++++--- .../receivers/storagereceiver/receiver.go | 5 ++++- .../storagereceiver/receiver_test.go | 13 +++++------- 4 files changed, 41 insertions(+), 16 deletions(-) diff --git a/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go b/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go index b6037dafb5e..65cebcebbc4 100644 --- a/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go +++ b/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go @@ -6,6 +6,7 @@ package datareceivers import ( "context" "fmt" + "time" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" @@ -15,20 +16,27 @@ import ( "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/receivers/storagereceiver" ) type jaegerStorageDataReceiver struct { + Logger *zap.Logger TraceStorage string StorageConfig *jaegerstorage.Config host *storagetest.StorageHost receiver receiver.Traces } -func NewJaegerStorageDataReceiver(traceStorage string, storageConfig *jaegerstorage.Config) testbed.DataReceiver { +func NewJaegerStorageDataReceiver( + logger *zap.Logger, + traceStorage string, + storageConfig *jaegerstorage.Config, +) testbed.DataReceiver { return &jaegerStorageDataReceiver{ + Logger: logger, TraceStorage: traceStorage, StorageConfig: storageConfig, } @@ -37,19 +45,24 @@ func NewJaegerStorageDataReceiver(traceStorage string, storageConfig *jaegerstor func (dr *jaegerStorageDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error { ctx := context.Background() - extFactory := jaegerstorage.NewFactory() - ext, err := extFactory.CreateExtension(ctx, extension.CreateSettings{ + extSet := extension.CreateSettings{ + ID: jaegerstorage.ID, TelemetrySettings: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), - }, dr.StorageConfig) + } + extSet.TelemetrySettings.Logger = dr.Logger + extFactory := jaegerstorage.NewFactory() + ext, err := extFactory.CreateExtension(ctx, extSet, dr.StorageConfig) if err != nil { return err } rcvSet := receivertest.NewNopCreateSettings() + rcvSet.TelemetrySettings.Logger = dr.Logger rcvFactory := storagereceiver.NewFactory() rcvCfg := rcvFactory.CreateDefaultConfig().(*storagereceiver.Config) rcvCfg.TraceStorage = dr.TraceStorage + rcvCfg.PullInterval = 100 * time.Millisecond rcv, err := rcvFactory.CreateTracesReceiver(ctx, rcvSet, rcvCfg, tc) if err != nil { return err diff --git a/cmd/jaeger/internal/integration/integration.go b/cmd/jaeger/internal/integration/integration.go index 4ffe82219b5..205e262ea7b 100644 --- a/cmd/jaeger/internal/integration/integration.go +++ b/cmd/jaeger/internal/integration/integration.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/fileprovider" "go.opentelemetry.io/collector/otelcol" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter" @@ -31,7 +32,11 @@ type StorageIntegration struct { // Because of that, we need to host another jaeger storage extension // that is a duplication from the collector's extension. And get // the exporter TraceStorage name to set it to receiver TraceStorage. -func (s *StorageIntegration) newDataReceiver(t *testing.T, factories otelcol.Factories) testbed.DataReceiver { +func (s *StorageIntegration) newDataReceiver( + t *testing.T, + logger *zap.Logger, + factories otelcol.Factories, +) testbed.DataReceiver { fmp := fileprovider.New() configProvider, err := otelcol.NewConfigProvider( otelcol.ConfigProviderSettings{ @@ -52,7 +57,11 @@ func (s *StorageIntegration) newDataReceiver(t *testing.T, factories otelcol.Fac exporterCfg, ok := cfg.Exporters[storageexporter.ID].(*storageexporter.Config) require.True(t, ok, "no jaeger storage exporter found in the config") - receiver := datareceivers.NewJaegerStorageDataReceiver(exporterCfg.TraceStorage, storageCfg) + receiver := datareceivers.NewJaegerStorageDataReceiver( + logger, + exporterCfg.TraceStorage, + storageCfg, + ) return receiver } @@ -61,6 +70,9 @@ func (s *StorageIntegration) Test(t *testing.T) { t.Skipf("Integration test against Jaeger-V2 %[1]s skipped; set STORAGE env var to %[1]s to run this", s.Name) } + logger, err := zap.NewDevelopment() + require.NoError(t, err) + factories, err := internal.Components() require.NoError(t, err) @@ -70,7 +82,7 @@ func (s *StorageIntegration) Test(t *testing.T) { "", ) sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, 4317) - receiver := s.newDataReceiver(t, factories) + receiver := s.newDataReceiver(t, logger, factories) runner := testbed.NewInProcessCollector(factories) validator := testbed.NewCorrectTestValidator( diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go index ad1c6953347..1e8400660ca 100644 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go +++ b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go @@ -88,8 +88,11 @@ func (r *storageReceiver) consumeLoop(ctx context.Context) error { } func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error { + endTime := time.Now() traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{ - ServiceName: serviceName, + ServiceName: serviceName, + StartTimeMin: endTime.Add(-1 * time.Hour), + StartTimeMax: endTime, }) if err != nil { return err diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go index a996e0bece7..b71d03d56ea 100644 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go +++ b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go @@ -23,7 +23,6 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage" factoryMocks "github.com/jaegertracing/jaeger/storage/mocks" - "github.com/jaegertracing/jaeger/storage/spanstore" spanStoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) @@ -254,13 +253,11 @@ func TestReceiver_StartConsume(t *testing.T) { t.Run(test.name, func(t *testing.T) { reader := new(spanStoreMocks.Reader) reader.On("GetServices", mock.AnythingOfType("*context.cancelCtx")).Return(test.services, nil) - for _, service := range test.services { - reader.On( - "FindTraces", - mock.AnythingOfType("*context.cancelCtx"), - &spanstore.TraceQueryParameters{ServiceName: service}, - ).Return(test.traces, test.tracesErr) - } + reader.On( + "FindTraces", + mock.AnythingOfType("*context.cancelCtx"), + mock.AnythingOfType("*spanstore.TraceQueryParameters"), + ).Return(test.traces, test.tracesErr) r.receiver.spanReader = reader require.NoError(t, r.receiver.Shutdown(ctx)) From 3fca599ed548f38e91d3da026d89543fdb4c6ba4 Mon Sep 17 00:00:00 2001 From: James Ryans Date: Fri, 22 Mar 2024 23:00:04 +0700 Subject: [PATCH 2/4] use config file telemetry log instead of creating one Signed-off-by: James Ryans --- cmd/jaeger/internal/integration/integration.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cmd/jaeger/internal/integration/integration.go b/cmd/jaeger/internal/integration/integration.go index 205e262ea7b..593f63a9796 100644 --- a/cmd/jaeger/internal/integration/integration.go +++ b/cmd/jaeger/internal/integration/integration.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/fileprovider" "go.opentelemetry.io/collector/otelcol" + "go.opentelemetry.io/collector/service/telemetry" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal" @@ -51,6 +52,9 @@ func (s *StorageIntegration) newDataReceiver( cfg, err := configProvider.Get(context.Background(), factories) require.NoError(t, err) + tel, err := telemetry.New(context.Background(), telemetry.Settings{}, cfg.Service.Telemetry) + require.NoError(t, err) + storageCfg, ok := cfg.Extensions[jaegerstorage.ID].(*jaegerstorage.Config) require.True(t, ok, "no jaeger storage extension found in the config") @@ -58,7 +62,7 @@ func (s *StorageIntegration) newDataReceiver( require.True(t, ok, "no jaeger storage exporter found in the config") receiver := datareceivers.NewJaegerStorageDataReceiver( - logger, + tel.Logger(), exporterCfg.TraceStorage, storageCfg, ) From de57e56df3b79d2607ab2acc5531e097b20f3cd7 Mon Sep 17 00:00:00 2001 From: James Ryans Date: Sat, 23 Mar 2024 01:43:42 +0700 Subject: [PATCH 3/4] remove unused logger that passed to newDataReceiver Signed-off-by: James Ryans --- cmd/jaeger/internal/integration/integration.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/cmd/jaeger/internal/integration/integration.go b/cmd/jaeger/internal/integration/integration.go index 593f63a9796..d4d5b0950c3 100644 --- a/cmd/jaeger/internal/integration/integration.go +++ b/cmd/jaeger/internal/integration/integration.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/confmap/provider/fileprovider" "go.opentelemetry.io/collector/otelcol" "go.opentelemetry.io/collector/service/telemetry" - "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter" @@ -33,11 +32,7 @@ type StorageIntegration struct { // Because of that, we need to host another jaeger storage extension // that is a duplication from the collector's extension. And get // the exporter TraceStorage name to set it to receiver TraceStorage. -func (s *StorageIntegration) newDataReceiver( - t *testing.T, - logger *zap.Logger, - factories otelcol.Factories, -) testbed.DataReceiver { +func (s *StorageIntegration) newDataReceiver(t *testing.T, factories otelcol.Factories) testbed.DataReceiver { fmp := fileprovider.New() configProvider, err := otelcol.NewConfigProvider( otelcol.ConfigProviderSettings{ @@ -74,9 +69,6 @@ func (s *StorageIntegration) Test(t *testing.T) { t.Skipf("Integration test against Jaeger-V2 %[1]s skipped; set STORAGE env var to %[1]s to run this", s.Name) } - logger, err := zap.NewDevelopment() - require.NoError(t, err) - factories, err := internal.Components() require.NoError(t, err) @@ -86,7 +78,7 @@ func (s *StorageIntegration) Test(t *testing.T) { "", ) sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, 4317) - receiver := s.newDataReceiver(t, logger, factories) + receiver := s.newDataReceiver(t, factories) runner := testbed.NewInProcessCollector(factories) validator := testbed.NewCorrectTestValidator( From 755dc7e99ec7f706da39612eec53ec8f700651f7 Mon Sep 17 00:00:00 2001 From: James Ryans Date: Sat, 23 Mar 2024 17:13:34 +0700 Subject: [PATCH 4/4] pass the whole TelemetrySettings instead of only a Logger Signed-off-by: James Ryans --- .../datareceivers/jaegerstorage.go | 31 +++++++++---------- .../internal/integration/integration.go | 6 +++- .../receivers/storagereceiver/factory.go | 3 ++ 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go b/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go index 65cebcebbc4..688d0866373 100644 --- a/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go +++ b/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go @@ -11,34 +11,31 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/receiver" - "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/receivers/storagereceiver" ) type jaegerStorageDataReceiver struct { - Logger *zap.Logger - TraceStorage string - StorageConfig *jaegerstorage.Config - host *storagetest.StorageHost - receiver receiver.Traces + TelemetrySettings component.TelemetrySettings + TraceStorage string + StorageConfig *jaegerstorage.Config + host *storagetest.StorageHost + receiver receiver.Traces } func NewJaegerStorageDataReceiver( - logger *zap.Logger, + telemetrySettings component.TelemetrySettings, traceStorage string, storageConfig *jaegerstorage.Config, ) testbed.DataReceiver { return &jaegerStorageDataReceiver{ - Logger: logger, - TraceStorage: traceStorage, - StorageConfig: storageConfig, + TelemetrySettings: telemetrySettings, + TraceStorage: traceStorage, + StorageConfig: storageConfig, } } @@ -47,18 +44,18 @@ func (dr *jaegerStorageDataReceiver) Start(tc consumer.Traces, _ consumer.Metric extSet := extension.CreateSettings{ ID: jaegerstorage.ID, - TelemetrySettings: componenttest.NewNopTelemetrySettings(), - BuildInfo: component.NewDefaultBuildInfo(), + TelemetrySettings: dr.TelemetrySettings, } - extSet.TelemetrySettings.Logger = dr.Logger extFactory := jaegerstorage.NewFactory() ext, err := extFactory.CreateExtension(ctx, extSet, dr.StorageConfig) if err != nil { return err } - rcvSet := receivertest.NewNopCreateSettings() - rcvSet.TelemetrySettings.Logger = dr.Logger + rcvSet := receiver.CreateSettings{ + ID: storagereceiver.ID, + TelemetrySettings: dr.TelemetrySettings, + } rcvFactory := storagereceiver.NewFactory() rcvCfg := rcvFactory.CreateDefaultConfig().(*storagereceiver.Config) rcvCfg.TraceStorage = dr.TraceStorage diff --git a/cmd/jaeger/internal/integration/integration.go b/cmd/jaeger/internal/integration/integration.go index d4d5b0950c3..d567b025431 100644 --- a/cmd/jaeger/internal/integration/integration.go +++ b/cmd/jaeger/internal/integration/integration.go @@ -11,6 +11,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/fileprovider" "go.opentelemetry.io/collector/otelcol" @@ -56,8 +57,11 @@ func (s *StorageIntegration) newDataReceiver(t *testing.T, factories otelcol.Fac exporterCfg, ok := cfg.Exporters[storageexporter.ID].(*storageexporter.Config) require.True(t, ok, "no jaeger storage exporter found in the config") + telemetrySettings := componenttest.NewNopTelemetrySettings() + telemetrySettings.Logger = tel.Logger() + receiver := datareceivers.NewJaegerStorageDataReceiver( - tel.Logger(), + telemetrySettings, exporterCfg.TraceStorage, storageCfg, ) diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go index 89243e23155..3fcbf422133 100644 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go +++ b/cmd/jaeger/internal/integration/receivers/storagereceiver/factory.go @@ -14,6 +14,9 @@ import ( // componentType is the name of this extension in configuration. const componentType = component.Type("jaeger_storage_receiver") +// ID is the identifier of this extension. +var ID = component.NewID(componentType) + func NewFactory() receiver.Factory { return receiver.NewFactory( componentType,