From 061c5bd5c9d6a65144bb2f26c13f3445c87afa03 Mon Sep 17 00:00:00 2001 From: Manik2708 Date: Tue, 21 Jan 2025 19:56:40 +0530 Subject: [PATCH] minor fixes Signed-off-by: Manik2708 --- .../integration/elasticsearch_test.go | 5 +- .../internal/integration/opensearch_test.go | 5 +- pkg/es/client.go | 1 + pkg/es/mocks/SearchService.go | 20 +++ pkg/es/wrapper/wrapper.go | 4 + .../es/spanstore/internal/dbmodel/model.go | 1 + plugin/storage/es/spanstore/reader.go | 13 +- plugin/storage/es/spanstore/reader_test.go | 141 ++++++++++++++---- .../storage/es/spanstore/service_operation.go | 96 +++++++++--- .../es/spanstore/service_operation_test.go | 45 +++++- plugin/storage/es/spanstore/writer.go | 9 +- plugin/storage/es/spanstore/writer_test.go | 6 +- .../storage/integration/elasticsearch_test.go | 3 - 13 files changed, 269 insertions(+), 80 deletions(-) diff --git a/cmd/jaeger/internal/integration/elasticsearch_test.go b/cmd/jaeger/internal/integration/elasticsearch_test.go index 844c4866321..5aac1f70aaa 100644 --- a/cmd/jaeger/internal/integration/elasticsearch_test.go +++ b/cmd/jaeger/internal/integration/elasticsearch_test.go @@ -15,9 +15,8 @@ func TestElasticsearchStorage(t *testing.T) { s := &E2EStorageIntegration{ ConfigFile: "../../config-elasticsearch.yaml", StorageIntegration: integration.StorageIntegration{ - CleanUp: purge, - Fixtures: integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"), - GetOperationsMissingSpanKind: true, + CleanUp: purge, + Fixtures: integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"), }, } s.e2eInitialize(t, "elasticsearch") diff --git a/cmd/jaeger/internal/integration/opensearch_test.go b/cmd/jaeger/internal/integration/opensearch_test.go index f164bfcda26..2f79be2652e 100644 --- a/cmd/jaeger/internal/integration/opensearch_test.go +++ b/cmd/jaeger/internal/integration/opensearch_test.go @@ -14,9 +14,8 @@ func TestOpenSearchStorage(t *testing.T) { s := &E2EStorageIntegration{ ConfigFile: "../../config-opensearch.yaml", StorageIntegration: integration.StorageIntegration{ - CleanUp: purge, - Fixtures: integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"), - GetOperationsMissingSpanKind: true, + CleanUp: purge, + Fixtures: integration.LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"), }, } s.e2eInitialize(t, "opensearch") diff --git a/pkg/es/client.go b/pkg/es/client.go index f7a5b6e73a0..396519a724f 100644 --- a/pkg/es/client.go +++ b/pkg/es/client.go @@ -60,6 +60,7 @@ type SearchService interface { Size(size int) SearchService Aggregation(name string, aggregation elastic.Aggregation) SearchService IgnoreUnavailable(ignoreUnavailable bool) SearchService + FetchSourceContext(fetchSourceContext *elastic.FetchSourceContext) SearchService Query(query elastic.Query) SearchService Do(ctx context.Context) (*elastic.SearchResult, error) } diff --git a/pkg/es/mocks/SearchService.go b/pkg/es/mocks/SearchService.go index cf01f949330..4f906a366ed 100644 --- a/pkg/es/mocks/SearchService.go +++ b/pkg/es/mocks/SearchService.go @@ -71,6 +71,26 @@ func (_m *SearchService) Do(ctx context.Context) (*elastic.SearchResult, error) return r0, r1 } +// FetchSourceContext provides a mock function with given fields: fetchSourceContext +func (_m *SearchService) FetchSourceContext(fetchSourceContext *elastic.FetchSourceContext) es.SearchService { + ret := _m.Called(fetchSourceContext) + + if len(ret) == 0 { + panic("no return value specified for FetchSourceContext") + } + + var r0 es.SearchService + if rf, ok := ret.Get(0).(func(*elastic.FetchSourceContext) es.SearchService); ok { + r0 = rf(fetchSourceContext) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(es.SearchService) + } + } + + return r0 +} + // IgnoreUnavailable provides a mock function with given fields: ignoreUnavailable func (_m *SearchService) IgnoreUnavailable(ignoreUnavailable bool) es.SearchService { ret := _m.Called(ignoreUnavailable) diff --git a/pkg/es/wrapper/wrapper.go b/pkg/es/wrapper/wrapper.go index e34b8c49590..b3bccb82885 100644 --- a/pkg/es/wrapper/wrapper.go +++ b/pkg/es/wrapper/wrapper.go @@ -265,6 +265,10 @@ func (s SearchServiceWrapper) Query(query elastic.Query) es.SearchService { return WrapESSearchService(s.searchService.Query(query)) } +func (s SearchServiceWrapper) FetchSourceContext(fetchSourceContext *elastic.FetchSourceContext) es.SearchService { + return WrapESSearchService(s.searchService.FetchSourceContext(fetchSourceContext)) +} + // Do calls this function to internal service. func (s SearchServiceWrapper) Do(ctx context.Context) (*elastic.SearchResult, error) { return s.searchService.Do(ctx) diff --git a/plugin/storage/es/spanstore/internal/dbmodel/model.go b/plugin/storage/es/spanstore/internal/dbmodel/model.go index d849c488573..086362fccb7 100644 --- a/plugin/storage/es/spanstore/internal/dbmodel/model.go +++ b/plugin/storage/es/spanstore/internal/dbmodel/model.go @@ -86,5 +86,6 @@ type KeyValue struct { // Service is the JSON struct for service:operation documents in ElasticSearch type Service struct { ServiceName string `json:"serviceName"` + Kind string `json:"spanKind,omitempty"` OperationName string `json:"operationName"` } diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 454726a1039..1ce849c0d82 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -298,20 +298,11 @@ func (s *SpanReader) GetOperations( currentTime, cfg.RolloverFrequencyAsNegativeDuration(s.serviceIndex.RolloverFrequency), ) - operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) + operations, err := s.serviceOperationStorage.getOperationsWithKind(ctx, jaegerIndices, query.ServiceName, query.SpanKind, s.maxDocCount) if err != nil { return nil, err } - - // TODO: https://github.com/jaegertracing/jaeger/issues/1923 - // - return the operations with actual span kind that meet requirement - var result []spanstore.Operation - for _, operation := range operations { - result = append(result, spanstore.Operation{ - Name: operation, - }) - } - return result, err + return operations, err } func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, error) { diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 9aa184e7aa9..b1bda69ac92 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -612,58 +612,126 @@ func TestSpanReader_indexWithDate(t *testing.T) { }) } +type testGetStruct struct { + caption string + searchResult *elastic.SearchResult + searchError error + expectedError func() string + expectedOutput map[string]any +} + func testGet(typ string, t *testing.T) { + testGetWithKind(typ, t, false) +} + +func testGetWithKind(typ string, t *testing.T, testKind bool) { goodAggregations := make(map[string]*json.RawMessage) rawMessage := []byte(`{"buckets": [{"key": "123","doc_count": 16}]}`) goodAggregations[typ] = (*json.RawMessage)(&rawMessage) - + var filterRawMessage json.RawMessage + if typ == operationsAggName { + if testKind { + filterRawMessage = rawMessage + } + goodAggregations[typ] = &filterRawMessage + } else { + goodAggregations[typ] = (*json.RawMessage)(&rawMessage) + } badAggregations := make(map[string]*json.RawMessage) badRawMessage := []byte(`{"buckets": [{bad json]}asdf`) badAggregations[typ] = (*json.RawMessage)(&badRawMessage) - testCases := []struct { - caption string - searchResult *elastic.SearchResult - searchError error - expectedError func() string - expectedOutput map[string]any - }{ - { - caption: typ + " full behavior", - searchResult: &elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, - expectedOutput: map[string]any{ - operationsAggregation: []spanstore.Operation{{Name: "123"}}, - "default": []string{"123"}, - }, - expectedError: func() string { - return "" - }, - }, + testCases := []testGetStruct{ { caption: typ + " search error", searchError: errors.New("Search failure"), expectedError: func() string { - if typ == operationsAggregation { + if typ == operationsAggName { return "search operations failed: Search failure" } return "search services failed: Search failure" }, }, - { + } + + if (typ == operationsAggName && testKind) || (typ != operationsAggName) { + testCase := testGetStruct{ caption: typ + " search error", searchResult: &elastic.SearchResult{Aggregations: elastic.Aggregations(badAggregations)}, expectedError: func() string { return "could not find aggregation of " + typ }, - }, + } + testCases = append(testCases, testCase) + } + + if testKind { + testCases = append(testCases, testGetStruct{ + caption: typ + " full behavior with kind", + searchResult: &elastic.SearchResult{Aggregations: goodAggregations}, + expectedOutput: map[string]any{ + operationsAggName: []spanstore.Operation{{Name: "123", SpanKind: "server"}}, + "default": []string{"123"}, + }, + expectedError: func() string { + return "" + }, + }) + } + + if typ == operationsAggName && !testKind { + score := 0.6931471 + msg := json.RawMessage(`{"operationName": "123"}`) + hitModel := &elastic.SearchHits{ + TotalHits: 1, + MaxScore: &score, + Hits: []*elastic.SearchHit{ + { + Score: &score, + SeqNo: nil, + Id: "e232b0fbe5cebc85", + PrimaryTerm: nil, + Source: &msg, + }, + }, + } + testCases = append(testCases, testGetStruct{ + caption: typ + " full behavior", + searchResult: &elastic.SearchResult{Hits: hitModel}, + expectedOutput: map[string]any{ + operationsAggName: []spanstore.Operation{{Name: "123"}}, + "default": []string{"123"}, + }, + expectedError: func() string { + return "" + }, + }) + } + + if typ != operationsAggName { + testCases = append(testCases, testGetStruct{ + caption: typ + " full behavior", + searchResult: &elastic.SearchResult{Aggregations: goodAggregations}, + expectedOutput: map[string]any{ + operationsAggName: []spanstore.Operation{{Name: "123"}}, + "default": []string{"123"}, + }, + expectedError: func() string { + return "" + }, + }) } for _, tc := range testCases { testCase := tc t.Run(testCase.caption, func(t *testing.T) { withSpanReader(t, func(r *spanReaderTest) { - mockSearchService(r).Return(testCase.searchResult, testCase.searchError) - actual, err := returnSearchFunc(typ, r) + if testKind { + mockSearchServiceWithSpanKind(r, true).Return(testCase.searchResult, testCase.searchError) + } else { + mockSearchService(r).Return(testCase.searchResult, testCase.searchError) + } + actual, err := returnSearchFunc(typ, r, testKind) if testCase.expectedError() != "" { require.EqualError(t, err, testCase.expectedError()) assert.Nil(t, actual) @@ -677,11 +745,17 @@ func testGet(typ string, t *testing.T) { } } -func returnSearchFunc(typ string, r *spanReaderTest) (any, error) { +func returnSearchFunc(typ string, r *spanReaderTest, testKind bool) (any, error) { switch typ { case servicesAggregation: return r.reader.GetServices(context.Background()) - case operationsAggregation: + case operationsAggName: + if testKind { + return r.reader.GetOperations( + context.Background(), + spanstore.OperationQueryParameters{ServiceName: "someservice", SpanKind: "server"}, + ) + } return r.reader.GetOperations( context.Background(), spanstore.OperationQueryParameters{ServiceName: "someService"}, @@ -951,7 +1025,7 @@ func TestFindTraceIDs(t *testing.T) { }{ {traceIDAggregation}, {servicesAggregation}, - {operationsAggregation}, + {operationsAggName}, } for _, testCase := range testCases { t.Run(testCase.aggregrationID, func(t *testing.T) { @@ -997,14 +1071,23 @@ func matchTermsAggregation(termsAgg *elastic.TermsAggregation) bool { } func mockSearchService(r *spanReaderTest) *mock.Call { + return mockSearchServiceWithSpanKind(r, false) +} + +func mockSearchServiceWithSpanKind(r *spanReaderTest, inputHasSpanKind bool) *mock.Call { searchService := &mocks.SearchService{} searchService.On("Query", mock.Anything).Return(searchService) - searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) searchService.On("Size", mock.MatchedBy(func(size int) bool { return size == 0 // Aggregations apply size (bucket) limits in their own query objects, and do not apply at the parent query level. })).Return(searchService) + searchService.On("IgnoreUnavailable", mock.AnythingOfType("bool")).Return(searchService) searchService.On("Aggregation", stringMatcher(servicesAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) - searchService.On("Aggregation", stringMatcher(operationsAggregation), mock.MatchedBy(matchTermsAggregation)).Return(searchService) + if inputHasSpanKind { + searchService.On("Aggregation", stringMatcher(operationsAggName), mock.MatchedBy(matchTermsAggregation)).Return(searchService) + } else { + searchService.On("FetchSourceContext", mock.AnythingOfType("*elastic.FetchSourceContext"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService) + searchService.On("Size", mock.AnythingOfType("int")).Return(searchService) + } searchService.On("Aggregation", stringMatcher(traceIDAggregation), mock.AnythingOfType("*elastic.TermsAggregation")).Return(searchService) r.client.On("Search", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string")).Return(searchService) return searchService.On("Do", mock.Anything) diff --git a/plugin/storage/es/spanstore/service_operation.go b/plugin/storage/es/spanstore/service_operation.go index 0ac2c20e3cb..6eb74804181 100644 --- a/plugin/storage/es/spanstore/service_operation.go +++ b/plugin/storage/es/spanstore/service_operation.go @@ -6,6 +6,7 @@ package spanstore import ( "context" + "encoding/json" "errors" "fmt" "hash/fnv" @@ -15,16 +16,18 @@ import ( "github.com/olivere/elastic" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/cache" "github.com/jaegertracing/jaeger/pkg/es" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/internal/dbmodel" + "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( - serviceName = "serviceName" - - operationsAggregation = "distinct_operations" - servicesAggregation = "distinct_services" + serviceName = "serviceName" + spanKindField = "spanKindField" + operationsAggName = "distinct_operations" + servicesAggregation = "distinct_services" ) // ServiceOperationStorage stores service to operation pairs. @@ -53,10 +56,11 @@ func NewServiceOperationStorage( } // Write saves a service to operation pair. -func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span) { +func (s *ServiceOperationStorage) Write(indexName string, jsonSpan *dbmodel.Span, kind model.SpanKind) { // Insert serviceName:operationName document service := dbmodel.Service{ ServiceName: jsonSpan.Process.ServiceName, + Kind: string(kind), OperationName: jsonSpan.OperationName, } @@ -96,29 +100,44 @@ func getServicesAggregation(maxDocCount int) elastic.Query { Size(maxDocCount) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } -func (s *ServiceOperationStorage) getOperations(ctx context.Context, indices []string, service string, maxDocCount int) ([]string, error) { +func (s *ServiceOperationStorage) getOperationsWithKind(ctx context.Context, indices []string, service, kind string, maxDocCount int) ([]spanstore.Operation, error) { + var searchService es.SearchService + if kind != "" { + searchService = s.client().Search(indices...). + Size(0). // set to 0 because we don't want actual documents. + Query(elastic.NewBoolQuery().Must( + elastic.NewTermQuery(serviceName, service), + elastic.NewTermQuery(spanKindField, kind))). + IgnoreUnavailable(true). + Aggregation(operationsAggName, getOperationsAggregation(maxDocCount)) + searchResult, err := searchService.Do(ctx) + if err != nil { + return nil, fmt.Errorf("search operations failed: %w", es.DetailedError(err)) + } + if searchResult.Aggregations == nil { + return []spanstore.Operation{}, nil + } + bucket, found := searchResult.Aggregations.Terms(operationsAggName) + if !found { + return nil, errors.New("could not find aggregation of " + operationsAggName) + } + operationNamesBucket := bucket.Buckets + return bucketOfOperationNamesToOperationsArray(operationNamesBucket, kind) + } serviceQuery := elastic.NewTermQuery(serviceName, service) - serviceFilter := getOperationsAggregation(maxDocCount) - - searchService := s.client().Search(indices...). - Size(0). + searchService = s.client().Search(indices...). Query(serviceQuery). IgnoreUnavailable(true). - Aggregation(operationsAggregation, serviceFilter) - + FetchSourceContext(elastic.NewFetchSourceContext(true).Include(spanKindField, operationNameField)). + Size(maxDocCount) searchResult, err := searchService.Do(ctx) if err != nil { return nil, fmt.Errorf("search operations failed: %w", es.DetailedError(err)) } - if searchResult.Aggregations == nil { - return []string{}, nil - } - bucket, found := searchResult.Aggregations.Terms(operationsAggregation) - if !found { - return nil, errors.New("could not find aggregation of " + operationsAggregation) + if searchResult.Hits == nil { + return []spanstore.Operation{}, nil } - operationNamesBucket := bucket.Buckets - return bucketToStringArray(operationNamesBucket) + return bucketOfOperationsToOperationsArray(searchResult.Hits) } func getOperationsAggregation(maxDocCount int) elastic.Query { @@ -127,9 +146,46 @@ func getOperationsAggregation(maxDocCount int) elastic.Query { Size(maxDocCount) // ES deprecated size omission for aggregating all. https://github.com/elastic/elasticsearch/issues/18838 } +func bucketOfOperationNamesToOperationsArray(buckets []*elastic.AggregationBucketKeyItem, kind string) ([]spanstore.Operation, error) { + result := make([]spanstore.Operation, len(buckets)) + for i, keyItem := range buckets { + str, ok := keyItem.Key.(string) + if !ok { + return nil, errors.New("non-string key found in aggregation") + } + result[i] = spanstore.Operation{ + Name: str, + SpanKind: kind, + } + } + return result, nil +} + +func bucketOfOperationsToOperationsArray(searchResult *elastic.SearchHits) ([]spanstore.Operation, error) { + result := make([]spanstore.Operation, len(searchResult.Hits)) + for i, hit := range searchResult.Hits { + data := hit.Source + op, err := rawMessageToOperation(data) + if err != nil { + return nil, err + } + result[i] = op + } + return result, nil +} + +func rawMessageToOperation(data *json.RawMessage) (spanstore.Operation, error) { + var service dbmodel.Service + if err := json.Unmarshal(*data, &service); err != nil { + return spanstore.Operation{}, err + } + return spanstore.Operation{Name: service.OperationName, SpanKind: service.Kind}, nil +} + func hashCode(s dbmodel.Service) string { h := fnv.New64a() h.Write([]byte(s.ServiceName)) + h.Write([]byte(s.Kind)) h.Write([]byte(s.OperationName)) return strconv.FormatUint(h.Sum64(), 16) } diff --git a/plugin/storage/es/spanstore/service_operation_test.go b/plugin/storage/es/spanstore/service_operation_test.go index b1ba9f2c377..8cc790d3ad9 100644 --- a/plugin/storage/es/spanstore/service_operation_test.go +++ b/plugin/storage/es/spanstore/service_operation_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/es/mocks" "github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/internal/dbmodel" "github.com/jaegertracing/jaeger/storage/spanstore" @@ -42,13 +43,48 @@ func TestWriteService(t *testing.T) { }, } - w.writer.writeService(indexName, jsonSpan) + w.writer.writeService(indexName, jsonSpan, model.SpanKindUnspecified) indexService.AssertNumberOfCalls(t, "Add", 1) assert.Equal(t, "", w.logBuffer.String()) // test that cache works, will call the index service only once. - w.writer.writeService(indexName, jsonSpan) + w.writer.writeService(indexName, jsonSpan, model.SpanKindUnspecified) + indexService.AssertNumberOfCalls(t, "Add", 1) + }) +} + +func TestWriteServiceWithKind(t *testing.T) { + withSpanWriter(func(w *spanWriterTest) { + indexService := &mocks.IndexService{} + + indexName := "jaeger-1995-04-21" + serviceHash := "dff8fa7700d2d554" + + indexService.On("Index", stringMatcher(indexName)).Return(indexService) + indexService.On("Type", stringMatcher(serviceType)).Return(indexService) + indexService.On("Id", stringMatcher(serviceHash)).Return(indexService) + indexService.On("BodyJson", mock.AnythingOfType("dbmodel.Service")).Return(indexService) + indexService.On("Add") + + w.client.On("Index").Return(indexService) + + jsonSpan := &dbmodel.Span{ + TraceID: dbmodel.TraceID("1"), + SpanID: dbmodel.SpanID("0"), + OperationName: "operation", + Process: dbmodel.Process{ + ServiceName: "service", + }, + } + + w.writer.writeService(indexName, jsonSpan, model.SpanKindServer) + + indexService.AssertNumberOfCalls(t, "Add", 1) + assert.Equal(t, "", w.logBuffer.String()) + + // test that cache works, will call the index service only once. + w.writer.writeService(indexName, jsonSpan, model.SpanKindServer) indexService.AssertNumberOfCalls(t, "Add", 1) }) } @@ -77,7 +113,7 @@ func TestWriteServiceError(*testing.T) { }, } - w.writer.writeService(indexName, jsonSpan) + w.writer.writeService(indexName, jsonSpan, model.SpanKindUnspecified) }) } @@ -86,7 +122,8 @@ func TestSpanReader_GetServices(t *testing.T) { } func TestSpanReader_GetOperations(t *testing.T) { - testGet(operationsAggregation, t) + testGet(operationsAggName, t) + testGetWithKind(operationsAggName, t, true) } func TestSpanReader_GetServicesEmptyIndex(t *testing.T) { diff --git a/plugin/storage/es/spanstore/writer.go b/plugin/storage/es/spanstore/writer.go index f8647a1db0f..437203060ba 100644 --- a/plugin/storage/es/spanstore/writer.go +++ b/plugin/storage/es/spanstore/writer.go @@ -31,7 +31,7 @@ type spanWriterMetrics struct { indexCreate *spanstoremetrics.WriteMetrics } -type serviceWriter func(string, *dbmodel.Span) +type serviceWriter func(string, *dbmodel.Span, model.SpanKind) // SpanWriter is a wrapper around elastic.Client type SpanWriter struct { @@ -124,8 +124,9 @@ func getSpanAndServiceIndexFn(p SpanWriterParams, writeAlias string) spanAndServ func (s *SpanWriter) WriteSpan(_ context.Context, span *model.Span) error { spanIndexName, serviceIndexName := s.spanServiceIndex(span.StartTime) jsonSpan := s.spanConverter.FromDomainEmbedProcess(span) + kind, _ := span.GetSpanKind() if serviceIndexName != "" { - s.writeService(serviceIndexName, jsonSpan) + s.writeService(serviceIndexName, jsonSpan, kind) } s.writeSpan(spanIndexName, jsonSpan) s.logger.Debug("Wrote span to ES index", zap.String("index", spanIndexName)) @@ -145,8 +146,8 @@ func writeCache(key string, c cache.Cache) { c.Put(key, key) } -func (s *SpanWriter) writeService(indexName string, jsonSpan *dbmodel.Span) { - s.serviceWriter(indexName, jsonSpan) +func (s *SpanWriter) writeService(indexName string, jsonSpan *dbmodel.Span, kind model.SpanKind) { + s.serviceWriter(indexName, jsonSpan, kind) } func (s *SpanWriter) writeSpan(indexName string, jsonSpan *dbmodel.Span) { diff --git a/plugin/storage/es/spanstore/writer_test.go b/plugin/storage/es/spanstore/writer_test.go index bf1ef68325a..cabf3927b30 100644 --- a/plugin/storage/es/spanstore/writer_test.go +++ b/plugin/storage/es/spanstore/writer_test.go @@ -470,11 +470,11 @@ func TestSpanWriterParamsTTL(t *testing.T) { OperationName: "bar", } - w.writeService(serviceIndexName, jsonSpan) + w.writeService(serviceIndexName, jsonSpan, model.SpanKindUnspecified) time.Sleep(1 * time.Nanosecond) - w.writeService(serviceIndexName, jsonSpan) + w.writeService(serviceIndexName, jsonSpan, model.SpanKindUnspecified) time.Sleep(1 * time.Nanosecond) - w.writeService(serviceIndexName, jsonSpan) + w.writeService(serviceIndexName, jsonSpan, model.SpanKindUnspecified) indexService.AssertNumberOfCalls(t, "Add", test.expectedAddCalls) }) } diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 016c7fb87ef..c7e5618b14e 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -172,9 +172,6 @@ func testElasticsearchStorage(t *testing.T, allTagsAsFields bool) { StorageIntegration: StorageIntegration{ Fixtures: LoadAndParseQueryTestCases(t, "fixtures/queries_es.json"), SkipArchiveTest: false, - // TODO: remove this flag after ES supports returning spanKind - // Issue https://github.com/jaegertracing/jaeger/issues/1923 - GetOperationsMissingSpanKind: true, }, } s.initializeES(t, c, allTagsAsFields)