diff --git a/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go b/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go index b6037dafb5e..65cebcebbc4 100644 --- a/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go +++ b/cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go @@ -6,6 +6,7 @@ package datareceivers import ( "context" "fmt" + "time" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest" "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" @@ -15,20 +16,27 @@ import ( "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/receivers/storagereceiver" ) type jaegerStorageDataReceiver struct { + Logger *zap.Logger TraceStorage string StorageConfig *jaegerstorage.Config host *storagetest.StorageHost receiver receiver.Traces } -func NewJaegerStorageDataReceiver(traceStorage string, storageConfig *jaegerstorage.Config) testbed.DataReceiver { +func NewJaegerStorageDataReceiver( + logger *zap.Logger, + traceStorage string, + storageConfig *jaegerstorage.Config, +) testbed.DataReceiver { return &jaegerStorageDataReceiver{ + Logger: logger, TraceStorage: traceStorage, StorageConfig: storageConfig, } @@ -37,19 +45,24 @@ func NewJaegerStorageDataReceiver(traceStorage string, storageConfig *jaegerstor func (dr *jaegerStorageDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error { ctx := context.Background() - extFactory := jaegerstorage.NewFactory() - ext, err := extFactory.CreateExtension(ctx, extension.CreateSettings{ + extSet := extension.CreateSettings{ + ID: jaegerstorage.ID, TelemetrySettings: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), - }, dr.StorageConfig) + } + extSet.TelemetrySettings.Logger = dr.Logger + extFactory := jaegerstorage.NewFactory() + ext, err := extFactory.CreateExtension(ctx, extSet, dr.StorageConfig) if err != nil { return err } rcvSet := receivertest.NewNopCreateSettings() + rcvSet.TelemetrySettings.Logger = dr.Logger rcvFactory := storagereceiver.NewFactory() rcvCfg := rcvFactory.CreateDefaultConfig().(*storagereceiver.Config) rcvCfg.TraceStorage = dr.TraceStorage + rcvCfg.PullInterval = 100 * time.Millisecond rcv, err := rcvFactory.CreateTracesReceiver(ctx, rcvSet, rcvCfg, tc) if err != nil { return err diff --git a/cmd/jaeger/internal/integration/integration.go b/cmd/jaeger/internal/integration/integration.go index 4ffe82219b5..205e262ea7b 100644 --- a/cmd/jaeger/internal/integration/integration.go +++ b/cmd/jaeger/internal/integration/integration.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/provider/fileprovider" "go.opentelemetry.io/collector/otelcol" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/jaeger/internal" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter" @@ -31,7 +32,11 @@ type StorageIntegration struct { // Because of that, we need to host another jaeger storage extension // that is a duplication from the collector's extension. And get // the exporter TraceStorage name to set it to receiver TraceStorage. -func (s *StorageIntegration) newDataReceiver(t *testing.T, factories otelcol.Factories) testbed.DataReceiver { +func (s *StorageIntegration) newDataReceiver( + t *testing.T, + logger *zap.Logger, + factories otelcol.Factories, +) testbed.DataReceiver { fmp := fileprovider.New() configProvider, err := otelcol.NewConfigProvider( otelcol.ConfigProviderSettings{ @@ -52,7 +57,11 @@ func (s *StorageIntegration) newDataReceiver(t *testing.T, factories otelcol.Fac exporterCfg, ok := cfg.Exporters[storageexporter.ID].(*storageexporter.Config) require.True(t, ok, "no jaeger storage exporter found in the config") - receiver := datareceivers.NewJaegerStorageDataReceiver(exporterCfg.TraceStorage, storageCfg) + receiver := datareceivers.NewJaegerStorageDataReceiver( + logger, + exporterCfg.TraceStorage, + storageCfg, + ) return receiver } @@ -61,6 +70,9 @@ func (s *StorageIntegration) Test(t *testing.T) { t.Skipf("Integration test against Jaeger-V2 %[1]s skipped; set STORAGE env var to %[1]s to run this", s.Name) } + logger, err := zap.NewDevelopment() + require.NoError(t, err) + factories, err := internal.Components() require.NoError(t, err) @@ -70,7 +82,7 @@ func (s *StorageIntegration) Test(t *testing.T) { "", ) sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, 4317) - receiver := s.newDataReceiver(t, factories) + receiver := s.newDataReceiver(t, logger, factories) runner := testbed.NewInProcessCollector(factories) validator := testbed.NewCorrectTestValidator( diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go index ad1c6953347..1e8400660ca 100644 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go +++ b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver.go @@ -88,8 +88,11 @@ func (r *storageReceiver) consumeLoop(ctx context.Context) error { } func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error { + endTime := time.Now() traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{ - ServiceName: serviceName, + ServiceName: serviceName, + StartTimeMin: endTime.Add(-1 * time.Hour), + StartTimeMax: endTime, }) if err != nil { return err diff --git a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go index a996e0bece7..b71d03d56ea 100644 --- a/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go +++ b/cmd/jaeger/internal/integration/receivers/storagereceiver/receiver_test.go @@ -23,7 +23,6 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage" factoryMocks "github.com/jaegertracing/jaeger/storage/mocks" - "github.com/jaegertracing/jaeger/storage/spanstore" spanStoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" ) @@ -254,13 +253,11 @@ func TestReceiver_StartConsume(t *testing.T) { t.Run(test.name, func(t *testing.T) { reader := new(spanStoreMocks.Reader) reader.On("GetServices", mock.AnythingOfType("*context.cancelCtx")).Return(test.services, nil) - for _, service := range test.services { - reader.On( - "FindTraces", - mock.AnythingOfType("*context.cancelCtx"), - &spanstore.TraceQueryParameters{ServiceName: service}, - ).Return(test.traces, test.tracesErr) - } + reader.On( + "FindTraces", + mock.AnythingOfType("*context.cancelCtx"), + mock.AnythingOfType("*spanstore.TraceQueryParameters"), + ).Return(test.traces, test.tracesErr) r.receiver.spanReader = reader require.NoError(t, r.receiver.Shutdown(ctx))