Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed Range Query to use startTimeMillis date field instead of startTime field #2980

Merged
merged 8 commits into from
May 11, 2021
19 changes: 15 additions & 4 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
traceIDField = "traceID"
durationField = "duration"
startTimeField = "startTime"
startTimeMillisField = "startTimeMillis"
serviceNameField = "process.serviceName"
operationNameField = "operationName"
objectTagsField = "tag"
Expand Down Expand Up @@ -102,6 +103,7 @@ type SpanReader struct {
timeRangeIndices timeRangeIndexFn
sourceFn sourceFn
maxDocCount int
useReadWriteAliases bool
}

// SpanReaderParams holds constructor params for NewSpanReader
Expand Down Expand Up @@ -133,6 +135,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters),
sourceFn: getSourceFn(p.Archive, p.MaxDocCount),
maxDocCount: p.MaxDocCount,
useReadWriteAliases: p.UseReadWriteAliases,
}
}

Expand Down Expand Up @@ -352,7 +355,6 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
// i.e starts in one and ends in another.
indices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour))
nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour))

searchAfterTime := make(map[model.TraceID]uint64)
totalDocumentsFetched := make(map[model.TraceID]int)
tracesMap := make(map[model.TraceID]*model.Trace)
Expand All @@ -362,13 +364,19 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
}
searchRequests := make([]*elastic.SearchRequest, len(traceIDs))
for i, traceID := range traceIDs {
query := buildTraceByIDQuery(traceID)
traceQuery := buildTraceByIDQuery(traceID)
query := elastic.NewBoolQuery().
Must(traceQuery)
if s.useReadWriteAliases {
startTimeRangeQuery := s.buildStartTimeQuery(startTime.Add(-time.Hour*24), endTime.Add(time.Hour*24))
query = query.Must(startTimeRangeQuery)
}

if val, ok := searchAfterTime[traceID]; ok {
nextTime = val
}

s := s.sourceFn(query, nextTime)

searchRequests[i] = elastic.NewSearchRequest().
IgnoreUnavailable(true).
Source(s)
Expand Down Expand Up @@ -617,7 +625,10 @@ func (s *SpanReader) buildDurationQuery(durationMin time.Duration, durationMax t
func (s *SpanReader) buildStartTimeQuery(startTimeMin time.Time, startTimeMax time.Time) elastic.Query {
minStartTimeMicros := model.TimeAsEpochMicroseconds(startTimeMin)
maxStartTimeMicros := model.TimeAsEpochMicroseconds(startTimeMax)
return elastic.NewRangeQuery(startTimeField).Gte(minStartTimeMicros).Lte(maxStartTimeMicros)
// startTimeMillisField is date field in ES mapping.
// Using date field in range queries helps to skip search on unnecessary shards at Elasticsearch side.
// https://discuss.elastic.co/t/timeline-query-on-timestamped-indices/129328/2
return elastic.NewRangeQuery(startTimeMillisField).Gte(minStartTimeMicros / 1000).Lte(maxStartTimeMicros / 1000)
}

func (s *SpanReader) buildServiceNameQuery(serviceName string) elastic.Query {
Expand Down
12 changes: 7 additions & 5 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,17 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) {
spanBytesID2, err := json.Marshal(spanID2)
require.NoError(t, err)

id1Query := elastic.NewBoolQuery().Should(
traceID1Query := elastic.NewBoolQuery().Should(
elastic.NewTermQuery(traceIDField, model.TraceID{High: 0, Low: 1}.String()).Boost(2),
elastic.NewTermQuery(traceIDField, fmt.Sprintf("%x", 1)))
id1Query := elastic.NewBoolQuery().Must(traceID1Query)
id1Search := elastic.NewSearchRequest().
IgnoreUnavailable(true).
Source(r.reader.sourceFn(id1Query, model.TimeAsEpochMicroseconds(date.Add(-time.Hour))))
id2Query := elastic.NewBoolQuery().Should(
traceID2Query := elastic.NewBoolQuery().Should(
elastic.NewTermQuery(traceIDField, model.TraceID{High: 0, Low: 2}.String()).Boost(2),
elastic.NewTermQuery(traceIDField, fmt.Sprintf("%x", 2)))
id2Query := elastic.NewBoolQuery().Must(traceID2Query)
id2Search := elastic.NewSearchRequest().
IgnoreUnavailable(true).
Source(r.reader.sourceFn(id2Query, model.TimeAsEpochMicroseconds(date.Add(-time.Hour))))
Expand Down Expand Up @@ -1020,7 +1022,7 @@ func TestSpanReader_buildDurationQuery(t *testing.T) {
func TestSpanReader_buildStartTimeQuery(t *testing.T) {
expectedStr :=
`{ "range":
{ "startTime": { "include_lower": true,
{ "startTimeMillis": { "include_lower": true,
"include_upper": true,
"from": 1000000,
"to": 2000000 }
Expand All @@ -1036,8 +1038,8 @@ func TestSpanReader_buildStartTimeQuery(t *testing.T) {
expected := make(map[string]interface{})
json.Unmarshal([]byte(expectedStr), &expected)
// We need to do this because we cannot process a json into uint64.
expected["range"].(map[string]interface{})["startTime"].(map[string]interface{})["from"] = model.TimeAsEpochMicroseconds(startTimeMin)
expected["range"].(map[string]interface{})["startTime"].(map[string]interface{})["to"] = model.TimeAsEpochMicroseconds(startTimeMax)
expected["range"].(map[string]interface{})["startTimeMillis"].(map[string]interface{})["from"] = model.TimeAsEpochMicroseconds(startTimeMin) / 1000
expected["range"].(map[string]interface{})["startTimeMillis"].(map[string]interface{})["to"] = model.TimeAsEpochMicroseconds(startTimeMax) / 1000

assert.EqualValues(t, expected, actual)
})
Expand Down