Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <[email protected]>
  • Loading branch information
pavolloffay committed Aug 22, 2018
1 parent d913b29 commit ddd8ad0
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 83 deletions.
30 changes: 15 additions & 15 deletions plugin/storage/es/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ type timeToDependencies struct {

// DependencyStore handles all queries and insertions to ElasticSearch dependencies
type DependencyStore struct {
ctx context.Context
client es.Client
logger *zap.Logger
indexPrefix string
ctx context.Context
client es.Client
logger *zap.Logger
dependencyIndexPrefix string
}

// NewDependencyStore returns a DependencyStore
Expand All @@ -51,16 +51,16 @@ func NewDependencyStore(client es.Client, logger *zap.Logger, indexPrefix string
indexPrefix += ":"
}
return &DependencyStore{
ctx: context.Background(),
client: client,
logger: logger,
indexPrefix: indexPrefix,
ctx: context.Background(),
client: client,
logger: logger,
dependencyIndexPrefix: indexPrefix + dependencyIndex,
}
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.DependencyLink) error {
indexName := indexName(s.indexPrefix, ts)
indexName := indexWithDate(s.dependencyIndexPrefix, ts)
if err := s.createIndex(indexName); err != nil {
return err
}
Expand All @@ -85,7 +85,7 @@ func (s *DependencyStore) writeDependencies(indexName string, ts time.Time, depe

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
searchResult, err := s.client.Search(getIndices(s.indexPrefix, endTs, lookback)...).
searchResult, err := s.client.Search(getIndices(s.dependencyIndexPrefix, endTs, lookback)...).
Type(dependencyType).
Size(10000). // the default elasticsearch allowed limit
Query(buildTSQuery(endTs, lookback)).
Expand Down Expand Up @@ -114,16 +114,16 @@ func buildTSQuery(endTs time.Time, lookback time.Duration) elastic.Query {

func getIndices(prefix string, ts time.Time, lookback time.Duration) []string {
var indices []string
firstIndex := indexName(prefix, ts.Add(-lookback))
currentIndex := indexName(prefix, ts)
firstIndex := indexWithDate(prefix, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, ts)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
ts = ts.Add(-24 * time.Hour)
currentIndex = indexName(prefix, ts)
currentIndex = indexWithDate(prefix, ts)
}
return append(indices, firstIndex)
}

func indexName(prefix string, date time.Time) string {
return prefix + dependencyIndex + date.UTC().Format("2006-01-02")
func indexWithDate(indexNamePrefix string, date time.Time) string {
return indexNamePrefix + date.UTC().Format("2006-01-02")
}
12 changes: 6 additions & 6 deletions plugin/storage/es/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
for _, testCase := range testCases {
client := &mocks.Client{}
r := NewDependencyStore(client, zap.NewNop(), testCase.prefix)
assert.Equal(t, testCase.expected, r.indexPrefix)
assert.Equal(t, testCase.expected+dependencyIndex, r.dependencyIndexPrefix)
}
}

Expand All @@ -85,7 +85,7 @@ func TestWriteDependencies(t *testing.T) {
for _, testCase := range testCases {
withDepStorage(func(r *depStorageTest) {
fixedTime := time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC)
indexName := indexName("", fixedTime)
indexName := indexWithDate("", fixedTime)

indexService := &mocks.IndicesCreateService{}
writeService := &mocks.IndexService{}
Expand Down Expand Up @@ -193,22 +193,22 @@ func TestGetIndices(t *testing.T) {
prefix string
}{
{
expected: []string{indexName("", fixedTime), indexName("", fixedTime.Add(-24*time.Hour))},
expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))},
lookback: 23 * time.Hour,
prefix: "",
},
{
expected: []string{indexName("", fixedTime), indexName("", fixedTime.Add(-24*time.Hour))},
expected: []string{indexWithDate("", fixedTime), indexWithDate("", fixedTime.Add(-24*time.Hour))},
lookback: 13 * time.Hour,
prefix: "",
},
{
expected: []string{indexName("foo:", fixedTime)},
expected: []string{indexWithDate("foo:", fixedTime)},
lookback: 1 * time.Hour,
prefix: "foo:",
},
{
expected: []string{indexName("foo-", fixedTime)},
expected: []string{indexWithDate("foo-", fixedTime)},
lookback: 0,
prefix: "foo-",
},
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/esCleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

def main():
if len(sys.argv) == 1:
print('USAGE: [TIMEOUT=(default 120)] [INDEX_PREFIX=(default \'\')] %s NUM_OF_DAYS HOSTNAME[:PORT] ...' % sys.argv[0])
print('USAGE: [TIMEOUT=(default 120)] [INDEX_PREFIX=(default "")] %s NUM_OF_DAYS HOSTNAME[:PORT] ...' % sys.argv[0])
print('Specify a NUM_OF_DAYS that will delete indices that are older than the given NUM_OF_DAYS.')
print('HOSTNAME ... specifies which ElasticSearch hosts to search and delete indices from.')
print('INDEX_PREFIX ... specifies index prefix.')
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
flagSet.String(
nsConfig.namespace+suffixIndexPrefix,
nsConfig.IndexPrefix,
"The prefix of Jaeger indices. For example \"production\" creates \"production:jaeger-*\". (Default unset)")
"Optional prefix of Jaeger indices. For example \"production\" creates \"production:jaeger-*\".")
}

// InitFromViper initializes Options with properties from viper
Expand Down
26 changes: 12 additions & 14 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ type SpanReader struct {
// this will be rounded down to UTC 00:00 of that day.
maxLookback time.Duration
serviceOperationStorage *ServiceOperationStorage
indexPrefix string
spanIndexPrefix string
serviceIndexPrefix string
}

// NewSpanReader returns a new SpanReader with a metrics.
Expand All @@ -107,7 +108,8 @@ func newSpanReader(client es.Client, logger *zap.Logger, maxLookback time.Durati
logger: logger,
maxLookback: maxLookback,
serviceOperationStorage: NewServiceOperationStorage(ctx, client, metrics.NullFactory, logger, 0), // the decorator takes care of metrics
indexPrefix: indexPrefix,
spanIndexPrefix: indexPrefix + spanIndex,
serviceIndexPrefix: indexPrefix + serviceIndex,
}
}

