Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create sampling templates when creating sampling store #5349

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a95370e
Create sampling templates when creating sampling store
JaeguKim Apr 11, 2024
b1c6882
Refactor MappingBuilder creation
JaeguKim Apr 11, 2024
0b27a79
Create sampling store in initializeESFactory
JaeguKim Apr 12, 2024
3a05bc9
Test case for MappingBuilder returning error
JaeguKim Apr 12, 2024
2ccca5f
Cover CreateTemplates fail case in CreateSamplingStore
JaeguKim Apr 12, 2024
ec0d556
Apply review
JaeguKim Apr 12, 2024
147a048
Revert passing templateBuilder as parameter
JaeguKim Apr 12, 2024
294019e
Create unique index template id
JaeguKim Apr 12, 2024
568c7b1
Apply review
JaeguKim Apr 12, 2024
3dd3dd1
Assign bulkprocessor and resolve npe
JaeguKim Apr 12, 2024
0bcab60
Remove unnecessary nil check
JaeguKim Apr 12, 2024
f605d5a
Revert creating unique template id
JaeguKim Apr 12, 2024
682ab72
Add prefix sampling template id, do not wrap error again
JaeguKim Apr 13, 2024
bcda25b
Export prefixIndexName() and call it when creating template
JaeguKim Apr 14, 2024
69a62ee
Remove bulk processor from the ESStorageIntegration
JaeguKim Apr 14, 2024
b96e166
Remove unnecessary es version passing
JaeguKim Apr 14, 2024
7593bd4
Change method name
JaeguKim Apr 14, 2024
cd18683
Resolve compile error
JaeguKim Apr 17, 2024
cc2b979
Rename index template name to resolve index template priority issue i…
JaeguKim Apr 17, 2024
25c2ba5
Fix failed unit test
JaeguKim Apr 18, 2024
aca9554
Sleep for a second to allow time for the traces to be indexed when us…
JaeguKim Apr 19, 2024
b48c349
Retry until expected span counts are returned
JaeguKim Apr 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package es

import (
"context"
"errors"
"flag"
"fmt"
Expand Down Expand Up @@ -266,23 +267,6 @@
return nil, err
}

mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: es.TextTemplateBuilder{},
Shards: cfg.NumShards,
Replicas: cfg.NumReplicas,
EsVersion: cfg.Version,
IndexPrefix: cfg.IndexPrefix,
UseILM: cfg.UseILM,
PrioritySpanTemplate: cfg.PrioritySpanTemplate,
PriorityServiceTemplate: cfg.PriorityServiceTemplate,
PriorityDependenciesTemplate: cfg.PriorityDependenciesTemplate,
}

spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings()
if err != nil {
return nil, err
}

writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: clientFn,
IndexPrefix: cfg.IndexPrefix,
Expand All @@ -299,27 +283,58 @@

// Creating a template here would conflict with the one created for ILM resulting to no index rollover
if cfg.CreateIndexTemplates && !cfg.UseILM {
err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix)
mappingBuilder := mappingBuilderFromConfig(cfg)
spanMapping, serviceMapping, err := mappingBuilder.GetSpanServiceMappings()
if err != nil {
return nil, err
}
if err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.IndexPrefix); err != nil {
return nil, err
}
}
return writer, nil
}

func (f *Factory) CreateSamplingStore(maxBuckets int) (samplingstore.Store, error) {
store := esSampleStore.NewSamplingStore(esSampleStore.SamplingStoreParams{
params := esSampleStore.SamplingStoreParams{
Client: f.getPrimaryClient,
Logger: f.logger,
IndexPrefix: f.primaryConfig.IndexPrefix,
IndexDateLayout: f.primaryConfig.IndexDateLayoutSampling,
IndexRolloverFrequency: f.primaryConfig.GetIndexRolloverFrequencySamplingDuration(),
Lookback: f.primaryConfig.AdaptiveSamplingLookback,
MaxDocCount: f.primaryConfig.MaxDocCount,
})
}
store := esSampleStore.NewSamplingStore(params)

if f.primaryConfig.CreateIndexTemplates && !f.primaryConfig.UseILM {
mappingBuilder := mappingBuilderFromConfig(f.primaryConfig)
samplingMapping, err := mappingBuilder.GetSamplingMappings()
if err != nil {
return nil, err

Check warning on line 314 in plugin/storage/es/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/factory.go#L314

Added line #L314 was not covered by tests
}
if _, err := f.getPrimaryClient().CreateTemplate(params.PrefixedIndexName()).Body(samplingMapping).Do(context.Background()); err != nil {
return nil, fmt.Errorf("failed to create template: %w", err)
}
}

return store, nil
}

