From 1d43f01bcde7526ba1e49878a4e614af37d51164 Mon Sep 17 00:00:00 2001 From: Aryan Goyal <137564277+ary82@users.noreply.github.com> Date: Tue, 28 Jan 2025 01:18:43 +0530 Subject: [PATCH 1/3] [storage] Remove usages of GetStorageFactory Signed-off-by: Aryan Goyal <137564277+ary82@users.noreply.github.com> --- .../extension/remotesampling/extension.go | 2 +- .../integration/storagecleaner/extension.go | 2 +- storage_v2/v1adapter/factory.go | 33 +++++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go index 3caa071595e..fb3294e6d80 100644 --- a/cmd/jaeger/internal/extension/remotesampling/extension.go +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -184,7 +184,7 @@ func (ext *rsExtension) startFileBasedStrategyProvider(_ context.Context) error func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error { storageName := ext.cfg.Adaptive.SamplingStore - f, err := jaegerstorage.GetStorageFactory(storageName, host) + f, err := jaegerstorage.GetTraceStoreFactory(storageName, host) if err != nil { return fmt.Errorf("cannot find storage factory: %w", err) } diff --git a/cmd/jaeger/internal/integration/storagecleaner/extension.go b/cmd/jaeger/internal/integration/storagecleaner/extension.go index 8d2a8f7bf2b..c8d1142537a 100644 --- a/cmd/jaeger/internal/integration/storagecleaner/extension.go +++ b/cmd/jaeger/internal/integration/storagecleaner/extension.go @@ -45,7 +45,7 @@ func newStorageCleaner(config *Config, telset component.TelemetrySettings) *stor } func (c *storageCleaner) Start(_ context.Context, host component.Host) error { - storageFactory, err := jaegerstorage.GetStorageFactory(c.config.TraceStorage, host) + storageFactory, err := jaegerstorage.GetTraceStoreFactory(c.config.TraceStorage, host) if err != nil { return fmt.Errorf("cannot find storage factory '%s': %w", c.config.TraceStorage, err) } diff --git a/storage_v2/v1adapter/factory.go b/storage_v2/v1adapter/factory.go index 243fccebf65..11392c78ed3 100644 --- a/storage_v2/v1adapter/factory.go +++ b/storage_v2/v1adapter/factory.go @@ -5,9 +5,12 @@ package v1adapter import ( "context" + "errors" "io" + "github.com/jaegertracing/jaeger/pkg/distributedlock" storage_v1 "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage_v2/depstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) @@ -61,3 +64,33 @@ func (f *Factory) CreateDependencyReader() (depstore.Reader, error) { } return NewDependencyReader(dr), nil } + +// CreateLock implements storage_v1.SamplingStoreFactory +func (f *Factory) CreateLock() (distributedlock.Lock, error) { + ss, ok := f.ss.(storage_v1.SamplingStoreFactory) + if !ok { + return nil, errors.New("storage backend does not support sampling store") + } + lock, err := ss.CreateLock() + return lock, err +} + +// CreateSamplingStore implements storage_v1.SamplingStoreFactory +func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { + ss, ok := f.ss.(storage_v1.SamplingStoreFactory) + if !ok { + return nil, errors.New("storage backend does not support sampling store") + } + store, err := ss.CreateSamplingStore(maxBuckets) + return store, err +} + +// Purge implements storage_v1.Purger +func (f *Factory) Purge(ctx context.Context) error { + p, ok := f.ss.(storage_v1.Purger) + if !ok { + return errors.New("storage backend does not support Purger") + } + err := p.Purge(ctx) + return err +} From 2fe515720abe1893c5baf593dfff1505cbb3bdef Mon Sep 17 00:00:00 2001 From: Aryan Goyal <137564277+ary82@users.noreply.github.com> Date: Wed, 29 Jan 2025 18:50:18 +0530 Subject: [PATCH 2/3] remove GetStorageFactory Signed-off-by: Aryan Goyal <137564277+ary82@users.noreply.github.com> --- .../storageexporter/exporter_test.go | 16 ++++++----- .../extension/jaegerstorage/extension.go | 27 +++++++------------ .../extension/jaegerstorage/extension_test.go | 6 ++--- 3 files changed, 22 insertions(+), 27 deletions(-) diff --git a/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go b/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go index b7542d729cb..fa79f9c9475 100644 --- a/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go +++ b/cmd/jaeger/internal/exporters/storageexporter/exporter_test.go @@ -25,7 +25,8 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage" factoryMocks "github.com/jaegertracing/jaeger/storage/mocks" - "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) type mockStorageExt struct { @@ -143,17 +144,20 @@ func TestExporter(t *testing.T) { err = tracesExporter.ConsumeTraces(ctx, traces) require.NoError(t, err) - storageFactory, err := jaegerstorage.GetStorageFactory(memstoreName, host) + storageFactory, err := jaegerstorage.GetTraceStoreFactory(memstoreName, host) require.NoError(t, err) - spanReader, err := storageFactory.CreateSpanReader() + traceReader, err := storageFactory.CreateTraceReader() require.NoError(t, err) requiredTraceID := model.NewTraceID(0, 1) // 00000000000000000000000000000001 - requiredTrace, err := spanReader.GetTrace(ctx, spanstore.GetTraceParameters{TraceID: requiredTraceID}) + requiredTraceIter := traceReader.GetTraces(ctx, tracestore.GetTraceParams{ + TraceID: v1adapter.FromV1TraceID(requiredTraceID), + }) + requiredTrace, err := v1adapter.V1TracesFromSeq2(requiredTraceIter) require.NoError(t, err) - assert.Equal(t, spanID.String(), requiredTrace.Spans[0].SpanID.String()) + assert.Equal(t, spanID.String(), requiredTrace[0].Spans[0].SpanID.String()) // check that the service name attribute was added by the sanitizer - require.Equal(t, "missing-service-name", requiredTrace.Spans[0].Process.ServiceName) + require.Equal(t, "missing-service-name", requiredTrace[0].Spans[0].Process.ServiceName) } func makeStorageExtension(t *testing.T, memstoreName string) component.Host { diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index 53e29a07d7e..516bf683a4b 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -40,23 +40,6 @@ type storageExt struct { metricsFactories map[string]storage.MetricStoreFactory } -// GetStorageFactory locates the extension in Host and retrieves -// a trace storage factory from it with the given name. -func GetStorageFactory(name string, host component.Host) (storage.Factory, error) { - ext, err := findExtension(host) - if err != nil { - return nil, err - } - f, ok := ext.TraceStorageFactory(name) - if !ok { - return nil, fmt.Errorf( - "cannot find definition of storage '%s' in the configuration for extension '%s'", - name, componentType, - ) - } - return f, nil -} - // GetMetricStorageFactory locates the extension in Host and retrieves // a metric storage factory from it with the given name. func GetMetricStorageFactory(name string, host component.Host) (storage.MetricStoreFactory, error) { @@ -75,11 +58,19 @@ func GetMetricStorageFactory(name string, host component.Host) (storage.MetricSt } func GetTraceStoreFactory(name string, host component.Host) (tracestore.Factory, error) { - f, err := GetStorageFactory(name, host) + ext, err := findExtension(host) if err != nil { return nil, err } + f, ok := ext.TraceStorageFactory(name) + if !ok { + return nil, fmt.Errorf( + "cannot find definition of storage '%s' in the configuration for extension '%s'", + name, componentType, + ) + } + return v1adapter.NewFactory(f), nil } diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go index b202ee4bb7e..49e12689c37 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension_test.go @@ -58,13 +58,13 @@ func (e errorFactory) Close() error { } func TestStorageFactoryBadHostError(t *testing.T) { - _, err := GetStorageFactory("something", componenttest.NewNopHost()) + _, err := GetTraceStoreFactory("something", componenttest.NewNopHost()) require.ErrorContains(t, err, "cannot find extension") } func TestStorageFactoryBadNameError(t *testing.T) { host := storagetest.NewStorageHost().WithExtension(ID, startStorageExtension(t, "foo", "")) - _, err := GetStorageFactory("bar", host) + _, err := GetTraceStoreFactory("bar", host) require.ErrorContains(t, err, "cannot find definition of storage 'bar'") } @@ -106,7 +106,7 @@ func TestGetFactory(t *testing.T) { const name = "foo" const metricname = "bar" host := storagetest.NewStorageHost().WithExtension(ID, startStorageExtension(t, name, metricname)) - f, err := GetStorageFactory(name, host) + f, err := GetTraceStoreFactory(name, host) require.NoError(t, err) require.NotNil(t, f) From 8b0bc9a4a22cc3a4b73863c085346d248e7d2135 Mon Sep 17 00:00:00 2001 From: Aryan Goyal <137564277+ary82@users.noreply.github.com> Date: Wed, 29 Jan 2025 19:08:26 +0530 Subject: [PATCH 3/3] fix errors Signed-off-by: Aryan Goyal <137564277+ary82@users.noreply.github.com> --- .../internal/extension/remotesampling/extension.go | 7 +++++++ .../internal/integration/storagecleaner/extension.go | 7 ++++++- storage_v2/v1adapter/factory.go | 11 ++++++++--- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/cmd/jaeger/internal/extension/remotesampling/extension.go b/cmd/jaeger/internal/extension/remotesampling/extension.go index fb3294e6d80..4ae6172b91b 100644 --- a/cmd/jaeger/internal/extension/remotesampling/extension.go +++ b/cmd/jaeger/internal/extension/remotesampling/extension.go @@ -31,6 +31,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/samplingstore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) var _ extension.Extension = (*rsExtension)(nil) @@ -195,6 +196,9 @@ func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error } store, err := storeFactory.CreateSamplingStore(ext.cfg.Adaptive.AggregationBuckets) + if errors.Is(err, v1adapter.ErrSamplingStoreNotImplemented) { + return fmt.Errorf("storage '%s' does not support sampling store", storageName) + } if err != nil { return fmt.Errorf("failed to create the sampling store: %w", err) } @@ -202,6 +206,9 @@ func (ext *rsExtension) startAdaptiveStrategyProvider(host component.Host) error { lock, err := storeFactory.CreateLock() + if errors.Is(err, v1adapter.ErrSamplingStoreNotImplemented) { + return fmt.Errorf("storage '%s' does not support sampling store", storageName) + } if err != nil { return fmt.Errorf("failed to create the distributed lock: %w", err) } diff --git a/cmd/jaeger/internal/integration/storagecleaner/extension.go b/cmd/jaeger/internal/integration/storagecleaner/extension.go index c8d1142537a..796cba52232 100644 --- a/cmd/jaeger/internal/integration/storagecleaner/extension.go +++ b/cmd/jaeger/internal/integration/storagecleaner/extension.go @@ -19,6 +19,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage" "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) var ( @@ -55,7 +56,11 @@ func (c *storageCleaner) Start(_ context.Context, host component.Host) error { if !ok { return fmt.Errorf("storage %s does not implement Purger interface", c.config.TraceStorage) } - if err := purger.Purge(httpContext); err != nil { + err = purger.Purge(httpContext) + if errors.Is(err, v1adapter.ErrPurgerNotImplemented) { + return fmt.Errorf("storage %s does not implement Purger interface", c.config.TraceStorage) + } + if err != nil { return fmt.Errorf("error purging storage: %w", err) } return nil diff --git a/storage_v2/v1adapter/factory.go b/storage_v2/v1adapter/factory.go index 11392c78ed3..8acafce0873 100644 --- a/storage_v2/v1adapter/factory.go +++ b/storage_v2/v1adapter/factory.go @@ -15,6 +15,11 @@ import ( "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) +var ( + ErrPurgerNotImplemented = errors.New("storage backend does not support Purger") + ErrSamplingStoreNotImplemented = errors.New("storage backend does not support sampling store") +) + type Factory struct { ss storage_v1.Factory } @@ -69,7 +74,7 @@ func (f *Factory) CreateDependencyReader() (depstore.Reader, error) { func (f *Factory) CreateLock() (distributedlock.Lock, error) { ss, ok := f.ss.(storage_v1.SamplingStoreFactory) if !ok { - return nil, errors.New("storage backend does not support sampling store") + return nil, ErrSamplingStoreNotImplemented } lock, err := ss.CreateLock() return lock, err @@ -79,7 +84,7 @@ func (f *Factory) CreateLock() (distributedlock.Lock, error) { func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) { ss, ok := f.ss.(storage_v1.SamplingStoreFactory) if !ok { - return nil, errors.New("storage backend does not support sampling store") + return nil, ErrSamplingStoreNotImplemented } store, err := ss.CreateSamplingStore(maxBuckets) return store, err @@ -89,7 +94,7 @@ func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, erro func (f *Factory) Purge(ctx context.Context) error { p, ok := f.ss.(storage_v1.Purger) if !ok { - return errors.New("storage backend does not support Purger") + return ErrPurgerNotImplemented } err := p.Purge(ctx) return err