Expand Down Expand Up @@ -152,33 +154,29 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*jModel.Sp
}

// Returns the array of indices that we need to query, based on query params
func (s *SpanReader) findIndices(indexName string, startTime time.Time, endTime time.Time) []string {
func (s *SpanReader) indicesForTimeRange(indexName string, startTime time.Time, endTime time.Time) []string {
var indices []string
firstIndex := indexWithDate(s.indexPrefix, indexName, startTime)
currentIndex := indexWithDate(s.indexPrefix, indexName, endTime)
firstIndex := indexWithDate(indexName, startTime)
currentIndex := indexWithDate(indexName, endTime)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
endTime = endTime.Add(-24 * time.Hour)
currentIndex = indexWithDate(s.indexPrefix, indexName, endTime)
currentIndex = indexWithDate(indexName, endTime)
}
return append(indices, firstIndex)
}

func indexWithDate(prefix string, indexType string, date time.Time) string {
return prefix + indexType + date.UTC().Format("2006-01-02")
}

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices() ([]string, error) {
currentTime := time.Now()
jaegerIndices := s.findIndices(serviceIndex, currentTime.Add(-s.maxLookback), currentTime)
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
return s.serviceOperationStorage.getServices(jaegerIndices)
}

// GetOperations returns all operations for a specific service traced by Jaeger
func (s *SpanReader) GetOperations(service string) ([]string, error) {
currentTime := time.Now()
jaegerIndices := s.findIndices(serviceIndex, currentTime.Add(-s.maxLookback), currentTime)
jaegerIndices := s.indicesForTimeRange(s.serviceIndexPrefix, currentTime.Add(-s.maxLookback), currentTime)
return s.serviceOperationStorage.getOperations(jaegerIndices, service)
}