func mappingBuilderFromConfig(cfg *config.Configuration) mappings.MappingBuilder {
return mappings.MappingBuilder{
TemplateBuilder: es.TextTemplateBuilder{},
Shards: cfg.NumShards,
Replicas: cfg.NumReplicas,
EsVersion: cfg.Version,
IndexPrefix: cfg.IndexPrefix,
UseILM: cfg.UseILM,
PrioritySpanTemplate: cfg.PrioritySpanTemplate,
PriorityServiceTemplate: cfg.PriorityServiceTemplate,
PriorityDependenciesTemplate: cfg.PriorityDependenciesTemplate,
}
}

func createDependencyReader(
clientFn func() es.Client,
cfg *config.Configuration,
Expand Down
5 changes: 5 additions & 0 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,14 @@ func TestCreateTemplateError(t *testing.T) {
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
defer f.Close()

w, err := f.CreateSpanWriter()
assert.Nil(t, w)
require.Error(t, err, "template-error")

s, err := f.CreateSamplingStore(1)
assert.Nil(t, s)
require.Error(t, err, "template-error")
}

func TestILMDisableTemplateCreation(t *testing.T) {
Expand Down
11 changes: 3 additions & 8 deletions plugin/storage/es/samplingstore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
)

const (
samplingIndex = "jaeger-sampling-"
samplingIndex = "jaeger-sampling"
throughputType = "throughput-sampling"
probabilitiesType = "probabilities-sampling"
indexPrefixSeparator = "-"
Expand Down Expand Up @@ -59,7 +59,7 @@ func NewSamplingStore(p SamplingStoreParams) *SamplingStore {
return &SamplingStore{
client: p.Client,
logger: p.Logger,
samplingIndexPrefix: p.prefixIndexName(),
samplingIndexPrefix: p.PrefixedIndexName() + indexPrefixSeparator,
indexDateLayout: p.IndexDateLayout,
maxDocCount: p.MaxDocCount,
indexRolloverFrequency: p.IndexRolloverFrequency,
Expand Down Expand Up @@ -161,11 +161,6 @@ func (s *SamplingStore) writeProbabilitiesAndQPS(indexName string, ts time.Time,
}).Add()
}

func (s *SamplingStore) CreateTemplates(samplingTemplate string) error {
_, err := s.client().CreateTemplate("jaeger-sampling").Body(samplingTemplate).Do(context.Background())
return err
}

func getLatestIndices(indexPrefix, indexDateLayout string, clientFn es.Client, rollover time.Duration, maxDuration time.Duration) ([]string, error) {
ctx := context.Background()
now := time.Now().UTC()
Expand Down Expand Up @@ -200,7 +195,7 @@ func getReadIndices(indexName, indexDateLayout string, startTime time.Time, endT
return indices
}

func (p *SamplingStoreParams) prefixIndexName() string {
func (p *SamplingStoreParams) PrefixedIndexName() string {
if p.IndexPrefix != "" {
return p.IndexPrefix + indexPrefixSeparator + samplingIndex
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/samplingstore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestNewIndexPrefix(t *testing.T) {
IndexDateLayout: "2006-01-02",
MaxDocCount: defaultMaxDocCount,
})
assert.Equal(t, test.expected+samplingIndex, r.samplingIndexPrefix)
assert.Equal(t, test.expected+samplingIndex+indexPrefixSeparator, r.samplingIndexPrefix)
})
}
}
Expand Down
57 changes: 12 additions & 45 deletions plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,9 @@ import (
"go.uber.org/zap/zaptest"

"github.com/jaegertracing/jaeger/pkg/config"
estemplate "github.com/jaegertracing/jaeger/pkg/es"
eswrapper "github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/plugin/storage/es/mappings"
"github.com/jaegertracing/jaeger/plugin/storage/es/samplingstore"
"github.com/jaegertracing/jaeger/storage/dependencystore"
)

Expand All @@ -63,10 +59,9 @@ const (
type ESStorageIntegration struct {
StorageIntegration

client *elastic.Client
v8Client *elasticsearch8.Client
bulkProcessor *elastic.BulkProcessor
logger *zap.Logger
client *elastic.Client
v8Client *elasticsearch8.Client
logger *zap.Logger
}

func (s *ESStorageIntegration) getVersion() (uint, error) {
Expand Down Expand Up @@ -102,7 +97,6 @@ func (s *ESStorageIntegration) initializeES(t *testing.T, allTagsAsFields bool)
require.NoError(t, err)

s.initSpanstore(t, allTagsAsFields)
s.initSamplingStore(t)

s.CleanUp = func(t *testing.T) {
s.esCleanUp(t, allTagsAsFields)
Expand All @@ -120,48 +114,18 @@ func (s *ESStorageIntegration) esCleanUp(t *testing.T, allTagsAsFields bool) {
s.initSpanstore(t, allTagsAsFields)
}

func (s *ESStorageIntegration) initSamplingStore(t *testing.T) {
client := s.getEsClient(t)
mappingBuilder := mappings.MappingBuilder{
TemplateBuilder: estemplate.TextTemplateBuilder{},
Shards: 5,
Replicas: 1,
EsVersion: client.GetVersion(),
IndexPrefix: indexPrefix,
UseILM: false,
}
clientFn := func() estemplate.Client { return client }
samplingstore := samplingstore.NewSamplingStore(
samplingstore.SamplingStoreParams{
Client: clientFn,
Logger: s.logger,
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
MaxDocCount: defaultMaxDocCount,
})
sampleMapping, err := mappingBuilder.GetSamplingMappings()
require.NoError(t, err)
err = samplingstore.CreateTemplates(sampleMapping)
require.NoError(t, err)
s.SamplingStore = samplingstore
}

func (s *ESStorageIntegration) getEsClient(t *testing.T) eswrapper.ClientWrapper {
bp, err := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background())
require.NoError(t, err)
s.bulkProcessor = bp
esVersion, err := s.getVersion()
require.NoError(t, err)
return eswrapper.WrapESClient(s.client, bp, esVersion, s.v8Client)
}

func (s *ESStorageIntegration) initializeESFactory(t *testing.T, allTagsAsFields bool) *es.Factory {
s.logger = zaptest.NewLogger(t)
f := es.NewFactory()
v, command := config.Viperize(f.AddFlags)
args := []string{
fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields),
fmt.Sprintf("--es.num-shards=%v", 5),
fmt.Sprintf("--es.num-replicas=%v", 1),
fmt.Sprintf("--es.index-prefix=%v", indexPrefix),
fmt.Sprintf("--es.use-ilm=%v", false),
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
fmt.Sprintf("--es.tags-as-fields.all=%v", allTagsAsFields),
fmt.Sprintf("--es.bulk.actions=%v", 1),
fmt.Sprintf("--es.bulk.flush-interval=%v", time.Nanosecond),
"--es-archive.enabled=true",
fmt.Sprintf("--es-archive.tags-as-fields.all=%v", allTagsAsFields),
fmt.Sprintf("--es-archive.index-prefix=%v", indexPrefix),
Expand Down Expand Up @@ -193,6 +157,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool)
s.DependencyReader, err = f.CreateDependencyReader()
require.NoError(t, err)
s.DependencyWriter = s.DependencyReader.(dependencystore.Writer)

s.SamplingStore, err = f.CreateSamplingStore(1)
require.NoError(t, err)
}

func healthCheck() error {
Expand Down
14 changes: 5 additions & 9 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,16 @@ func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.Tr
traces, err = s.SpanReader.FindTraces(context.Background(), query)
require.NoError(t, err)
if len(expected) != len(traces) {
t.Logf("FindTraces: expected: %d, actual: %d", len(expected), len(traces))
t.Logf("Expecting certain number of traces: expected: %d, actual: %d", len(expected), len(traces))
return false
}
if spanCount(expected) != spanCount(traces) {
t.Logf("Excepting certain number of spans: expected: %d, actual: %d", spanCount(expected), spanCount(traces))
return false
}
return true
})
require.True(t, found)
tracesMatch(t, traces, expected)
return traces
}

Expand Down Expand Up @@ -433,13 +436,6 @@ func correctTime(json []byte) []byte {
return []byte(retString)
}

func tracesMatch(t *testing.T, actual []*model.Trace, expected []*model.Trace) bool {
if !assert.Equal(t, len(expected), len(actual), "Expecting certain number of traces") {
return false
}
return assert.Equal(t, spanCount(expected), spanCount(actual), "Expecting certain number of spans")
}

func spanCount(traces []*model.Trace) int {
var count int
for _, trace := range traces {
Expand Down
2 changes: 1 addition & 1 deletion storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/distributedlock"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/storage/dependencystore"
metricsstore "github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand Down
Loading