From a1c621b22d40c9c4400b43d2e3c17647284ba7a1 Mon Sep 17 00:00:00 2001 From: Afzal Ansari <afzal442@gmail.com> Date: Wed, 26 Jul 2023 19:11:48 +0000 Subject: [PATCH 1/6] adds otel to cassandra spanstore Signed-off-by: Afzal Ansari <afzal442@gmail.com> --- plugin/storage/cassandra/factory.go | 8 ++++++-- plugin/storage/cassandra/savetracetest/main.go | 4 +++- plugin/storage/cassandra/spanstore/reader.go | 4 ++++ plugin/storage/cassandra/spanstore/reader_test.go | 4 +++- 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 507f847ebd5..f41ff4080b7 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -21,6 +21,8 @@ import ( "io" "github.com/spf13/viper" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/cassandra" @@ -57,6 +59,7 @@ type Factory struct { primaryMetricsFactory metrics.Factory archiveMetricsFactory metrics.Factory logger *zap.Logger + tracer trace.TracerProvider primaryConfig config.SessionBuilder primarySession cassandra.Session @@ -67,6 +70,7 @@ type Factory struct { // NewFactory creates a new Factory. func NewFactory() *Factory { return &Factory{ + tracer: otel.GetTracerProvider(), Options: NewOptions(primaryStorageConfig, archiveStorageConfig), } } @@ -120,7 +124,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger), nil + return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")), nil } // CreateSpanWriter implements storage.Factory @@ -143,7 +147,7 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { if f.archiveSession == nil { return nil, storage.ErrArchiveStorageNotConfigured } - return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger), nil + return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")), nil } // CreateArchiveSpanWriter implements storage.ArchiveFactory diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index 8b28858b3e4..c478b0413ba 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -19,6 +19,7 @@ import ( "context" "time" + "go.opentelemetry.io/otel" "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" @@ -44,8 +45,9 @@ func main() { if err != nil { logger.Fatal("Cannot create Cassandra session", zap.Error(err)) } + tracer := otel.GetTracerProvider() spanStore := cSpanStore.NewSpanWriter(cqlSession, time.Hour*12, noScope, logger) - spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger) + spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger, tracer.Tracer("cSpanStore.SpanReader")) ctx := context.Background() if err = spanStore.WriteSpan(ctx, getSomeSpan()); err != nil { logger.Fatal("Failed to save", zap.Error(err)) diff --git a/plugin/storage/cassandra/spanstore/reader.go b/plugin/storage/cassandra/spanstore/reader.go index 621e327a41c..e502bdbb94d 100644 --- a/plugin/storage/cassandra/spanstore/reader.go +++ b/plugin/storage/cassandra/spanstore/reader.go @@ -24,6 +24,7 @@ import ( "github.com/opentracing/opentracing-go" ottag "github.com/opentracing/opentracing-go/ext" otlog "github.com/opentracing/opentracing-go/log" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" @@ -110,6 +111,7 @@ type SpanReader struct { operationNamesReader operationNamesReader metrics spanReaderMetrics logger *zap.Logger + tracer trace.Tracer } // NewSpanReader returns a new SpanReader. @@ -117,6 +119,7 @@ func NewSpanReader( session cassandra.Session, metricsFactory metrics.Factory, logger *zap.Logger, + tracer trace.Tracer, ) *SpanReader { readFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "read", Tags: nil}) serviceNamesStorage := NewServiceNamesStorage(session, 0, metricsFactory, logger) @@ -134,6 +137,7 @@ func NewSpanReader( queryServiceNameIndex: casMetrics.NewTable(readFactory, "service_name_index"), }, logger: logger, + tracer: tracer, } } diff --git a/plugin/storage/cassandra/spanstore/reader_test.go b/plugin/storage/cassandra/spanstore/reader_test.go index 83e509f361b..7c975a69636 100644 --- a/plugin/storage/cassandra/spanstore/reader_test.go +++ b/plugin/storage/cassandra/spanstore/reader_test.go @@ -31,6 +31,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/cassandra" "github.com/jaegertracing/jaeger/pkg/cassandra/mocks" + "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -52,11 +53,12 @@ func withSpanReader(fn func(r *spanReaderTest)) { query.On("Exec").Return(nil) logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) + tracer := jtracer.NoOp().OTEL r := &spanReaderTest{ session: session, logger: logger, logBuffer: logBuffer, - reader: NewSpanReader(session, metricsFactory, logger), + reader: NewSpanReader(session, metricsFactory, logger, tracer.Tracer("test")), } fn(r) } From b36c8a9f215a7136c733e92db65fe079308877eb Mon Sep 17 00:00:00 2001 From: Afzal <94980910+afzalbin64@users.noreply.github.com> Date: Sat, 29 Jul 2023 12:54:18 +0000 Subject: [PATCH 2/6] adds otel tracer to NewSpanReader Signed-off-by: Afzal <94980910+afzalbin64@users.noreply.github.com> --- plugin/storage/cassandra/spanstore/reader.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugin/storage/cassandra/spanstore/reader.go b/plugin/storage/cassandra/spanstore/reader.go index e502bdbb94d..3a4a0519ffb 100644 --- a/plugin/storage/cassandra/spanstore/reader.go +++ b/plugin/storage/cassandra/spanstore/reader.go @@ -24,6 +24,7 @@ import ( "github.com/opentracing/opentracing-go" ottag "github.com/opentracing/opentracing-go/ext" otlog "github.com/opentracing/opentracing-go/log" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -121,6 +122,9 @@ func NewSpanReader( logger *zap.Logger, tracer trace.Tracer, ) *SpanReader { + if tracer == nil { + tracer = otel.GetTracerProvider().Tracer("cSpanStore.SpanReader") + } readFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "read", Tags: nil}) serviceNamesStorage := NewServiceNamesStorage(session, 0, metricsFactory, logger) operationNamesStorage := NewOperationNamesStorage(session, 0, metricsFactory, logger) From 5ccf24d2b96bc91c60fbec42597284d762d0ec43 Mon Sep 17 00:00:00 2001 From: Afzal <94980910+afzalbin64@users.noreply.github.com> Date: Sat, 29 Jul 2023 19:05:16 +0000 Subject: [PATCH 3/6] adds otels tracer span to span reader Signed-off-by: Afzal <94980910+afzalbin64@users.noreply.github.com> --- plugin/storage/cassandra/spanstore/reader.go | 80 +++++++++++--------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/plugin/storage/cassandra/spanstore/reader.go b/plugin/storage/cassandra/spanstore/reader.go index 3a4a0519ffb..ec12099515b 100644 --- a/plugin/storage/cassandra/spanstore/reader.go +++ b/plugin/storage/cassandra/spanstore/reader.go @@ -21,10 +21,10 @@ import ( "fmt" "time" - "github.com/opentracing/opentracing-go" - ottag "github.com/opentracing/opentracing-go/ext" - otlog "github.com/opentracing/opentracing-go/log" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -158,10 +158,13 @@ func (s *SpanReader) GetOperations( return s.operationNamesReader(query) } -func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) { - span, ctx := startSpanForQuery(ctx, "readTrace", querySpanByTraceID) - defer span.Finish() - span.LogFields(otlog.String("event", "searching"), otlog.Object("trace_id", traceID)) +func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID, tracer trace.Tracer) (*model.Trace, error) { + ctx, span := startSpanForQuery(ctx, "readTrace", querySpanByTraceID, tracer) + defer span.End() + span.SetAttributes( + attribute.Key("event").String("searching"), + attribute.Key("trace_id").String(traceID.String()), + ) trace, err := s.readTraceInSpan(ctx, traceID) logErrorToSpan(span, err) @@ -217,7 +220,7 @@ func (s *SpanReader) readTraceInSpan(ctx context.Context, traceID dbmodel.TraceI // GetTrace takes a traceID and returns a Trace associated with that traceID func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID)) + return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID), s.tracer) } func validateQuery(p *spanstore.TraceQueryParameters) error { @@ -295,7 +298,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra return nil, err } if len(traceQuery.Tags) > 0 { - tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery) + tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery, s.tracer) if err != nil { return nil, err } @@ -307,19 +310,22 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra return traceIds, nil } if len(traceQuery.Tags) > 0 { - return s.queryByTagsAndLogs(ctx, traceQuery) + return s.queryByTagsAndLogs(ctx, traceQuery, s.tracer) } return s.queryByService(ctx, traceQuery) } -func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - span, ctx := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag) - defer span.Finish() +func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters, tracer trace.Tracer) (dbmodel.UniqueTraceIDs, error) { + ctx, span := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag, tracer) + defer span.End() results := make([]dbmodel.UniqueTraceIDs, 0, len(tq.Tags)) for k, v := range tq.Tags { - childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryByTag") - childSpan.LogFields(otlog.String("tag.key", k), otlog.String("tag.value", v)) + _, childSpan := s.tracer.Start(ctx, "queryByTag") + childSpan.SetAttributes( + attribute.Key("tag.key").String(k), + attribute.Key("tag.value").String(v), + ) query := s.session.Query( queryByTag, tq.ServiceName, @@ -330,7 +336,7 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace tq.NumTraces*limitMultiple, ).PageSize(0) t, err := s.executeQuery(childSpan, query, s.metrics.queryTagIndex) - childSpan.Finish() + defer childSpan.End() if err != nil { return nil, err } @@ -340,8 +346,8 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace } func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - span, ctx := startSpanForQuery(ctx, "queryByDuration", queryByDuration) - defer span.Finish() + ctx, span := startSpanForQuery(ctx, "queryByDuration", queryByDuration, s.tracer) + defer span.End() results := dbmodel.UniqueTraceIDs{} @@ -357,8 +363,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore. endTimeByHour := traceQuery.StartTimeMax.Round(durationBucketSize) for timeBucket := endTimeByHour; timeBucket.After(startTimeByHour) || timeBucket.Equal(startTimeByHour); timeBucket = timeBucket.Add(-1 * durationBucketSize) { - childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryForTimeBucket") - childSpan.LogFields(otlog.String("timeBucket", timeBucket.String())) + _, childSpan := s.tracer.Start(ctx, "queryForTimeBucket") + childSpan.SetAttributes(attribute.Key("timeBucket").String(timeBucket.String())) query := s.session.Query( queryByDuration, timeBucket, @@ -368,7 +374,7 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore. maxDurationMicros, traceQuery.NumTraces*limitMultiple) t, err := s.executeQuery(childSpan, query, s.metrics.queryDurationIndex) - childSpan.Finish() + childSpan.End() if err != nil { return nil, err } @@ -384,8 +390,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore. } func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - span, _ := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName) - defer span.Finish() + _, span := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName, s.tracer) + defer span.End() query := s.session.Query( queryByServiceAndOperationName, tq.ServiceName, @@ -398,8 +404,8 @@ func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spa } func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - span, _ := startSpanForQuery(ctx, "queryByService", queryByServiceName) - defer span.Finish() + _, span := startSpanForQuery(ctx, "queryByService", queryByServiceName, s.tracer) + defer span.End() query := s.session.Query( queryByServiceName, tq.ServiceName, @@ -410,7 +416,7 @@ func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQuer return s.executeQuery(span, query, s.metrics.queryServiceNameIndex) } -func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) { +func (s *SpanReader) executeQuery(span trace.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) { start := time.Now() i := query.Iter() retMe := dbmodel.UniqueTraceIDs{} @@ -422,25 +428,27 @@ func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics.Emit(err, time.Since(start)) if err != nil { logErrorToSpan(span, err) - span.LogFields(otlog.String("query", query.String())) + span.SetAttributes(attribute.Key("query").String(query.String())) s.logger.Error("Failed to exec query", zap.Error(err), zap.String("query", query.String())) return nil, err } return retMe, nil } -func startSpanForQuery(ctx context.Context, name, query string) (opentracing.Span, context.Context) { - span, ctx := opentracing.StartSpanFromContext(ctx, name) - ottag.DBStatement.Set(span, query) - ottag.DBType.Set(span, "cassandra") - ottag.Component.Set(span, "gocql") - return span, ctx +func startSpanForQuery(ctx context.Context, name, query string, tp trace.Tracer) (context.Context, trace.Span) { + ctx, span := tp.Start(ctx, name) + span.SetAttributes( + attribute.Key(semconv.DBStatementKey).String(query), + attribute.Key(semconv.DBSystemKey).String("cassandra"), + attribute.Key("component").String("gocql"), + ) + return ctx, span } -func logErrorToSpan(span opentracing.Span, err error) { +func logErrorToSpan(span trace.Span, err error) { if err == nil { return } - ottag.Error.Set(span, true) - span.LogFields(otlog.Error(err)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) } From 1c7a579ca1dff105e2507068c4a03f1d248ba5e1 Mon Sep 17 00:00:00 2001 From: Afzal <94980910+afzalbin64@users.noreply.github.com> Date: Sat, 29 Jul 2023 19:34:30 +0000 Subject: [PATCH 4/6] adds jtracer.New() to main Signed-off-by: Afzal <94980910+afzalbin64@users.noreply.github.com> --- plugin/storage/cassandra/savetracetest/main.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index c478b0413ba..81724eb8dee 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -19,11 +19,11 @@ import ( "context" "time" - "go.opentelemetry.io/otel" "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" cascfg "github.com/jaegertracing/jaeger/pkg/cassandra/config" + "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/metrics" cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -45,9 +45,12 @@ func main() { if err != nil { logger.Fatal("Cannot create Cassandra session", zap.Error(err)) } - tracer := otel.GetTracerProvider() + tracer, err := jtracer.New("cassandra") + if err != nil { + logger.Fatal("Failed to initialize tracer", zap.Error(err)) + } spanStore := cSpanStore.NewSpanWriter(cqlSession, time.Hour*12, noScope, logger) - spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger, tracer.Tracer("cSpanStore.SpanReader")) + spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger, tracer.OTEL.Tracer("cSpanStore.SpanReader")) ctx := context.Background() if err = spanStore.WriteSpan(ctx, getSomeSpan()); err != nil { logger.Fatal("Failed to save", zap.Error(err)) From af3c4aa7b9c08bc1c3e21c2dc0d4658adc784c97 Mon Sep 17 00:00:00 2001 From: Afzal <94980910+afzalbin64@users.noreply.github.com> Date: Sun, 30 Jul 2023 18:14:43 +0000 Subject: [PATCH 5/6] makes startSpanquery as a member function Signed-off-by: Afzal <94980910+afzalbin64@users.noreply.github.com> --- plugin/storage/cassandra/spanstore/reader.go | 31 ++++++++------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/plugin/storage/cassandra/spanstore/reader.go b/plugin/storage/cassandra/spanstore/reader.go index ec12099515b..ad1806f5f47 100644 --- a/plugin/storage/cassandra/spanstore/reader.go +++ b/plugin/storage/cassandra/spanstore/reader.go @@ -21,7 +21,6 @@ import ( "fmt" "time" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" @@ -122,9 +121,6 @@ func NewSpanReader( logger *zap.Logger, tracer trace.Tracer, ) *SpanReader { - if tracer == nil { - tracer = otel.GetTracerProvider().Tracer("cSpanStore.SpanReader") - } readFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "read", Tags: nil}) serviceNamesStorage := NewServiceNamesStorage(session, 0, metricsFactory, logger) operationNamesStorage := NewOperationNamesStorage(session, 0, metricsFactory, logger) @@ -158,11 +154,10 @@ func (s *SpanReader) GetOperations( return s.operationNamesReader(query) } -func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID, tracer trace.Tracer) (*model.Trace, error) { - ctx, span := startSpanForQuery(ctx, "readTrace", querySpanByTraceID, tracer) +func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) { + ctx, span := s.startSpanForQuery(ctx, "readTrace", querySpanByTraceID) defer span.End() span.SetAttributes( - attribute.Key("event").String("searching"), attribute.Key("trace_id").String(traceID.String()), ) @@ -220,7 +215,7 @@ func (s *SpanReader) readTraceInSpan(ctx context.Context, traceID dbmodel.TraceI // GetTrace takes a traceID and returns a Trace associated with that traceID func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID), s.tracer) + return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID)) } func validateQuery(p *spanstore.TraceQueryParameters) error { @@ -298,7 +293,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra return nil, err } if len(traceQuery.Tags) > 0 { - tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery, s.tracer) + tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery) if err != nil { return nil, err } @@ -310,13 +305,13 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra return traceIds, nil } if len(traceQuery.Tags) > 0 { - return s.queryByTagsAndLogs(ctx, traceQuery, s.tracer) + return s.queryByTagsAndLogs(ctx, traceQuery) } return s.queryByService(ctx, traceQuery) } -func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters, tracer trace.Tracer) (dbmodel.UniqueTraceIDs, error) { - ctx, span := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag, tracer) +func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { + ctx, span := s.startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag) defer span.End() results := make([]dbmodel.UniqueTraceIDs, 0, len(tq.Tags)) @@ -336,7 +331,7 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace tq.NumTraces*limitMultiple, ).PageSize(0) t, err := s.executeQuery(childSpan, query, s.metrics.queryTagIndex) - defer childSpan.End() + childSpan.End() if err != nil { return nil, err } @@ -346,7 +341,7 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace } func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - ctx, span := startSpanForQuery(ctx, "queryByDuration", queryByDuration, s.tracer) + ctx, span := s.startSpanForQuery(ctx, "queryByDuration", queryByDuration) defer span.End() results := dbmodel.UniqueTraceIDs{} @@ -390,7 +385,7 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore. } func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - _, span := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName, s.tracer) + _, span := s.startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName) defer span.End() query := s.session.Query( queryByServiceAndOperationName, @@ -404,7 +399,7 @@ func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spa } func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - _, span := startSpanForQuery(ctx, "queryByService", queryByServiceName, s.tracer) + _, span := s.startSpanForQuery(ctx, "queryByService", queryByServiceAndOperationName) defer span.End() query := s.session.Query( queryByServiceName, @@ -435,8 +430,8 @@ func (s *SpanReader) executeQuery(span trace.Span, query cassandra.Query, tableM return retMe, nil } -func startSpanForQuery(ctx context.Context, name, query string, tp trace.Tracer) (context.Context, trace.Span) { - ctx, span := tp.Start(ctx, name) +func (s *SpanReader) startSpanForQuery(ctx context.Context, name, query string) (context.Context, trace.Span) { + ctx, span := s.tracer.Start(ctx, name) span.SetAttributes( attribute.Key(semconv.DBStatementKey).String(query), attribute.Key(semconv.DBSystemKey).String("cassandra"), From 46b5d931e341182311a701a09a6e11d8e7272d9f Mon Sep 17 00:00:00 2001 From: Afzal <94980910+afzalbin64@users.noreply.github.com> Date: Mon, 31 Jul 2023 07:33:06 +0000 Subject: [PATCH 6/6] adds tracetest and mocks otel tp Signed-off-by: Afzal <94980910+afzalbin64@users.noreply.github.com> --- .../storage/cassandra/savetracetest/main.go | 2 +- plugin/storage/cassandra/spanstore/reader.go | 5 +- .../cassandra/spanstore/reader_test.go | 55 +++++++++++++------ 3 files changed, 40 insertions(+), 22 deletions(-) diff --git a/plugin/storage/cassandra/savetracetest/main.go b/plugin/storage/cassandra/savetracetest/main.go index 81724eb8dee..66281a56d47 100644 --- a/plugin/storage/cassandra/savetracetest/main.go +++ b/plugin/storage/cassandra/savetracetest/main.go @@ -45,7 +45,7 @@ func main() { if err != nil { logger.Fatal("Cannot create Cassandra session", zap.Error(err)) } - tracer, err := jtracer.New("cassandra") + tracer, err := jtracer.New("savetracetest") if err != nil { logger.Fatal("Failed to initialize tracer", zap.Error(err)) } diff --git a/plugin/storage/cassandra/spanstore/reader.go b/plugin/storage/cassandra/spanstore/reader.go index ad1806f5f47..62d984fca3c 100644 --- a/plugin/storage/cassandra/spanstore/reader.go +++ b/plugin/storage/cassandra/spanstore/reader.go @@ -157,9 +157,7 @@ func (s *SpanReader) GetOperations( func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) { ctx, span := s.startSpanForQuery(ctx, "readTrace", querySpanByTraceID) defer span.End() - span.SetAttributes( - attribute.Key("trace_id").String(traceID.String()), - ) + span.SetAttributes(attribute.Key("trace_id").String(traceID.String())) trace, err := s.readTraceInSpan(ctx, traceID) logErrorToSpan(span, err) @@ -423,7 +421,6 @@ func (s *SpanReader) executeQuery(span trace.Span, query cassandra.Query, tableM tableMetrics.Emit(err, time.Since(start)) if err != nil { logErrorToSpan(span, err) - span.SetAttributes(attribute.Key("query").String(query.String())) s.logger.Error("Failed to exec query", zap.Error(err), zap.String("query", query.String())) return nil, err } diff --git a/plugin/storage/cassandra/spanstore/reader_test.go b/plugin/storage/cassandra/spanstore/reader_test.go index 7c975a69636..0d468bfe5db 100644 --- a/plugin/storage/cassandra/spanstore/reader_test.go +++ b/plugin/storage/cassandra/spanstore/reader_test.go @@ -25,26 +25,41 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/cassandra" "github.com/jaegertracing/jaeger/pkg/cassandra/mocks" - "github.com/jaegertracing/jaeger/pkg/jtracer" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage/spanstore" ) type spanReaderTest struct { - session *mocks.Session - logger *zap.Logger - logBuffer *testutils.Buffer - reader *SpanReader + session *mocks.Session + logger *zap.Logger + logBuffer *testutils.Buffer + traceBuffer *tracetest.InMemoryExporter + reader *SpanReader } -func withSpanReader(fn func(r *spanReaderTest)) { +func tracerProvider(t *testing.T) (trace.TracerProvider, *tracetest.InMemoryExporter, func()) { + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSyncer(exporter), + ) + closer := func() { + assert.NoError(t, tp.Shutdown(context.Background())) + } + return tp, exporter, closer +} + +func withSpanReader(t *testing.T, fn func(r *spanReaderTest)) { session := &mocks.Session{} query := &mocks.Query{} session.On("Query", @@ -53,12 +68,14 @@ func withSpanReader(fn func(r *spanReaderTest)) { query.On("Exec").Return(nil) logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) - tracer := jtracer.NoOp().OTEL + tracer, exp, closer := tracerProvider(t) + defer closer() r := &spanReaderTest{ - session: session, - logger: logger, - logBuffer: logBuffer, - reader: NewSpanReader(session, metricsFactory, logger, tracer.Tracer("test")), + session: session, + logger: logger, + logBuffer: logBuffer, + traceBuffer: exp, + reader: NewSpanReader(session, metricsFactory, logger, tracer.Tracer("test")), } fn(r) } @@ -66,7 +83,7 @@ func withSpanReader(fn func(r *spanReaderTest)) { var _ spanstore.Reader = &SpanReader{} // check API conformance func TestSpanReaderGetServices(t *testing.T) { - withSpanReader(func(r *spanReaderTest) { + withSpanReader(t, func(r *spanReaderTest) { r.reader.serviceNamesReader = func() ([]string, error) { return []string{"service-a"}, nil } s, err := r.reader.GetServices(context.Background()) assert.NoError(t, err) @@ -75,7 +92,7 @@ func TestSpanReaderGetServices(t *testing.T) { } func TestSpanReaderGetOperations(t *testing.T) { - withSpanReader(func(r *spanReaderTest) { + withSpanReader(t, func(r *spanReaderTest) { expectedOperations := []spanstore.Operation{ { Name: "operation-a", @@ -123,7 +140,7 @@ func TestSpanReaderGetTrace(t *testing.T) { for _, tc := range testCases { testCase := tc // capture loop var t.Run("expected err="+testCase.expectedErr, func(t *testing.T) { - withSpanReader(func(r *spanReaderTest) { + withSpanReader(t, func(r *spanReaderTest) { iter := &mocks.Iterator{} iter.On("Scan", testCase.scanner).Return(true) iter.On("Scan", matchEverything()).Return(false) @@ -137,6 +154,7 @@ func TestSpanReaderGetTrace(t *testing.T) { trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) if testCase.expectedErr == "" { + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") assert.NoError(t, err) assert.NotNil(t, trace) } else { @@ -150,7 +168,7 @@ func TestSpanReaderGetTrace(t *testing.T) { } func TestSpanReaderGetTrace_TraceNotFound(t *testing.T) { - withSpanReader(func(r *spanReaderTest) { + withSpanReader(t, func(r *spanReaderTest) { iter := &mocks.Iterator{} iter.On("Scan", matchEverything()).Return(false) iter.On("Close").Return(nil) @@ -162,14 +180,16 @@ func TestSpanReaderGetTrace_TraceNotFound(t *testing.T) { r.session.On("Query", mock.AnythingOfType("string"), matchEverything()).Return(query) trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") assert.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) } func TestSpanReaderFindTracesBadRequest(t *testing.T) { - withSpanReader(func(r *spanReaderTest) { + withSpanReader(t, func(r *spanReaderTest) { _, err := r.reader.FindTraces(context.Background(), nil) + require.Empty(t, r.traceBuffer.GetSpans(), "Spans Not recorded") assert.Error(t, err) }) } @@ -288,7 +308,7 @@ func TestSpanReaderFindTraces(t *testing.T) { for _, tc := range testCases { testCase := tc // capture loop var t.Run(testCase.caption, func(t *testing.T) { - withSpanReader(func(r *spanReaderTest) { + withSpanReader(t, func(r *spanReaderTest) { // scanMatcher can match Iter.Scan() parameters and set trace ID fields scanMatcher := func(name string) interface{} { traceIDs := []dbmodel.TraceID{ @@ -386,6 +406,7 @@ func TestSpanReaderFindTraces(t *testing.T) { } res, err := r.reader.FindTraces(context.Background(), queryParams) if testCase.expectedError == "" { + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") assert.NoError(t, err) assert.Len(t, res, testCase.expectedCount, "expecting certain number of traces") } else {