Expand Down Expand Up @@ -219,7 +217,7 @@ func (s *SpanReader) multiRead(traceIDs []string, startTime, endTime time.Time)
var traces []*model.Trace
// Add an hour in both directions so that traces that straddle two indexes are retrieved.
// i.e starts in one and ends in another.
indices := s.findIndices(spanIndex, startTime.Add(-time.Hour), endTime.Add(time.Hour))
indices := s.indicesForTimeRange(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour))

nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour))

Expand Down Expand Up @@ -352,7 +350,7 @@ func (s *SpanReader) findTraceIDs(traceQuery *spanstore.TraceQueryParameters) ([
aggregation := s.buildTraceIDAggregation(traceQuery.NumTraces)
boolQuery := s.buildFindTraceIDsQuery(traceQuery)

jaegerIndices := s.findIndices(spanIndex, traceQuery.StartTimeMin, traceQuery.StartTimeMax)
jaegerIndices := s.indicesForTimeRange(s.spanIndexPrefix, traceQuery.StartTimeMin, traceQuery.StartTimeMax)

searchService := s.client.Search(jaegerIndices...).
Type(spanType).
Expand Down
38 changes: 10 additions & 28 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func TestNewSpanReaderIndexPrefix(t *testing.T) {
for _, testCase := range testCases {
client := &mocks.Client{}
r := newSpanReader(client, zap.NewNop(), 0, testCase.prefix)
assert.Equal(t, testCase.expected, r.indexPrefix)
assert.Equal(t, testCase.expected+spanIndex, r.spanIndexPrefix)
assert.Equal(t, testCase.expected+serviceIndex, r.serviceIndexPrefix)
}
}

Expand Down Expand Up @@ -288,25 +289,6 @@ func TestSpanReader_esJSONtoJSONSpanModelError(t *testing.T) {
})
}

func TestWriterReaderIndexNames(t *testing.T) {
testCases := []struct {
prefix string
}{
{prefix: ""},
{prefix: "foo"},
{prefix: "--"},
}

span := &model.Span{StartTime: time.Now()}
for _, testCase := range testCases {
spanIndexWrite, serviceIndexWrite := indexNames(testCase.prefix, span)
readSpanIndex := indexWithDate(testCase.prefix, spanIndex, span.StartTime)
readServiceIndex := indexWithDate(testCase.prefix, serviceIndex, span.StartTime)
assert.EqualValues(t, spanIndexWrite, readSpanIndex)
assert.EqualValues(t, serviceIndexWrite, readServiceIndex)
}
}

func TestSpanReaderFindIndices(t *testing.T) {
today := time.Date(1995, time.April, 21, 4, 12, 19, 95, time.UTC)
yesterday := today.AddDate(0, 0, -1)
Expand All @@ -321,38 +303,38 @@ func TestSpanReaderFindIndices(t *testing.T) {
startTime: today.Add(-time.Millisecond),
endTime: today,
expected: []string{
indexWithDate("", spanIndex, today),
indexWithDate(spanIndex, today),
},
},
{
startTime: today.Add(-13 * time.Hour),
endTime: today,
expected: []string{
indexWithDate("", spanIndex, today),
indexWithDate("", spanIndex, yesterday),
indexWithDate(spanIndex, today),
indexWithDate(spanIndex, yesterday),
},
},
{
startTime: today.Add(-48 * time.Hour),
endTime: today,
expected: []string{
indexWithDate("", spanIndex, today),
indexWithDate("", spanIndex, yesterday),
indexWithDate("", spanIndex, twoDaysAgo),
indexWithDate(spanIndex, today),
indexWithDate(spanIndex, yesterday),
indexWithDate(spanIndex, twoDaysAgo),
},
},
}
withSpanReader(func(r *spanReaderTest) {
for _, testCase := range testCases {
actual := r.reader.findIndices(spanIndex, testCase.startTime, testCase.endTime)
actual := r.reader.indicesForTimeRange(spanIndex, testCase.startTime, testCase.endTime)
assert.EqualValues(t, testCase.expected, actual)
}
})
}

