From 67d0a4375953d4be4e71407055450d130ce4e5d0 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 24 Aug 2020 08:34:43 +0200 Subject: [PATCH 1/2] Fix Elasticsearch version in ES OTEL writer Signed-off-by: Pavol Loffay --- .../app/exporter/elasticsearchexporter/exporter.go | 3 +-- .../app/exporter/elasticsearchexporter/spanstore.go | 4 ++++ cmd/opentelemetry/app/internal/esclient/client.go | 11 +++++++---- cmd/opentelemetry/app/internal/esclient/es6client.go | 4 ++++ .../app/internal/esclient/es6client_test.go | 6 ++++++ cmd/opentelemetry/app/internal/esclient/es7client.go | 4 ++++ .../app/internal/esclient/es7client_test.go | 6 ++++++ .../es/esdependencyreader/dependency_store_test.go | 4 ++++ .../reader/es/esspanreader/span_reader_test.go | 4 ++++ 9 files changed, 40 insertions(+), 6 deletions(-) diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go index d55f70bb244..d271cddabb7 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go @@ -16,7 +16,6 @@ package elasticsearchexporter import ( "context" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exporterhelper" @@ -31,7 +30,7 @@ func new(ctx context.Context, config *Config, params component.ExporterCreatePar return nil, err } if config.Primary.IsCreateIndexTemplates() { - spanMapping, serviceMapping := es.GetSpanServiceMappings(esCfg.GetNumShards(), esCfg.GetNumReplicas(), esCfg.GetVersion()) + spanMapping, serviceMapping := es.GetSpanServiceMappings(esCfg.GetNumShards(), esCfg.GetNumReplicas(), uint(w.esClientVersion())) if err = w.CreateTemplates(ctx, spanMapping, serviceMapping); err != nil { return nil, err } diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go index eab9af23cb7..cc0956e9eca 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/spanstore.go @@ -220,3 +220,7 @@ type bulkItem struct { // isService indicates that this bulk operation is for service index isService bool } + +func (w *esSpanWriter) esClientVersion() int { + return w.client.MajorVersion() +} diff --git a/cmd/opentelemetry/app/internal/esclient/client.go b/cmd/opentelemetry/app/internal/esclient/client.go index 1563be8801d..08c2a08298b 100644 --- a/cmd/opentelemetry/app/internal/esclient/client.go +++ b/cmd/opentelemetry/app/internal/esclient/client.go @@ -44,6 +44,9 @@ type ElasticsearchClient interface { Search(ctx context.Context, query SearchBody, size int, indices ...string) (*SearchResponse, error) // MultiSearch searches data via /_msearch MultiSearch(ctx context.Context, queries []SearchBody) (*MultiSearchResponse, error) + + // Major version returns major ES version + MajorVersion() int } // BulkResponse is a response returned by Elasticsearch Bulk API @@ -172,20 +175,20 @@ func NewElasticsearchClient(params config.Configuration, logger *zap.Logger) (El if err != nil { return nil, err } - if params.GetVersion() == 0 { + esVersion := int(params.GetVersion()) + if esVersion == 0 { esPing := elasticsearchPing{ username: params.Username, password: params.Password, roundTripper: roundTripper, } - esVersion, err := esPing.getVersion(params.Servers[0]) + esVersion, err = esPing.getVersion(params.Servers[0]) if err != nil { return nil, err } logger.Info("Elasticsearch detected", zap.Int("version", esVersion)) - params.Version = uint(esVersion) } - return newElasticsearchClient(int(params.Version), clientConfig{ + return newElasticsearchClient(esVersion, clientConfig{ DiscoverNotesOnStartup: params.Sniffer, Addresses: params.Servers, Username: params.Username, diff --git a/cmd/opentelemetry/app/internal/esclient/es6client.go b/cmd/opentelemetry/app/internal/esclient/es6client.go index 8bc567a7938..573f59f0e9f 100644 --- a/cmd/opentelemetry/app/internal/esclient/es6client.go +++ b/cmd/opentelemetry/app/internal/esclient/es6client.go @@ -152,3 +152,7 @@ func (es *elasticsearch6Client) MultiSearch(ctx context.Context, queries []Searc } return r, nil } + +func (es *elasticsearch6Client) MajorVersion() int { + return 6 +} diff --git a/cmd/opentelemetry/app/internal/esclient/es6client_test.go b/cmd/opentelemetry/app/internal/esclient/es6client_test.go index 25977251862..d26c7c55f8b 100644 --- a/cmd/opentelemetry/app/internal/esclient/es6client_test.go +++ b/cmd/opentelemetry/app/internal/esclient/es6client_test.go @@ -70,3 +70,9 @@ func TestES6MultiSearch(t *testing.T) { return newElasticsearch6Client(clientConfig{}, tripper) }) } + +func TestES6Version(t *testing.T) { + c, err := newElasticsearch6Client(clientConfig{}, nil) + require.NoError(t, err) + assert.Equal(t, 6, c.MajorVersion()) +} diff --git a/cmd/opentelemetry/app/internal/esclient/es7client.go b/cmd/opentelemetry/app/internal/esclient/es7client.go index efceb684d3f..ec818ccbb32 100644 --- a/cmd/opentelemetry/app/internal/esclient/es7client.go +++ b/cmd/opentelemetry/app/internal/esclient/es7client.go @@ -158,6 +158,10 @@ func (es *elasticsearch7Client) MultiSearch(ctx context.Context, queries []Searc return convertMultiSearchResponse(r), nil } +func (es *elasticsearch7Client) MajorVersion() int { + return 7 +} + func convertMultiSearchResponse(response *es7multiSearchResponse) *MultiSearchResponse { mResponse := &MultiSearchResponse{} for _, r := range response.Responses { diff --git a/cmd/opentelemetry/app/internal/esclient/es7client_test.go b/cmd/opentelemetry/app/internal/esclient/es7client_test.go index c159c1dd6b1..d11656cafb0 100644 --- a/cmd/opentelemetry/app/internal/esclient/es7client_test.go +++ b/cmd/opentelemetry/app/internal/esclient/es7client_test.go @@ -70,3 +70,9 @@ func TestES7MultiSearch(t *testing.T) { return newElasticsearch7Client(clientConfig{}, tripper) }) } + +func TestES7Version(t *testing.T) { + c, err := newElasticsearch7Client(clientConfig{}, nil) + require.NoError(t, err) + assert.Equal(t, 7, c.MajorVersion()) +} diff --git a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go index 48cf2302d54..cef22308974 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esdependencyreader/dependency_store_test.go @@ -189,3 +189,7 @@ func (m *mockClient) Search(ctx context.Context, query esclient.SearchBody, size func (m mockClient) MultiSearch(ctx context.Context, queries []esclient.SearchBody) (*esclient.MultiSearchResponse, error) { panic("implement me") } + +func (m *mockClient) MajorVersion() int { + panic("implement me") +} diff --git a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader_test.go b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader_test.go index acf00a60e75..9f8dfcea602 100644 --- a/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader_test.go +++ b/cmd/opentelemetry/app/internal/reader/es/esspanreader/span_reader_test.go @@ -182,3 +182,7 @@ func (m *mockClient) Search(ctx context.Context, query esclient.SearchBody, size func (m *mockClient) MultiSearch(ctx context.Context, queries []esclient.SearchBody) (*esclient.MultiSearchResponse, error) { return m.multiSearchResponse, nil } + +func (m *mockClient) MajorVersion() int { + panic("implement me") +} From deccc76c0168e24ae97161b5bc0f3919e06cdb95 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Mon, 24 Aug 2020 13:25:38 +0200 Subject: [PATCH 2/2] fmt Signed-off-by: Pavol Loffay --- cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go index d271cddabb7..db1f7f9f8ff 100644 --- a/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go +++ b/cmd/opentelemetry/app/exporter/elasticsearchexporter/exporter.go @@ -16,6 +16,7 @@ package elasticsearchexporter import ( "context" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter/exporterhelper"