From 5497453ea8cb9203bb3ed5c73919bbcd7ab56593 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Sat, 30 Mar 2024 18:19:16 +0530 Subject: [PATCH 01/18] Intialized ES factory Signed-off-by: Wise-Wizard --- .../storage/integration/elasticsearch_test.go | 130 +++++++++++------- 1 file changed, 78 insertions(+), 52 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 7a92224d906..afaeb6d69f3 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -29,15 +29,13 @@ import ( "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/sdk/trace/tracetest" - "go.opentelemetry.io/otel/trace" "go.uber.org/zap" estemplate "github.com/jaegertracing/jaeger/pkg/es" eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" + "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore" @@ -66,21 +64,22 @@ type ESStorageIntegration struct { v8Client *elasticsearch8.Client bulkProcessor *elastic.BulkProcessor logger *zap.Logger + factory *es.Factory } -func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { - exporter := tracetest.NewInMemoryExporter() - tp := sdktrace.NewTracerProvider( - sdktrace.WithSampler(sdktrace.AlwaysSample()), - sdktrace.WithSyncer(exporter), - ) - closer := func() { - if err := tp.Shutdown(context.Background()); err != nil { - s.logger.Error("failed to close tracer", zap.Error(err)) - } - } - return tp, exporter, closer -} +// func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { +// exporter := tracetest.NewInMemoryExporter() +// tp := sdktrace.NewTracerProvider( +// sdktrace.WithSampler(sdktrace.AlwaysSample()), +// sdktrace.WithSyncer(exporter), +// ) +// closer := func() { +// if err := tp.Shutdown(context.Background()); err != nil { +// s.logger.Error("failed to close tracer", zap.Error(err)) +// } +// } +// return tp, exporter, closer +// } func (s *ESStorageIntegration) getVersion() (uint, error) { pingResult, _, err := s.client.Ping(queryURL).Do(context.Background()) @@ -101,12 +100,38 @@ func (s *ESStorageIntegration) getVersion() (uint, error) { } func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) error { + // Intialize ES Factory + s.factory = es.NewFactory() + err := s.factory.Initialize(metrics.NullFactory, zap.NewNop()) + if err != nil { + return err + } + + // Create Span Writer and Reader + s.SpanWriter, err = s.factory.CreateSpanWriter() + if err != nil { + return err + } + s.SpanReader, err = s.factory.CreateSpanReader() + if err != nil { + return err + } + + // Create Archive Span Writer and Reader + s.ArchiveSpanWriter, err = s.factory.CreateArchiveSpanWriter() + if err != nil { + return err + } + s.ArchiveSpanReader, err = s.factory.CreateArchiveSpanReader() + if err != nil { + return err + } + rawClient, err := elastic.NewClient( elastic.SetURL(queryURL), elastic.SetSniff(false)) require.NoError(t, err) s.logger, _ = testutils.NewLogger() - s.client = rawClient s.v8Client, err = elasticsearch8.NewClient(elasticsearch8.Config{ Addresses: []string{queryURL}, @@ -185,6 +210,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) clientFn := func() estemplate.Client { return client } // Initializing Span Reader and Writer + w := spanstore.NewSpanWriter( spanstore.SpanWriterParams{ Client: clientFn, @@ -197,43 +223,43 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) }) err = w.CreateTemplates(spanMapping, serviceMapping, indexPrefix) require.NoError(t, err) - tracer, _, closer := s.tracerProvider() - defer closer() - s.SpanWriter = w - s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ - Client: clientFn, - Logger: s.logger, - MetricsFactory: metrics.NullFactory, - IndexPrefix: indexPrefix, - MaxSpanAge: maxSpanAge, - TagDotReplacement: tagKeyDeDotChar, - MaxDocCount: defaultMaxDocCount, - Tracer: tracer.Tracer("test"), - Archive: false, - }) + // tracer, _, closer := s.tracerProvider() + // defer closer() + // s.SpanWriter = w + // s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ + // Client: clientFn, + // Logger: s.logger, + // MetricsFactory: metrics.NullFactory, + // IndexPrefix: indexPrefix, + // MaxSpanAge: maxSpanAge, + // TagDotReplacement: tagKeyDeDotChar, + // MaxDocCount: defaultMaxDocCount, + // Tracer: tracer.Tracer("test"), + // Archive: false, + // }) // Initializing Archive Span Reader and Writer - s.ArchiveSpanWriter = spanstore.NewSpanWriter( - spanstore.SpanWriterParams{ - Client: clientFn, - Logger: s.logger, - MetricsFactory: metrics.NullFactory, - IndexPrefix: indexPrefix, - AllTagsAsFields: allTagsAsFields, - TagDotReplacement: tagKeyDeDotChar, - Archive: true, - }) - s.ArchiveSpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ - Client: clientFn, - Logger: s.logger, - MetricsFactory: metrics.NullFactory, - IndexPrefix: indexPrefix, - MaxSpanAge: maxSpanAge, - TagDotReplacement: tagKeyDeDotChar, - MaxDocCount: defaultMaxDocCount, - Tracer: tracer.Tracer("test"), - Archive: true, - }) + // s.ArchiveSpanWriter = spanstore.NewSpanWriter( + // spanstore.SpanWriterParams{ + // Client: clientFn, + // Logger: s.logger, + // MetricsFactory: metrics.NullFactory, + // IndexPrefix: indexPrefix, + // AllTagsAsFields: allTagsAsFields, + // TagDotReplacement: tagKeyDeDotChar, + // Archive: true, + // }) + // s.ArchiveSpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ + // Client: clientFn, + // Logger: s.logger, + // MetricsFactory: metrics.NullFactory, + // IndexPrefix: indexPrefix, + // MaxSpanAge: maxSpanAge, + // TagDotReplacement: tagKeyDeDotChar, + // MaxDocCount: defaultMaxDocCount, + // Tracer: tracer.Tracer("test"), + // Archive: true, + // }) dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ Client: clientFn, From 86cb9a63242b74e0246a19c2ce4e91e4017d85f9 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Sat, 30 Mar 2024 20:20:34 +0530 Subject: [PATCH 02/18] Initialized with Viper Signed-off-by: Wise-Wizard --- .../storage/integration/elasticsearch_test.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index afaeb6d69f3..281f81bfa96 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/pkg/config" estemplate "github.com/jaegertracing/jaeger/pkg/es" eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -100,29 +101,32 @@ func (s *ESStorageIntegration) getVersion() (uint, error) { } func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) error { - // Intialize ES Factory - s.factory = es.NewFactory() - err := s.factory.Initialize(metrics.NullFactory, zap.NewNop()) + // Initialize ES Factory + f := es.NewFactory() + v, _ := config.Viperize(f.AddFlags) + f.InitFromViper(v, zap.NewNop()) + err := f.Initialize(metrics.NullFactory, s.logger) if err != nil { return err } + s.factory = f // Create Span Writer and Reader - s.SpanWriter, err = s.factory.CreateSpanWriter() + s.SpanWriter, err = f.CreateSpanWriter() if err != nil { return err } - s.SpanReader, err = s.factory.CreateSpanReader() + s.SpanReader, err = f.CreateSpanReader() if err != nil { return err } // Create Archive Span Writer and Reader - s.ArchiveSpanWriter, err = s.factory.CreateArchiveSpanWriter() + s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() if err != nil { return err } - s.ArchiveSpanReader, err = s.factory.CreateArchiveSpanReader() + s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() if err != nil { return err } @@ -260,7 +264,6 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) // Tracer: tracer.Tracer("test"), // Archive: true, // }) - dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ Client: clientFn, Logger: s.logger, From 5b3d5466d9dbe9814a03a285540718404390282a Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Sat, 30 Mar 2024 21:13:22 +0530 Subject: [PATCH 03/18] Debugging Signed-off-by: Wise-Wizard --- .../storage/integration/elasticsearch_test.go | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 281f81bfa96..fffdc4e54e1 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -65,7 +65,6 @@ type ESStorageIntegration struct { v8Client *elasticsearch8.Client bulkProcessor *elastic.BulkProcessor logger *zap.Logger - factory *es.Factory } // func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { @@ -101,16 +100,26 @@ func (s *ESStorageIntegration) getVersion() (uint, error) { } func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) error { + rawClient, err := elastic.NewClient( + elastic.SetURL(queryURL), + elastic.SetSniff(false)) + require.NoError(t, err) + s.logger, _ = testutils.NewLogger() + s.client = rawClient + s.v8Client, err = elasticsearch8.NewClient(elasticsearch8.Config{ + Addresses: []string{queryURL}, + DiscoverNodesOnStart: false, + }) + require.NoError(t, err) + // Initialize ES Factory f := es.NewFactory() v, _ := config.Viperize(f.AddFlags) f.InitFromViper(v, zap.NewNop()) - err := f.Initialize(metrics.NullFactory, s.logger) + err = f.Initialize(metrics.NullFactory, s.logger) if err != nil { return err } - s.factory = f - // Create Span Writer and Reader s.SpanWriter, err = f.CreateSpanWriter() if err != nil { @@ -131,18 +140,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) return err } - rawClient, err := elastic.NewClient( - elastic.SetURL(queryURL), - elastic.SetSniff(false)) - require.NoError(t, err) - s.logger, _ = testutils.NewLogger() - s.client = rawClient - s.v8Client, err = elasticsearch8.NewClient(elasticsearch8.Config{ - Addresses: []string{queryURL}, - DiscoverNodesOnStart: false, - }) - require.NoError(t, err) - s.initSpanstore(t, allTagsAsFields) s.initSamplingStore(t) From 56e025e8a2c3752185c50a1fb419e965ec8b564b Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Sat, 30 Mar 2024 21:39:54 +0530 Subject: [PATCH 04/18] Debugging Signed-off-by: Wise-Wizard --- plugin/storage/integration/elasticsearch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index fffdc4e54e1..cdbc9f59cf5 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -151,7 +151,7 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) s.esCleanUp(t, allTagsAsFields) // TODO: remove this flag after ES support returning spanKind when get operations s.GetOperationsMissingSpanKind = true - s.SkipArchiveTest = false + s.SkipArchiveTest = true return nil } From ae967dbd98a64ab4ecc4c71d7ade87230d06f0ed Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Sat, 30 Mar 2024 22:45:56 +0530 Subject: [PATCH 05/18] Debugging-3 Signed-off-by: Wise-Wizard --- .../storage/integration/elasticsearch_test.go | 22 +++++++++---------- plugin/storage/integration/integration.go | 1 + 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index cdbc9f59cf5..7c97509c43f 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -130,16 +130,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) return err } - // Create Archive Span Writer and Reader - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() - if err != nil { - return err - } - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() - if err != nil { - return err - } - s.initSpanstore(t, allTagsAsFields) s.initSamplingStore(t) @@ -151,7 +141,17 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) s.esCleanUp(t, allTagsAsFields) // TODO: remove this flag after ES support returning spanKind when get operations s.GetOperationsMissingSpanKind = true - s.SkipArchiveTest = true + s.SkipArchiveTest = false + // Create Archive Span Writer and Reader + s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() + if err != nil { + return err + } + s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() + if err != nil { + return err + } + return nil } diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 01539da2845..86d3916577a 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -225,6 +225,7 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { var err error actual, err = s.SpanReader.GetOperations(context.Background(), spanstore.OperationQueryParameters{ServiceName: "example-service-1"}) + t.Log("Actual:", actual) require.NoError(t, err) sort.Slice(actual, func(i, j int) bool { return actual[i].Name < actual[j].Name From 1dc9cec968061faa19785df1f1a7c93e28f23ab6 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Mon, 1 Apr 2024 09:23:05 +0530 Subject: [PATCH 06/18] Initialized with ES Config Signed-off-by: Wise-Wizard --- .../storage/integration/elasticsearch_test.go | 23 ++++++++----------- plugin/storage/integration/integration.go | 1 - 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index a559cd998d9..7131b335827 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" - "github.com/jaegertracing/jaeger/pkg/config" estemplate "github.com/jaegertracing/jaeger/pkg/es" eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -112,10 +111,7 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) require.NoError(t, err) // Initialize ES Factory - f := es.NewFactory() - v, _ := config.Viperize(f.AddFlags) - f.InitFromViper(v, zap.NewNop()) - err = f.Initialize(metrics.NullFactory, s.logger) + f, err := es.NewFactoryWithConfig(es.NewFactory().Options.Primary.Configuration, metrics.NullFactory, s.logger) if err != nil { return err } @@ -128,7 +124,14 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) if err != nil { return err } - + s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() + if err != nil { + return err + } + s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() + if err != nil { + return err + } s.initSpanstore(t, allTagsAsFields) s.initSamplingStore(t) @@ -141,14 +144,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) s.GetOperationsMissingSpanKind = true s.SkipArchiveTest = false // Create Archive Span Writer and Reader - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() - if err != nil { - return err - } - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() - if err != nil { - return err - } return nil } diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 809be291a54..506900e8ee3 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -243,7 +243,6 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) { var err error actual, err = s.SpanReader.GetOperations(context.Background(), spanstore.OperationQueryParameters{ServiceName: "example-service-1"}) - t.Log("Actual:", actual) require.NoError(t, err) sort.Slice(actual, func(i, j int) bool { return actual[i].Name < actual[j].Name From 21c464f947b58696fb433df79112ac45c9977713 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Mon, 1 Apr 2024 19:50:26 +0530 Subject: [PATCH 07/18] Initialized Configuration Signed-off-by: Wise-Wizard --- .../storage/integration/elasticsearch_test.go | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 7131b335827..c130575f913 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -38,7 +38,6 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore" - "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore" ) const ( @@ -111,7 +110,12 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) require.NoError(t, err) // Initialize ES Factory - f, err := es.NewFactoryWithConfig(es.NewFactory().Options.Primary.Configuration, metrics.NullFactory, s.logger) + cfg := es.NewFactory().Options.Primary.Configuration + cfg.MaxSpanAge = maxSpanAge + cfg.Tags.AllAsFields = allTagsAsFields + cfg.Tags.DotReplacement = tagKeyDeDotChar + cfg.IndexPrefix = indexPrefix + f, err := es.NewFactoryWithConfig(cfg, metrics.NullFactory, s.logger) if err != nil { return err } @@ -143,7 +147,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) // TODO: remove this flag after ES support returning spanKind when get operations s.GetOperationsMissingSpanKind = true s.SkipArchiveTest = false - // Create Archive Span Writer and Reader return nil } @@ -199,24 +202,24 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) IndexPrefix: indexPrefix, UseILM: false, } - spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings() - require.NoError(t, err) + // spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings() + // require.NoError(t, err) clientFn := func() estemplate.Client { return client } // Initializing Span Reader and Writer - w := spanstore.NewSpanWriter( - spanstore.SpanWriterParams{ - Client: clientFn, - Logger: s.logger, - MetricsFactory: metrics.NullFactory, - IndexPrefix: indexPrefix, - AllTagsAsFields: allTagsAsFields, - TagDotReplacement: tagKeyDeDotChar, - Archive: false, - }) - err = w.CreateTemplates(spanMapping, serviceMapping, indexPrefix) - require.NoError(t, err) + // w := spanstore.NewSpanWriter( + // spanstore.SpanWriterParams{ + // Client: clientFn, + // Logger: s.logger, + // MetricsFactory: metrics.NullFactory, + // IndexPrefix: indexPrefix, + // AllTagsAsFields: allTagsAsFields, + // TagDotReplacement: tagKeyDeDotChar, + // Archive: false, + // }) + // err = w.CreateTemplates(spanMapping, serviceMapping, indexPrefix) + // require.NoError(t, err) // tracer, _, closer := s.tracerProvider() // defer closer() // s.SpanWriter = w From 7e42382b22210c72ab400dcf495c48bf9843b032 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Tue, 2 Apr 2024 12:33:53 +0530 Subject: [PATCH 08/18] Removed New Factory Signed-off-by: Wise-Wizard --- plugin/storage/integration/elasticsearch_test.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index c130575f913..1e458947fb9 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -53,6 +53,8 @@ const ( spanTemplateName = "jaeger-span" serviceTemplateName = "jaeger-service" dependenciesTemplateName = "jaeger-dependencies" + primaryNamespace = "es" + archiveNamespace = "es-archive" ) type ESStorageIntegration struct { @@ -109,9 +111,12 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) }) require.NoError(t, err) - // Initialize ES Factory - cfg := es.NewFactory().Options.Primary.Configuration + //Initialize ES Factory + opts := es.NewOptions(primaryNamespace, archiveNamespace) + cfg := opts.Primary.Configuration + cfg.MaxSpanAge = maxSpanAge + cfg.MaxDocCount = defaultMaxDocCount cfg.Tags.AllAsFields = allTagsAsFields cfg.Tags.DotReplacement = tagKeyDeDotChar cfg.IndexPrefix = indexPrefix From 71322eaace5b960a6844c0e1d926834ed7483d58 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Tue, 2 Apr 2024 12:43:55 +0530 Subject: [PATCH 09/18] Fix Lint Errors Signed-off-by: Wise-Wizard --- plugin/storage/integration/elasticsearch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 1e458947fb9..fdc28126e56 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -111,7 +111,7 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) }) require.NoError(t, err) - //Initialize ES Factory + // Initialize ES Factory opts := es.NewOptions(primaryNamespace, archiveNamespace) cfg := opts.Primary.Configuration From 771cabe07f436edec626aa055e8863ad1c34a0b5 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Tue, 2 Apr 2024 12:50:23 +0530 Subject: [PATCH 10/18] Removed Unneccessary Initializations Signed-off-by: Wise-Wizard --- plugin/storage/integration/elasticsearch_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index fdc28126e56..52b5542e0e6 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -115,11 +115,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) opts := es.NewOptions(primaryNamespace, archiveNamespace) cfg := opts.Primary.Configuration - cfg.MaxSpanAge = maxSpanAge - cfg.MaxDocCount = defaultMaxDocCount - cfg.Tags.AllAsFields = allTagsAsFields - cfg.Tags.DotReplacement = tagKeyDeDotChar - cfg.IndexPrefix = indexPrefix f, err := es.NewFactoryWithConfig(cfg, metrics.NullFactory, s.logger) if err != nil { return err From 823a14483fa8dc9b14cff0aafb783812063f8883 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Thu, 4 Apr 2024 17:31:32 +0530 Subject: [PATCH 11/18] Created a Dummy Fix to ES Factory Signed-off-by: Wise-Wizard --- jaeger-ui | 2 +- plugin/storage/es/factory.go | 36 ++++++++ .../storage/integration/elasticsearch_test.go | 86 ++++++++++--------- 3 files changed, 81 insertions(+), 43 deletions(-) diff --git a/jaeger-ui b/jaeger-ui index f8a4ece2c01..f704b56fbc8 160000 --- a/jaeger-ui +++ b/jaeger-ui @@ -1 +1 @@ -Subproject commit f8a4ece2c01769ac1126019fdca489f73ff4a1e8 +Subproject commit f704b56fbc8dedf3b7f1e23884db8108157598db diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 8a6866c74ae..30820ef57da 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -31,6 +31,7 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/es" + estemplate "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/fswatcher" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -85,6 +86,41 @@ func NewFactory() *Factory { } } +func NewFactoryWithConfigTest( + cfg config.Configuration, + metricsFactory metrics.Factory, + logger *zap.Logger, client estemplate.Client, +) (*Factory, error) { + if err := cfg.Validate(); err != nil { + return nil, err + } + + cfg.MaxDocCount = defaultMaxDocCount + cfg.Enabled = true + + archive := make(map[string]*namespaceConfig) + archive[archiveNamespace] = &namespaceConfig{ + Configuration: cfg, + namespace: archiveNamespace, + } + + f := NewFactory() + f.InitFromOptions(Options{ + Primary: namespaceConfig{ + Configuration: cfg, + namespace: primaryNamespace, + }, + others: archive, + }) + err := f.Initialize(metricsFactory, logger) + if err != nil { + return nil, err + } + + f.primaryClient.Store(&client) + return f, nil +} + func NewFactoryWithConfig( cfg config.Configuration, metricsFactory metrics.Factory, diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 52b5542e0e6..0f3a68661c8 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -28,6 +28,9 @@ import ( "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" estemplate "github.com/jaegertracing/jaeger/pkg/es" @@ -66,19 +69,19 @@ type ESStorageIntegration struct { logger *zap.Logger } -// func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { -// exporter := tracetest.NewInMemoryExporter() -// tp := sdktrace.NewTracerProvider( -// sdktrace.WithSampler(sdktrace.AlwaysSample()), -// sdktrace.WithSyncer(exporter), -// ) -// closer := func() { -// if err := tp.Shutdown(context.Background()); err != nil { -// s.logger.Error("failed to close tracer", zap.Error(err)) -// } -// } -// return tp, exporter, closer -// } +func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSyncer(exporter), + ) + closer := func() { + if err := tp.Shutdown(context.Background()); err != nil { + s.logger.Error("failed to close tracer", zap.Error(err)) + } + } + return tp, exporter, closer +} func (s *ESStorageIntegration) getVersion() (uint, error) { pingResult, _, err := s.client.Ping(queryURL).Do(context.Background()) @@ -98,12 +101,13 @@ func (s *ESStorageIntegration) getVersion() (uint, error) { return uint(esVersion), nil } -func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) error { +func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) { rawClient, err := elastic.NewClient( elastic.SetURL(queryURL), elastic.SetSniff(false)) require.NoError(t, err) s.logger, _ = testutils.NewLogger() + s.client = rawClient s.v8Client, err = elasticsearch8.NewClient(elasticsearch8.Config{ Addresses: []string{queryURL}, @@ -111,31 +115,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) }) require.NoError(t, err) - // Initialize ES Factory - opts := es.NewOptions(primaryNamespace, archiveNamespace) - cfg := opts.Primary.Configuration - - f, err := es.NewFactoryWithConfig(cfg, metrics.NullFactory, s.logger) - if err != nil { - return err - } - // Create Span Writer and Reader - s.SpanWriter, err = f.CreateSpanWriter() - if err != nil { - return err - } - s.SpanReader, err = f.CreateSpanReader() - if err != nil { - return err - } - s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() - if err != nil { - return err - } - s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() - if err != nil { - return err - } s.initSpanstore(t, allTagsAsFields) s.initSamplingStore(t) @@ -147,8 +126,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool) // TODO: remove this flag after ES support returning spanKind when get operations s.GetOperationsMissingSpanKind = true s.SkipArchiveTest = false - - return nil } func (s *ESStorageIntegration) esCleanUp(t *testing.T, allTagsAsFields bool) { @@ -206,8 +183,32 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) // require.NoError(t, err) clientFn := func() estemplate.Client { return client } - // Initializing Span Reader and Writer + opts := es.NewOptions(primaryNamespace, archiveNamespace) + cfg := opts.Primary.Configuration + f, err := es.NewFactoryWithConfigTest(cfg, metrics.NullFactory, s.logger, client) + if err != nil { + return err + } + // Create Span Writer and Reader + s.SpanWriter, err = f.CreateSpanWriter() + if err != nil { + return err + } + s.SpanReader, err = f.CreateSpanReader() + if err != nil { + return err + } + s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() + if err != nil { + return err + } + s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() + if err != nil { + return err + } + + // Initializing Span Reader and Writer // w := spanstore.NewSpanWriter( // spanstore.SpanWriterParams{ // Client: clientFn, @@ -257,6 +258,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) // Tracer: tracer.Tracer("test"), // Archive: true, // }) + dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ Client: clientFn, Logger: s.logger, From 2c999fc1b522f169989f8e76ac12f91c0a036884 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Thu, 4 Apr 2024 17:34:26 +0530 Subject: [PATCH 12/18] Fix Submodule Signed-off-by: Wise-Wizard --- jaeger-ui | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jaeger-ui b/jaeger-ui index f704b56fbc8..f8a4ece2c01 160000 --- a/jaeger-ui +++ b/jaeger-ui @@ -1 +1 @@ -Subproject commit f704b56fbc8dedf3b7f1e23884db8108157598db +Subproject commit f8a4ece2c01769ac1126019fdca489f73ff4a1e8 From 32262d4703d5530e3fcb49deead458d73c1c8349 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Thu, 4 Apr 2024 18:48:04 +0530 Subject: [PATCH 13/18] Fix Lint Signed-off-by: Wise-Wizard --- .../storage/integration/elasticsearch_test.go | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 0f3a68661c8..ed4626d3199 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -28,9 +28,6 @@ import ( "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/sdk/trace/tracetest" - "go.opentelemetry.io/otel/trace" "go.uber.org/zap" estemplate "github.com/jaegertracing/jaeger/pkg/es" @@ -69,19 +66,19 @@ type ESStorageIntegration struct { logger *zap.Logger } -func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { - exporter := tracetest.NewInMemoryExporter() - tp := sdktrace.NewTracerProvider( - sdktrace.WithSampler(sdktrace.AlwaysSample()), - sdktrace.WithSyncer(exporter), - ) - closer := func() { - if err := tp.Shutdown(context.Background()); err != nil { - s.logger.Error("failed to close tracer", zap.Error(err)) - } - } - return tp, exporter, closer -} +// func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { +// exporter := tracetest.NewInMemoryExporter() +// tp := sdktrace.NewTracerProvider( +// sdktrace.WithSampler(sdktrace.AlwaysSample()), +// sdktrace.WithSyncer(exporter), +// ) +// closer := func() { +// if err := tp.Shutdown(context.Background()); err != nil { +// s.logger.Error("failed to close tracer", zap.Error(err)) +// } +// } +// return tp, exporter, closer +// } func (s *ESStorageIntegration) getVersion() (uint, error) { pingResult, _, err := s.client.Ping(queryURL).Do(context.Background()) From f691324c06ec9c3128103cfc345b597de8106a43 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Fri, 5 Apr 2024 12:24:31 +0530 Subject: [PATCH 14/18] Fix failing CI Signed-off-by: Wise-Wizard --- plugin/storage/integration/elasticsearch_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index ed4626d3199..319428728e2 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -182,7 +182,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) opts := es.NewOptions(primaryNamespace, archiveNamespace) cfg := opts.Primary.Configuration - + cfg.IndexPrefix = indexPrefix f, err := es.NewFactoryWithConfigTest(cfg, metrics.NullFactory, s.logger, client) if err != nil { return err From e906787b68c16578bdd48bda56ffeebad980bb94 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Fri, 5 Apr 2024 17:11:27 +0530 Subject: [PATCH 15/18] Added All Tags as Field Bool Signed-off-by: Wise-Wizard --- plugin/storage/integration/elasticsearch_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 319428728e2..c755789d9e3 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -183,6 +183,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) opts := es.NewOptions(primaryNamespace, archiveNamespace) cfg := opts.Primary.Configuration cfg.IndexPrefix = indexPrefix + cfg.Tags.AllAsFields = allTagsAsFields f, err := es.NewFactoryWithConfigTest(cfg, metrics.NullFactory, s.logger, client) if err != nil { return err From a3f2e685325da1e270da25bd2c5cbc8e29300e9b Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Fri, 5 Apr 2024 17:13:18 +0530 Subject: [PATCH 16/18] Added All Tags as Field Bool Signed-off-by: Wise-Wizard --- plugin/storage/es/factory.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index 30820ef57da..cc8c71b1d8b 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -91,9 +91,7 @@ func NewFactoryWithConfigTest( metricsFactory metrics.Factory, logger *zap.Logger, client estemplate.Client, ) (*Factory, error) { - if err := cfg.Validate(); err != nil { - return nil, err - } + cfg.Validate() cfg.MaxDocCount = defaultMaxDocCount cfg.Enabled = true @@ -112,10 +110,7 @@ func NewFactoryWithConfigTest( }, others: archive, }) - err := f.Initialize(metricsFactory, logger) - if err != nil { - return nil, err - } + f.Initialize(metricsFactory, logger) f.primaryClient.Store(&client) return f, nil From 9ff6012e8211d078bb9abfe0778d9aa7d982a860 Mon Sep 17 00:00:00 2001 From: Wise-Wizard Date: Fri, 5 Apr 2024 23:26:04 +0530 Subject: [PATCH 17/18] Removed Comments Signed-off-by: Wise-Wizard --- .../storage/integration/elasticsearch_test.go | 67 +------------------ 1 file changed, 1 insertion(+), 66 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index c755789d9e3..e1276b73106 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -66,19 +66,6 @@ type ESStorageIntegration struct { logger *zap.Logger } -// func (s *ESStorageIntegration) tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { -// exporter := tracetest.NewInMemoryExporter() -// tp := sdktrace.NewTracerProvider( -// sdktrace.WithSampler(sdktrace.AlwaysSample()), -// sdktrace.WithSyncer(exporter), -// ) -// closer := func() { -// if err := tp.Shutdown(context.Background()); err != nil { -// s.logger.Error("failed to close tracer", zap.Error(err)) -// } -// } -// return tp, exporter, closer -// } func (s *ESStorageIntegration) getVersion() (uint, error) { pingResult, _, err := s.client.Ping(queryURL).Do(context.Background()) @@ -176,8 +163,7 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) IndexPrefix: indexPrefix, UseILM: false, } - // spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings() - // require.NoError(t, err) + clientFn := func() estemplate.Client { return client } opts := es.NewOptions(primaryNamespace, archiveNamespace) @@ -206,57 +192,6 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) return err } - // Initializing Span Reader and Writer - // w := spanstore.NewSpanWriter( - // spanstore.SpanWriterParams{ - // Client: clientFn, - // Logger: s.logger, - // MetricsFactory: metrics.NullFactory, - // IndexPrefix: indexPrefix, - // AllTagsAsFields: allTagsAsFields, - // TagDotReplacement: tagKeyDeDotChar, - // Archive: false, - // }) - // err = w.CreateTemplates(spanMapping, serviceMapping, indexPrefix) - // require.NoError(t, err) - // tracer, _, closer := s.tracerProvider() - // defer closer() - // s.SpanWriter = w - // s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ - // Client: clientFn, - // Logger: s.logger, - // MetricsFactory: metrics.NullFactory, - // IndexPrefix: indexPrefix, - // MaxSpanAge: maxSpanAge, - // TagDotReplacement: tagKeyDeDotChar, - // MaxDocCount: defaultMaxDocCount, - // Tracer: tracer.Tracer("test"), - // Archive: false, - // }) - - // Initializing Archive Span Reader and Writer - // s.ArchiveSpanWriter = spanstore.NewSpanWriter( - // spanstore.SpanWriterParams{ - // Client: clientFn, - // Logger: s.logger, - // MetricsFactory: metrics.NullFactory, - // IndexPrefix: indexPrefix, - // AllTagsAsFields: allTagsAsFields, - // TagDotReplacement: tagKeyDeDotChar, - // Archive: true, - // }) - // s.ArchiveSpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ - // Client: clientFn, - // Logger: s.logger, - // MetricsFactory: metrics.NullFactory, - // IndexPrefix: indexPrefix, - // MaxSpanAge: maxSpanAge, - // TagDotReplacement: tagKeyDeDotChar, - // MaxDocCount: defaultMaxDocCount, - // Tracer: tracer.Tracer("test"), - // Archive: true, - // }) - dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ Client: clientFn, Logger: s.logger, From 2863e8d4a4bf7113b1c1899ceab5758ba910d480 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 5 Apr 2024 20:21:12 -0400 Subject: [PATCH 18/18] simplify Signed-off-by: Yuri Shkuro --- plugin/storage/es/factory.go | 31 ------- .../storage/integration/elasticsearch_test.go | 80 ++++++++----------- plugin/storage/integration/integration.go | 20 +++-- 3 files changed, 48 insertions(+), 83 deletions(-) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index cc8c71b1d8b..8a6866c74ae 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -31,7 +31,6 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/es" - estemplate "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/pkg/es/config" "github.com/jaegertracing/jaeger/pkg/fswatcher" "github.com/jaegertracing/jaeger/pkg/metrics" @@ -86,36 +85,6 @@ func NewFactory() *Factory { } } -func NewFactoryWithConfigTest( - cfg config.Configuration, - metricsFactory metrics.Factory, - logger *zap.Logger, client estemplate.Client, -) (*Factory, error) { - cfg.Validate() - - cfg.MaxDocCount = defaultMaxDocCount - cfg.Enabled = true - - archive := make(map[string]*namespaceConfig) - archive[archiveNamespace] = &namespaceConfig{ - Configuration: cfg, - namespace: archiveNamespace, - } - - f := NewFactory() - f.InitFromOptions(Options{ - Primary: namespaceConfig{ - Configuration: cfg, - namespace: primaryNamespace, - }, - others: archive, - }) - f.Initialize(metricsFactory, logger) - - f.primaryClient.Store(&client) - return f, nil -} - func NewFactoryWithConfig( cfg config.Configuration, metricsFactory metrics.Factory, diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index e1276b73106..aa97184485f 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -18,6 +18,7 @@ package integration import ( "context" "errors" + "fmt" "net/http" "strconv" "strings" @@ -29,15 +30,17 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "github.com/jaegertracing/jaeger/pkg/config" estemplate "github.com/jaegertracing/jaeger/pkg/es" eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es" - "github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore" "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" "github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore" + "github.com/jaegertracing/jaeger/storage/dependencystore" ) const ( @@ -66,7 +69,6 @@ type ESStorageIntegration struct { logger *zap.Logger } - func (s *ESStorageIntegration) getVersion() (uint, error) { pingResult, _, err := s.client.Ping(queryURL).Do(context.Background()) if err != nil { @@ -153,60 +155,44 @@ func (s *ESStorageIntegration) getEsClient(t *testing.T) eswrapper.ClientWrapper return eswrapper.WrapESClient(s.client, bp, esVersion, s.v8Client) } -func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) error { - client := s.getEsClient(t) - mappingBuilder := mappings.MappingBuilder{ - TemplateBuilder: estemplate.TextTemplateBuilder{}, - Shards: 5, - Replicas: 1, - EsVersion: client.GetVersion(), - IndexPrefix: indexPrefix, - UseILM: false, +func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory { + s.logger = zaptest.NewLogger(t) + f := es.NewFactory() + v, command := config.Viperize(f.AddFlags) + args := []string{ + fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields), + fmt.Sprintf("--es.index-prefix=%v", indexPrefix), + "--es-archive.enabled=true", + fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields), + fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix), } + require.NoError(t, command.ParseFlags(args)) + f.InitFromViper(v, s.logger) + require.NoError(t, f.Initialize(metrics.NullFactory, s.logger)) - clientFn := func() estemplate.Client { return client } + // TODO ideally we need to close the factory once the test is finished + // but because esCleanup calls initialize() we get a panic later + // t.Cleanup(func() { + // require.NoError(t, f.Close()) + // }) + return f +} - opts := es.NewOptions(primaryNamespace, archiveNamespace) - cfg := opts.Primary.Configuration - cfg.IndexPrefix = indexPrefix - cfg.Tags.AllAsFields = allTagsAsFields - f, err := es.NewFactoryWithConfigTest(cfg, metrics.NullFactory, s.logger, client) - if err != nil { - return err - } - // Create Span Writer and Reader +func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool) { + f := s.initializeESFactory(t, allTagsAsFields) + var err error s.SpanWriter, err = f.CreateSpanWriter() - if err != nil { - return err - } + require.NoError(t, err) s.SpanReader, err = f.CreateSpanReader() - if err != nil { - return err - } + require.NoError(t, err) s.ArchiveSpanReader, err = f.CreateArchiveSpanReader() - if err != nil { - return err - } + require.NoError(t, err) s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter() - if err != nil { - return err - } - - dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ - Client: clientFn, - Logger: s.logger, - IndexPrefix: indexPrefix, - IndexDateLayout: indexDateLayout, - MaxDocCount: defaultMaxDocCount, - }) - - depMapping, err := mappingBuilder.GetDependenciesMappings() require.NoError(t, err) - err = dependencyStore.CreateTemplates(depMapping) + + s.DependencyReader, err = f.CreateDependencyReader() require.NoError(t, err) - s.DependencyReader = dependencyStore - s.DependencyWriter = dependencyStore - return nil + s.DependencyWriter = s.DependencyReader.(dependencystore.Writer) } func (s *ESStorageIntegration) esRefresh(t *testing.T) { diff --git a/plugin/storage/integration/integration.go b/plugin/storage/integration/integration.go index 506900e8ee3..d6edc3a8abf 100644 --- a/plugin/storage/integration/integration.go +++ b/plugin/storage/integration/integration.go @@ -460,12 +460,22 @@ func (s *StorageIntegration) testGetDependencies(t *testing.T) { require.NoError(t, s.DependencyWriter.WriteDependencies(time.Now(), expected)) s.refresh(t) - actual, err := s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute) - require.NoError(t, err) - sort.Slice(actual, func(i, j int) bool { - return actual[i].Parent < actual[j].Parent + + var actual []model.DependencyLink + found := s.waitForCondition(t, func(t *testing.T) bool { + var err error + actual, err = s.DependencyReader.GetDependencies(context.Background(), time.Now(), 5*time.Minute) + require.NoError(t, err) + sort.Slice(actual, func(i, j int) bool { + return actual[i].Parent < actual[j].Parent + }) + return assert.ObjectsAreEqualValues(expected, actual) }) - assert.EqualValues(t, expected, actual) + + if !assert.True(t, found) { + t.Log("\t Expected:", expected) + t.Log("\t Actual :", actual) + } } // === Sampling Store Integration Tests ===