Skip to content

Commit

Permalink
fix JaegerV2 storage receiver consumeTraces query params
Browse files Browse the repository at this point in the history
Signed-off-by: James Ryans <[email protected]>
  • Loading branch information
james-ryans committed Mar 22, 2024
1 parent c56e724 commit 4cfcb12
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 16 deletions.
21 changes: 17 additions & 4 deletions cmd/jaeger/internal/integration/datareceivers/jaegerstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package datareceivers
import (
"context"
"fmt"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/storagetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
Expand All @@ -15,20 +16,27 @@ import (
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/receivers/storagereceiver"
)

type jaegerStorageDataReceiver struct {
Logger *zap.Logger
TraceStorage string
StorageConfig *jaegerstorage.Config
host *storagetest.StorageHost
receiver receiver.Traces
}

func NewJaegerStorageDataReceiver(traceStorage string, storageConfig *jaegerstorage.Config) testbed.DataReceiver {
func NewJaegerStorageDataReceiver(
logger *zap.Logger,
traceStorage string,
storageConfig *jaegerstorage.Config,
) testbed.DataReceiver {
return &jaegerStorageDataReceiver{
Logger: logger,
TraceStorage: traceStorage,
StorageConfig: storageConfig,
}
Expand All @@ -37,19 +45,24 @@ func NewJaegerStorageDataReceiver(traceStorage string, storageConfig *jaegerstor
func (dr *jaegerStorageDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, _ consumer.Logs) error {
ctx := context.Background()

extFactory := jaegerstorage.NewFactory()
ext, err := extFactory.CreateExtension(ctx, extension.CreateSettings{
extSet := extension.CreateSettings{
ID: jaegerstorage.ID,
TelemetrySettings: componenttest.NewNopTelemetrySettings(),
BuildInfo: component.NewDefaultBuildInfo(),
}, dr.StorageConfig)
}
extSet.TelemetrySettings.Logger = dr.Logger
extFactory := jaegerstorage.NewFactory()
ext, err := extFactory.CreateExtension(ctx, extSet, dr.StorageConfig)
if err != nil {
return err
}

rcvSet := receivertest.NewNopCreateSettings()
rcvSet.TelemetrySettings.Logger = dr.Logger
rcvFactory := storagereceiver.NewFactory()
rcvCfg := rcvFactory.CreateDefaultConfig().(*storagereceiver.Config)
rcvCfg.TraceStorage = dr.TraceStorage
rcvCfg.PullInterval = 100 * time.Millisecond
rcv, err := rcvFactory.CreateTracesReceiver(ctx, rcvSet, rcvCfg, tc)
if err != nil {
return err
Expand Down
18 changes: 15 additions & 3 deletions cmd/jaeger/internal/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/provider/fileprovider"
"go.opentelemetry.io/collector/otelcol"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/exporters/storageexporter"
Expand All @@ -31,7 +32,11 @@ type StorageIntegration struct {
// Because of that, we need to host another jaeger storage extension
// that is a duplication from the collector's extension. And get
// the exporter TraceStorage name to set it to receiver TraceStorage.
func (s *StorageIntegration) newDataReceiver(t *testing.T, factories otelcol.Factories) testbed.DataReceiver {
func (s *StorageIntegration) newDataReceiver(
t *testing.T,
logger *zap.Logger,
factories otelcol.Factories,
) testbed.DataReceiver {
fmp := fileprovider.New()
configProvider, err := otelcol.NewConfigProvider(
otelcol.ConfigProviderSettings{
Expand All @@ -52,7 +57,11 @@ func (s *StorageIntegration) newDataReceiver(t *testing.T, factories otelcol.Fac
exporterCfg, ok := cfg.Exporters[storageexporter.ID].(*storageexporter.Config)
require.True(t, ok, "no jaeger storage exporter found in the config")

receiver := datareceivers.NewJaegerStorageDataReceiver(exporterCfg.TraceStorage, storageCfg)
receiver := datareceivers.NewJaegerStorageDataReceiver(
logger,
exporterCfg.TraceStorage,
storageCfg,
)
return receiver
}

Expand All @@ -61,6 +70,9 @@ func (s *StorageIntegration) Test(t *testing.T) {
t.Skipf("Integration test against Jaeger-V2 %[1]s skipped; set STORAGE env var to %[1]s to run this", s.Name)
}

logger, err := zap.NewDevelopment()
require.NoError(t, err)

factories, err := internal.Components()
require.NoError(t, err)

Expand All @@ -70,7 +82,7 @@ func (s *StorageIntegration) Test(t *testing.T) {
"",
)
sender := testbed.NewOTLPTraceDataSender(testbed.DefaultHost, 4317)
receiver := s.newDataReceiver(t, factories)
receiver := s.newDataReceiver(t, logger, factories)

runner := testbed.NewInProcessCollector(factories)
validator := testbed.NewCorrectTestValidator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ func (r *storageReceiver) consumeLoop(ctx context.Context) error {
}

func (r *storageReceiver) consumeTraces(ctx context.Context, serviceName string) error {
endTime := time.Now()
traces, err := r.spanReader.FindTraces(ctx, &spanstore.TraceQueryParameters{
ServiceName: serviceName,
ServiceName: serviceName,
StartTimeMin: endTime.Add(-1 * time.Hour),
StartTimeMax: endTime,
})
if err != nil {
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage"
factoryMocks "github.com/jaegertracing/jaeger/storage/mocks"
"github.com/jaegertracing/jaeger/storage/spanstore"
spanStoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)

Expand Down Expand Up @@ -254,13 +253,11 @@ func TestReceiver_StartConsume(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
reader := new(spanStoreMocks.Reader)
reader.On("GetServices", mock.AnythingOfType("*context.cancelCtx")).Return(test.services, nil)
for _, service := range test.services {
reader.On(
"FindTraces",
mock.AnythingOfType("*context.cancelCtx"),
&spanstore.TraceQueryParameters{ServiceName: service},
).Return(test.traces, test.tracesErr)
}
reader.On(
"FindTraces",
mock.AnythingOfType("*context.cancelCtx"),
mock.AnythingOfType("*spanstore.TraceQueryParameters"),
).Return(test.traces, test.tracesErr)
r.receiver.spanReader = reader

require.NoError(t, r.receiver.Shutdown(ctx))
Expand Down

0 comments on commit 4cfcb12

Please sign in to comment.