func TestSpanReader_indexWithDate(t *testing.T) {
withSpanReader(func(r *spanReaderTest) {
actual := indexWithDate("", spanIndex, time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC))
actual := indexWithDate(spanIndex, time.Date(1995, time.April, 21, 4, 21, 19, 95, time.UTC))
assert.Equal(t, "jaeger-span-1995-04-21", actual)
})
}
Expand Down
36 changes: 20 additions & 16 deletions plugin/storage/es/spanstore/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,16 @@ type serviceWriter func(string, *jModel.Span)

// SpanWriter is a wrapper around elastic.Client
type SpanWriter struct {
ctx context.Context
client es.Client
logger *zap.Logger
writerMetrics spanWriterMetrics // TODO: build functions to wrap around each Do fn
indexCache cache.Cache
serviceWriter serviceWriter
numShards int64
numReplicas int64
indexPrefix string
ctx context.Context
client es.Client
logger *zap.Logger
writerMetrics spanWriterMetrics // TODO: build functions to wrap around each Do fn
indexCache cache.Cache
serviceWriter serviceWriter
numShards int64
numReplicas int64
spanIndexPrefix string
serviceIndexPrefix string
}

// Service is the JSON struct for service:operation documents in ElasticSearch
Expand Down Expand Up @@ -116,15 +117,18 @@ func NewSpanWriter(
TTL: 48 * time.Hour,
},
),
numShards: numShards,
numReplicas: numReplicas,
indexPrefix: indexPrefix,
numShards: numShards,
numReplicas: numReplicas,
spanIndexPrefix: indexPrefix + spanIndex,
serviceIndexPrefix: indexPrefix + serviceIndex,
}
}

// WriteSpan writes a span and its corresponding service:operation in ElasticSearch
func (s *SpanWriter) WriteSpan(span *model.Span) error {
spanIndexName, serviceIndexName := indexNames(s.indexPrefix, span)
spanIndexName := indexWithDate(s.spanIndexPrefix, span.StartTime)
serviceIndexName := indexWithDate(s.serviceIndexPrefix, span.StartTime)

// Convert model.Span into json.Span
jsonSpan := json.FromDomainEmbedProcess(span)

Expand All @@ -144,9 +148,9 @@ func (s *SpanWriter) Close() error {
return s.client.Close()
}

func indexNames(prefix string, span *model.Span) (string, string) {
spanDate := span.StartTime.UTC().Format("2006-01-02")
return prefix + spanIndex + spanDate, prefix + serviceIndex + spanDate
func indexWithDate(indexPrefix string, date time.Time) string {
spanDate := date.UTC().Format("2006-01-02")
return indexPrefix + spanDate
}

func (s *SpanWriter) createIndex(indexName string, mapping string, jsonSpan *jModel.Span) error {
Expand Down
6 changes: 4 additions & 2 deletions plugin/storage/es/spanstore/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func TestNewSpanWriterIndexPrefix(t *testing.T) {
metricsFactory := metrics.NewLocalFactory(0)
for _, testCase := range testCases {
w := NewSpanWriter(client, logger, metricsFactory, 0, 0, testCase.prefix)
assert.Equal(t, testCase.expected, w.indexPrefix)
assert.Equal(t, testCase.expected+spanIndex, w.spanIndexPrefix)
assert.Equal(t, testCase.expected+serviceIndex, w.serviceIndexPrefix)
}
}

Expand Down Expand Up @@ -219,7 +220,8 @@ func TestSpanIndexName(t *testing.T) {
span := &model.Span{
StartTime: date,
}
spanIndexName, serviceIndexName := indexNames("", span)
spanIndexName := indexWithDate(spanIndex, span.StartTime)
serviceIndexName := indexWithDate(serviceIndex, span.StartTime)
assert.Equal(t, "jaeger-span-1995-04-21", spanIndexName)
assert.Equal(t, "jaeger-service-1995-04-21", serviceIndexName)
}
Expand Down

0 comments on commit ddd8ad0

Please sign in to comment.