Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace es-spanstore tracing instrumentation with OpenTelemetry #4596

Merged
merged 16 commits into from
Jul 30, 2023
Prev Previous commit
Next Next commit
adds tp to readertest
Signed-off-by: Afzal Ansari <[email protected]>
  • Loading branch information
afzal442 committed Jul 27, 2023
commit 11840a0dee81424d9db9f7fac9906c9fbc474f5f
77 changes: 73 additions & 4 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/metricstest"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es/mocks"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand Down Expand Up @@ -87,20 +89,37 @@ type spanReaderTest struct {
client *mocks.Client
logger *zap.Logger
logBuffer *testutils.Buffer
exporter *tracetest.InMemoryExporter
closer error
reader *SpanReader
}

func tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func() error) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSyncer(exporter),
)
closer := func() error {
return tp.Shutdown(context.Background())
}
return tp, exporter, closer
}

func withSpanReader(fn func(r *spanReaderTest)) {
client := &mocks.Client{}
tracer, exp, closer := tracerProvider()
logger, logBuffer := testutils.NewLogger()
r := &spanReaderTest{
client: client,
logger: logger,
logBuffer: logBuffer,
exporter: exp,
closer: closer(),
reader: NewSpanReader(SpanReaderParams{
Client: client,
Logger: zap.NewNop(),
Tracer: jtracer.NoOp().OTEL.Tracer("test"),
Tracer: tracer.Tracer("test"),
MaxSpanAge: 0,
IndexPrefix: "",
TagDotReplacement: "@",
Expand All @@ -112,15 +131,18 @@ func withSpanReader(fn func(r *spanReaderTest)) {

func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) {
client := &mocks.Client{}
tracer, exp, closer := tracerProvider()
logger, logBuffer := testutils.NewLogger()
r := &spanReaderTest{
client: client,
logger: logger,
logBuffer: logBuffer,
exporter: exp,
closer: closer(),
reader: NewSpanReader(SpanReaderParams{
Client: client,
Logger: zap.NewNop(),
Tracer: jtracer.NoOp().OTEL.Tracer("test"),
Tracer: tracer.Tracer("test"),
MaxSpanAge: 0,
IndexPrefix: "",
TagDotReplacement: "@",
Expand Down Expand Up @@ -173,7 +195,7 @@ func TestSpanReaderIndices(t *testing.T) {
serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout)
metricsFactory := metricstest.NewFactory(0)
logger, _ := testutils.NewLogger()
tracer := jtracer.NoOp().OTEL
tracer, exp, closer := tracerProvider()

testCases := []struct {
indices []string
Expand Down Expand Up @@ -280,6 +302,8 @@ func TestSpanReaderIndices(t *testing.T) {
testCase.params.MetricsFactory = metricsFactory
testCase.params.Tracer = tracer.Tracer("test")
r := NewSpanReader(testCase.params)
assert.NotEmpty(t, exp.GetSpans(), "Spans recorded")
assert.NoError(t, closer())

actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndexDateLayout, date, date, -1*time.Hour)
actualService := r.timeRangeIndices(r.serviceIndexPrefix, r.serviceIndexDateLayout, date, date, -24*time.Hour)
Expand All @@ -303,6 +327,9 @@ func TestSpanReader_GetTrace(t *testing.T) {
},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1))
require.NoError(t, err)
require.NotNil(t, trace)
Expand Down Expand Up @@ -379,6 +406,9 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) {
},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High: 0, Low: 1}, {High: 0, Low: 2}}, date, date)
require.NoError(t, err)
require.NotNil(t, traces)
Expand Down Expand Up @@ -416,6 +446,9 @@ func TestSpanReader_SearchAfter(t *testing.T) {
},
}, nil).Times(2)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1))
require.NoError(t, err)
require.NotNil(t, trace)
Expand All @@ -435,6 +468,8 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) {
Return(&elastic.MultiSearchResult{
Responses: []*elastic.SearchResult{},
}, nil)
assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)
trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1))
require.EqualError(t, err, "trace not found")
require.Nil(t, trace)
Expand All @@ -454,6 +489,9 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) {
},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1))
require.EqualError(t, err, "trace not found")
require.Nil(t, trace)
Expand All @@ -476,6 +514,8 @@ func TestSpanReader_GetTraceInvalidSpanError(t *testing.T) {
{Hits: searchHits},
},
}, nil)
assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1))
require.Error(t, err, "invalid span")
Expand All @@ -500,6 +540,8 @@ func TestSpanReader_GetTraceSpanConversionError(t *testing.T) {
{Hits: searchHits},
},
}, nil)
assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1))
require.Error(t, err, "span conversion error, because lacks elements")
Expand Down Expand Up @@ -720,6 +762,9 @@ func TestSpanReader_FindTraces(t *testing.T) {
},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

