Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ES index name configurable #1009

Merged
merged 9 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Configuration struct {
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
}

// ClientBuilder creates new es.Client
Expand All @@ -49,6 +50,7 @@ type ClientBuilder interface {
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
GetIndexPrefix() string
}

// NewClient creates a new ElasticSearch client
Expand Down Expand Up @@ -158,6 +160,11 @@ func (c *Configuration) GetMaxSpanAge() time.Duration {
return c.MaxSpanAge
}

// GetIndexPrefix returns index prefix
func (c *Configuration) GetIndexPrefix() string {
return c.IndexPrefix
}

// GetConfigs wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) GetConfigs() []elastic.ClientOptionFunc {
options := make([]elastic.ClientOptionFunc, 3)
Expand Down
39 changes: 22 additions & 17 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

const (
dependencyType = "dependencies"
dependencyIndexPrefix = "jaeger-dependencies-"
dependencyType = "dependencies"
dependencyIndex = "jaeger-dependencies-"
)

type timeToDependencies struct {
Expand All @@ -39,23 +39,28 @@ type timeToDependencies struct {

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
ctx context.Context
client es.Client
logger *zap.Logger
ctx context.Context
client es.Client
logger *zap.Logger
dependencyIndexPrefix string
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(client es.Client, logger *zap.Logger) *DependencyStore {
func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string) *DependencyStore {
if indexPrefix != "" {
indexPrefix += ":"
}
return &DependencyStore{
ctx: context.Background(),
client: client,
logger: logger,
ctx: context.Background(),
client: client,
logger: logger,
dependencyIndexPrefix: indexPrefix + dependencyIndex,
}
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error {
indexName := indexName(ts)
indexName := indexWithDate(s.dependencyIndexPrefix, ts)
if err := s.createIndex(indexName); err != nil {
return err
}
Expand All @@ -80,7 +85,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
searchResult, err := s.client.Search(getIndices(endTs, lookback)...).
searchResult, err := s.client.Search(getIndices(s.dependencyIndexPrefix, endTs, lookback)...).
Type(dependencyType).
Size(10000). // the default elasticsearch allowed limit
Query(buildTSQuery(endTs, lookback)).
Expand All @@ -107,18 +112,18 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query {
return elastic.NewRangeQuery("timestamp").Gte(endTs.Add(-lookback)).Lte(endTs)
}

func getIndices(ts time.Time, lookback time.Duration) []string {
func getIndices(prefix string, ts time.Time, lookback time.Duration) []string {
var indices []string
firstIndex := indexName(ts.Add(-lookback))
currentIndex := indexName(ts)
firstIndex := indexWithDate(prefix, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, ts)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
ts = ts.Add(-24 * time.Hour)
currentIndex = indexName(ts)
currentIndex = indexWithDate(prefix, ts)
}
return append(indices, firstIndex)
}

func indexName(date time.Time) string {
return dependencyIndexPrefix + date.UTC().Format("2006-01-02")
func indexWithDate(indexNamePrefix string, date time.Time) string {
return indexNamePrefix + date.UTC().Format("2006-01-02")
}
35 changes: 28 additions & 7 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,30 @@ func withDepStorage(fn func(r *depStorageTest)) {
client: client,
logger: logger,
logBuffer: logBuffer,
storage: NewDependencyStore(client, logger),
storage: NewDependencyStore(client, logger, ""),
}
fn(r)
}

var _ dependencystore.Reader = &DependencyStore{} // check API conformance
var _ dependencystore.Writer = &DependencyStore{} // check API conformance

func TestNewSpanReaderIndexPrefix(t *testing.T) {
testCases := []struct {
prefix string
expected string
}{
{prefix: "", expected: ""},
{prefix: "foo", expected: "foo:"},
{prefix: ":", expected: "::"},
}
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(client, zap.NewNop(), testCase.prefix)
assert.Equal(t, testCase.expected+dependencyIndex, r.dependencyIndexPrefix)
}
}

func TestWriteDependencies(t *testing.T) {
testCases := []struct {
createIndexError error
Expand All @@ -69,7 +85,7 @@ func TestWriteDependencies(t *testing.T) {
for _, testCase := range testCases {
withDepStorage(func(r *depStorageTest) {
fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)
indexName := indexName(fixedTime)
indexName := indexWithDate("", fixedTime)

indexService := &mocks.IndicesCreateService{}
writeService := &mocks.IndexService{}
Expand Down Expand Up @@ -174,26 +190,31 @@ func TestGetIndices(t *testing.T) {
testCases := []struct {
expected []string
lookback time.Duration
prefix string
}{
{
expected: []string{indexName(fixedTime), indexName(fixedTime.Add(-24 * time.Hour))},
expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))},
lookback: 23 * time.Hour,
prefix: "",
},
{
expected: []string{indexName(fixedTime), indexName(fixedTime.Add(-24 * time.Hour))},
expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))},
lookback: 13 * time.Hour,
prefix: "",
},
{
expected: []string{indexName(fixedTime)},
expected: []string{indexWithDate("foo:", fixedTime)},
lookback: 1 * time.Hour,
prefix: "foo:",
},
{
expected: []string{indexName(fixedTime)},
expected: []string{indexWithDate("foo-", fixedTime)},
lookback: 0,
prefix: "foo-",
},
}
for _, testCase := range testCases {
assert.EqualValues(t, testCase.expected, getIndices(fixedTime, testCase.lookback))
assert.EqualValues(t, testCase.expected, getIndices(testCase.prefix, fixedTime, testCase.lookback))
}
}

Expand Down
11 changes: 9 additions & 2 deletions plugin/storage/es/esCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,23 @@

def main():
if len(sys.argv) == 1:
print('USAGE: [TIMEOUT=(default 120)] %s NUM_OF_DAYS HOSTNAME[:PORT] ...' % sys.argv[0])
print('USAGE: [TIMEOUT=(default 120)] [INDEX_PREFIX=(default "")] %s NUM_OF_DAYS HOSTNAME[:PORT] ...' % sys.argv[0])
print('Specify a NUM_OF_DAYS that will delete indices that are older than the given NUM_OF_DAYS.')
print('HOSTNAME ... specifies which ElasticSearch hosts to search and delete indices from.')
print('INDEX_PREFIX ... specifies index prefix.')
sys.exit(1)

client = elasticsearch.Elasticsearch(sys.argv[2:])

ilo = curator.IndexList(client)
empty_list(ilo, 'ElasticSearch has no indices')
ilo.filter_by_regex(kind='prefix', value='jaeger-')

prefix = os.getenv("INDEX_PREFIX", '')
if prefix != '':
prefix += ':'
prefix += 'jaeger'

ilo.filter_by_regex(kind='prefix', value=prefix)
ilo.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=int(sys.argv[1]))
empty_list(ilo, 'No indices to delete')

Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,16 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
cfg := f.primaryConfig
return esSpanStore.NewSpanReader(f.primaryClient, f.logger, cfg.GetMaxSpanAge(), f.metricsFactory), nil
return esSpanStore.NewSpanReader(f.primaryClient, f.logger, cfg.GetMaxSpanAge(), f.metricsFactory, cfg.GetIndexPrefix()), nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this reached the point of needing its own type SpanReaderParams struct{}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will keep the current form for the simplicity. If it grows we can refactor

}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
cfg := f.primaryConfig
return esSpanStore.NewSpanWriter(f.primaryClient, f.logger, f.metricsFactory, cfg.GetNumShards(), cfg.GetNumReplicas()), nil
return esSpanStore.NewSpanWriter(f.primaryClient, f.logger, f.metricsFactory, cfg.GetNumShards(), cfg.GetNumReplicas(), cfg.GetIndexPrefix()), nil
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return esDepStore.NewDependencyStore(f.primaryClient, f.logger), nil
return esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix()), nil
}
6 changes: 6 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixIndexPrefix = ".index-prefix"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -141,6 +142,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixBulkFlushInterval,
nsConfig.BulkFlushInterval,
"A time.Duration after which bulk requests are committed, regardless of other tresholds. Set to zero to disable. By default, this is disabled.")
flagSet.String(
nsConfig.namespace+suffixIndexPrefix,
nsConfig.IndexPrefix,
"Optional prefix of Jaeger indices. For example \"production\" creates \"production:jaeger-*\".")
}

