diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go
index e1bee4e6647..4688f6c6ea7 100644
--- a/plugin/storage/es/spanstore/reader.go
+++ b/plugin/storage/es/spanstore/reader.go
@@ -48,6 +48,7 @@ const (
 	traceIDField           = "traceID"
 	durationField          = "duration"
 	startTimeField         = "startTime"
+	startTimeMillisField   = "startTimeMillis"
 	serviceNameField       = "process.serviceName"
 	operationNameField     = "operationName"
 	objectTagsField        = "tag"
@@ -102,6 +103,7 @@ type SpanReader struct {
 	timeRangeIndices        timeRangeIndexFn
 	sourceFn                sourceFn
 	maxDocCount             int
+	useReadWriteAliases     bool
 }
 
 // SpanReaderParams holds constructor params for NewSpanReader
@@ -133,6 +135,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
 		timeRangeIndices:        getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters),
 		sourceFn:                getSourceFn(p.Archive, p.MaxDocCount),
 		maxDocCount:             p.MaxDocCount,
+		useReadWriteAliases:     p.UseReadWriteAliases,
 	}
 }
 
@@ -352,7 +355,6 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
 	// i.e starts in one and ends in another.
 	indices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour))
 	nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour))
-
 	searchAfterTime := make(map[model.TraceID]uint64)
 	totalDocumentsFetched := make(map[model.TraceID]int)
 	tracesMap := make(map[model.TraceID]*model.Trace)
@@ -362,13 +364,19 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st
 		}
 		searchRequests := make([]*elastic.SearchRequest, len(traceIDs))
 		for i, traceID := range traceIDs {
-			query := buildTraceByIDQuery(traceID)
+			traceQuery := buildTraceByIDQuery(traceID)
+			query := elastic.NewBoolQuery().
+				Must(traceQuery)
+			if s.useReadWriteAliases {
+				startTimeRangeQuery := s.buildStartTimeQuery(startTime.Add(-time.Hour*24), endTime.Add(time.Hour*24))
+				query = query.Must(startTimeRangeQuery)
+			}
+
 			if val, ok := searchAfterTime[traceID]; ok {
 				nextTime = val
 			}
 			s := s.sourceFn(query, nextTime)
-
 			searchRequests[i] = elastic.NewSearchRequest().
 				IgnoreUnavailable(true).
 				Source(s)
 		}
@@ -617,7 +625,10 @@ func (s *SpanReader) buildDurationQuery(durationMin time.Duration, durationMax t
 func (s *SpanReader) buildStartTimeQuery(startTimeMin time.Time, startTimeMax time.Time) elastic.Query {
 	minStartTimeMicros := model.TimeAsEpochMicroseconds(startTimeMin)
 	maxStartTimeMicros := model.TimeAsEpochMicroseconds(startTimeMax)
-	return elastic.NewRangeQuery(startTimeField).Gte(minStartTimeMicros).Lte(maxStartTimeMicros)
+	// startTimeMillisField is a date field in the ES mapping.
+	// Using a date field in range queries lets Elasticsearch skip searching shards that cannot contain matching documents.
+	// https://discuss.elastic.co/t/timeline-query-on-timestamped-indices/129328/2
+	return elastic.NewRangeQuery(startTimeMillisField).Gte(minStartTimeMicros / 1000).Lte(maxStartTimeMicros / 1000)
 }
 
 func (s *SpanReader) buildServiceNameQuery(serviceName string) elastic.Query {
diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go
index 4668a00ff2d..220445dc7ab 100644
--- a/plugin/storage/es/spanstore/reader_test.go
+++ b/plugin/storage/es/spanstore/reader_test.go
@@ -243,15 +243,17 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) {
 	spanBytesID2, err := json.Marshal(spanID2)
 	require.NoError(t, err)
 
-	id1Query := elastic.NewBoolQuery().Should(
+	traceID1Query := elastic.NewBoolQuery().Should(
 		elastic.NewTermQuery(traceIDField, model.TraceID{High: 0, Low: 1}.String()).Boost(2),
 		elastic.NewTermQuery(traceIDField, fmt.Sprintf("%x", 1)))
+	id1Query := elastic.NewBoolQuery().Must(traceID1Query)
 	id1Search := elastic.NewSearchRequest().
 		IgnoreUnavailable(true).
 		Source(r.reader.sourceFn(id1Query, model.TimeAsEpochMicroseconds(date.Add(-time.Hour))))
-	id2Query := elastic.NewBoolQuery().Should(
+	traceID2Query := elastic.NewBoolQuery().Should(
 		elastic.NewTermQuery(traceIDField, model.TraceID{High: 0, Low: 2}.String()).Boost(2),
 		elastic.NewTermQuery(traceIDField, fmt.Sprintf("%x", 2)))
+	id2Query := elastic.NewBoolQuery().Must(traceID2Query)
 	id2Search := elastic.NewSearchRequest().
 		IgnoreUnavailable(true).
 		Source(r.reader.sourceFn(id2Query, model.TimeAsEpochMicroseconds(date.Add(-time.Hour))))
@@ -1020,7 +1022,7 @@ func TestSpanReader_buildDurationQuery(t *testing.T) {
 
 func TestSpanReader_buildStartTimeQuery(t *testing.T) {
 	expectedStr := `{ "range":
-		{ "startTime": { "include_lower": true,
+		{ "startTimeMillis": { "include_lower": true,
 		   "include_upper": true,
 		   "from": 1000000,
 		   "to": 2000000 }
@@ -1036,8 +1038,8 @@ func TestSpanReader_buildStartTimeQuery(t *testing.T) {
 		expected := make(map[string]interface{})
 		json.Unmarshal([]byte(expectedStr), &expected)
 		// We need to do this because we cannot process a json into uint64.
-		expected["range"].(map[string]interface{})["startTime"].(map[string]interface{})["from"] = model.TimeAsEpochMicroseconds(startTimeMin)
-		expected["range"].(map[string]interface{})["startTime"].(map[string]interface{})["to"] = model.TimeAsEpochMicroseconds(startTimeMax)
+		expected["range"].(map[string]interface{})["startTimeMillis"].(map[string]interface{})["from"] = model.TimeAsEpochMicroseconds(startTimeMin) / 1000
+		expected["range"].(map[string]interface{})["startTimeMillis"].(map[string]interface{})["to"] = model.TimeAsEpochMicroseconds(startTimeMax) / 1000
 		assert.EqualValues(t, expected, actual)
 	})
 }
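Below is a minimal, self-contained sketch (not part of the patch) of the query shape that multiRead builds after this change when useReadWriteAliases is enabled: the per-trace term query is wrapped in a bool query together with a range query on the date-mapped startTimeMillis field, with epoch microseconds divided by 1000. The trace ID and the 24-hour window are illustrative values; the client is github.com/olivere/elastic, which reader.go already uses.

package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/olivere/elastic"
)

func main() {
	// Per-trace query, analogous to what buildTraceByIDQuery produces (illustrative trace ID).
	traceQuery := elastic.NewBoolQuery().Should(
		elastic.NewTermQuery("traceID", "0000000000000001").Boost(2),
		elastic.NewTermQuery("traceID", "1"))

	// Range on startTimeMillis, a date field in the ES mapping, mirroring
	// buildStartTimeQuery: epoch microseconds divided by 1000.
	now := time.Now()
	minMicros := uint64(now.Add(-24*time.Hour).UnixNano() / int64(time.Microsecond))
	maxMicros := uint64(now.Add(24*time.Hour).UnixNano() / int64(time.Microsecond))
	timeQuery := elastic.NewRangeQuery("startTimeMillis").
		Gte(minMicros / 1000).
		Lte(maxMicros / 1000)

	// Combined query used for the follow-up read when useReadWriteAliases is set.
	query := elastic.NewBoolQuery().Must(traceQuery).Must(timeQuery)

	// Print the generated query body to inspect what would be sent to Elasticsearch.
	src, err := query.Source()
	if err != nil {
		panic(err)
	}
	out, _ := json.MarshalIndent(src, "", "  ")
	fmt.Println(string(out))
}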