traceQuery := &spanstore.TraceQueryParameters{
ServiceName: serviceName,
Tags: map[string]string{
Expand Down Expand Up @@ -765,6 +810,9 @@ func TestSpanReader_FindTracesInvalidQuery(t *testing.T) {
},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

traceQuery := &spanstore.TraceQueryParameters{
ServiceName: "",
Tags: map[string]string{
Expand Down Expand Up @@ -797,6 +845,9 @@ func TestSpanReader_FindTracesAggregationFailure(t *testing.T) {
Responses: []*elastic.SearchResult{},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

traceQuery := &spanstore.TraceQueryParameters{
ServiceName: serviceName,
Tags: map[string]string{
Expand Down Expand Up @@ -831,6 +882,9 @@ func TestSpanReader_FindTracesNoTraceIDs(t *testing.T) {
Responses: []*elastic.SearchResult{},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

traceQuery := &spanstore.TraceQueryParameters{
ServiceName: serviceName,
Tags: map[string]string{
Expand Down Expand Up @@ -864,6 +918,9 @@ func TestSpanReader_FindTracesReadTraceFailure(t *testing.T) {
mockMultiSearchService(r).
Return(nil, errors.New("read error"))

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

traceQuery := &spanstore.TraceQueryParameters{
ServiceName: serviceName,
Tags: map[string]string{
Expand Down Expand Up @@ -902,6 +959,9 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) {
},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

traceQuery := &spanstore.TraceQueryParameters{
ServiceName: serviceName,
Tags: map[string]string{
Expand Down Expand Up @@ -1206,6 +1266,9 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) {
Responses: []*elastic.SearchResult{},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

traceQuery := &spanstore.TraceQueryParameters{
ServiceName: serviceName,
Tags: map[string]string{
Expand All @@ -1231,6 +1294,9 @@ func TestSpanReader_ArchiveTraces(t *testing.T) {
Responses: []*elastic.SearchResult{},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

trace, err := r.reader.GetTrace(context.Background(), model.TraceID{})
require.Nil(t, trace)
assert.EqualError(t, err, "trace not found")
Expand All @@ -1246,6 +1312,9 @@ func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) {
Responses: []*elastic.SearchResult{},
}, nil)

assert.NotEmpty(t, r.exporter.GetSpans(), "Spans recorded")
assert.NoError(t, r.closer)

trace, err := r.reader.GetTrace(context.Background(), model.TraceID{})
require.Nil(t, trace)
assert.EqualError(t, err, "trace not found")
Expand Down
23 changes: 21 additions & 2 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ import (
"github.com/olivere/elastic"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
estemplate "github.com/jaegertracing/jaeger/pkg/es"
eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es/dependencystore"
Expand Down Expand Up @@ -61,6 +63,18 @@ type ESStorageIntegration struct {
logger *zap.Logger
}

func tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func() error) {
exporter := tracetest.NewInMemoryExporter()
tp := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithSyncer(exporter),
)
closer := func() error {
return tp.Shutdown(context.Background())
}
return tp, exporter, closer
}

func (s *ESStorageIntegration) getVersion() (uint, error) {
pingResult, _, err := s.client.Ping(queryURL).Do(context.Background())
if err != nil {
Expand Down Expand Up @@ -142,6 +156,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro
if err != nil {
return err
}
tracer, _, closer := tracerProvider()
s.SpanWriter = w
s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{
Client: client,
Expand All @@ -152,7 +167,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro
TagDotReplacement: tagKeyDeDotChar,
Archive: archive,
MaxDocCount: defaultMaxDocCount,
Tracer: jtracer.NoOp().OTEL.Tracer("test"),
Tracer: tracer.Tracer("test"),
})
dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{
Client: client,
Expand All @@ -162,6 +177,10 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro
MaxDocCount: defaultMaxDocCount,
})

if closer != nil {
return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

}

depMapping, err := mappingBuilder.GetDependenciesMappings()
if err != nil {
return err
Expand Down