// InitFromViper initializes Options with properties from viper
Expand All @@ -163,6 +168,7 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkWorkers = v.GetInt(cfg.namespace + suffixBulkWorkers)
cfg.BulkActions = v.GetInt(cfg.namespace + suffixBulkActions)
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
}

// GetPrimary returns primary configuration.
Expand Down
37 changes: 20 additions & 17 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import (
)

const (
spanIndexPrefix = "jaeger-span-"
serviceIndexPrefix = "jaeger-service-"
spanIndex = "jaeger-span-"
serviceIndex = "jaeger-service-"
traceIDAggregation = "traceIDs"

traceIDField = "traceID"
Expand Down Expand Up @@ -88,21 +88,28 @@ type SpanReader struct {
// this will be rounded down to UTC 00:00 of that day.
maxLookback time.Duration
serviceOperationStorage *ServiceOperationStorage
spanIndexPrefix string
serviceIndexPrefix string
}

// NewSpanReader returns a new SpanReader with a metrics.
func NewSpanReader(client es.Client, logger *zap.Logger, maxLookback time.Duration, metricsFactory metrics.Factory) spanstore.Reader {
return storageMetrics.NewReadMetricsDecorator(newSpanReader(client, logger, maxLookback), metricsFactory)
func NewSpanReader(client es.Client, logger *zap.Logger, maxLookback time.Duration, metricsFactory metrics.Factory, indexPrefix string) spanstore.Reader {
return storageMetrics.NewReadMetricsDecorator(newSpanReader(client, logger, maxLookback, indexPrefix), metricsFactory)
}

func newSpanReader(client es.Client, logger *zap.Logger, maxLookback time.Duration) *SpanReader {
func newSpanReader(client es.Client, logger *zap.Logger, maxLookback time.Duration, indexPrefix string) *SpanReader {
ctx := context.Background()
if indexPrefix != "" {
indexPrefix += ":"
}
return &SpanReader{
ctx: ctx,
client: client,
logger: logger,
maxLookback: maxLookback,
serviceOperationStorage: NewServiceOperationStorage(ctx, client, metrics.NullFactory, logger, 0), // the decorator takes care of metrics
spanIndexPrefix: indexPrefix + spanIndex,
serviceIndexPrefix: indexPrefix + serviceIndex,
}
}

Expand Down Expand Up @@ -147,33 +154,29 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*jModel.Sp
}

// Returns the array of indices that we need to query, based on query params
func findIndices(prefix string, startTime time.Time, endTime time.Time) []string {
func (s *SpanReader) indicesForTimeRange(indexName string, startTime time.Time, endTime time.Time) []string {
var indices []string
firstIndex := indexWithDate(prefix, startTime)
currentIndex := indexWithDate(prefix, endTime)
firstIndex := indexWithDate(indexName, startTime)
currentIndex := indexWithDate(indexName, endTime)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
endTime = endTime.Add(-24 * time.Hour)
currentIndex = indexWithDate(prefix, endTime)
currentIndex = indexWithDate(indexName, endTime)
}
return append(indices, firstIndex)
}

func indexWithDate(prefix string, date time.Time) string {
return prefix + date.UTC().Format("2006-01-02")
}

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices() ([]string, error) {
currentTime := time.Now()
jaegerIndices := findIndices(serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
return s.serviceOperationStorage.getServices(jaegerIndices)
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *SpanReader) GetOperations(service string) ([]string, error) {
currentTime := time.Now()
jaegerIndices := findIndices(serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
return s.serviceOperationStorage.getOperations(jaegerIndices, service)
}

Expand Down Expand Up @@ -214,7 +217,7 @@ func (s *SpanReader) multiRead(traceIDs []string, startTime, endTime time.Time)
var traces []*model.Trace
// Add an hour in both directions so that traces that straddle two indexes are retrieved.
// i.e starts in one and ends in another.
indices := findIndices(spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour))
indices := s.indicesForTimeRange(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour))

nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour))

Expand Down Expand Up @@ -347,7 +350,7 @@ func (s *SpanReader) findTraceIDs(traceQuery *spanstore.TraceQueryParameters) ([
aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces)
boolQuery := s.buildFindTraceIDsQuery(traceQuery)

jaegerIndices := findIndices(spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax)
jaegerIndices := s.indicesForTimeRange(s.spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax)

searchService := s.client.Search(jaegerIndices...).
Type(spanType).
Expand Down
Loading