From 9f97e2dae86606e69222bee866bde7ec7b28a45a Mon Sep 17 00:00:00 2001 From: Sreevani871 Date: Fri, 14 Aug 2020 10:47:31 +0530 Subject: [PATCH 1/5] Fix exitting collector abruptly after Zipkin sever close during explicit shutdown Signed-off-by: Sreevani871 --- cmd/collector/app/collector.go | 1 + cmd/collector/app/server/zipkin.go | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index 29e191043c5..6fc197dd63d 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -110,6 +110,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { if zkServer, err := server.StartZipkinServer(&server.ZipkinServerParams{ HostPort: builderOpts.CollectorZipkinHTTPHostPort, Handler: c.spanHandlers.ZipkinSpansHandler, + HealthCheck: c.hCheck, AllowedHeaders: builderOpts.CollectorZipkinAllowedHeaders, AllowedOrigins: builderOpts.CollectorZipkinAllowedOrigins, Logger: c.logger, diff --git a/cmd/collector/app/server/zipkin.go b/cmd/collector/app/server/zipkin.go index 70faf00f0ae..b4fca5afe5a 100644 --- a/cmd/collector/app/server/zipkin.go +++ b/cmd/collector/app/server/zipkin.go @@ -76,7 +76,9 @@ func serveZipkin(server *http.Server, listener net.Listener, params *ZipkinServe server.Handler = cors.Handler(recoveryHandler(r)) go func(listener net.Listener, server *http.Server) { if err := server.Serve(listener); err != nil { - params.Logger.Fatal("Could not launch Zipkin server", zap.Error(err)) + if err != http.ErrServerClosed { + params.Logger.Fatal("Could not launch Zipkin server", zap.Error(err)) + } } params.HealthCheck.Set(healthcheck.Unavailable) }(listener, server) From ef1fe2823079bee414653f4e7b950b4e86ce2173 Mon Sep 17 00:00:00 2001 From: Sreevani871 Date: Sat, 8 May 2021 09:43:52 +0530 Subject: [PATCH 2/5] Changed to use startTimeMillis in ES timeRangeTerm query when es.use-alises:true Signed-off-by: Sreevani871 --- plugin/storage/es/spanstore/reader.go | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index e1bee4e6647..21dcbb53433 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -48,6 +48,7 @@ const ( traceIDField = "traceID" durationField = "duration" startTimeField = "startTime" + startTimeMillisField = "startTimeMillis" serviceNameField = "process.serviceName" operationNameField = "operationName" objectTagsField = "tag" @@ -102,6 +103,7 @@ type SpanReader struct { timeRangeIndices timeRangeIndexFn sourceFn sourceFn maxDocCount int + useReadWriteAliases bool } // SpanReaderParams holds constructor params for NewSpanReader @@ -133,6 +135,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters), sourceFn: getSourceFn(p.Archive, p.MaxDocCount), maxDocCount: p.MaxDocCount, + useReadWriteAliases: p.UseReadWriteAliases, } } @@ -352,7 +355,10 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st // i.e starts in one and ends in another. indices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour)) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) - + var startTimeRangeQuery elastic.Query + if s.useReadWriteAliases { + startTimeRangeQuery = s.buildStartTimeQuery(startTime.Add(-time.Hour*24), endTime.Add(time.Hour*24)) + } searchAfterTime := make(map[model.TraceID]uint64) totalDocumentsFetched := make(map[model.TraceID]int) tracesMap := make(map[model.TraceID]*model.Trace) @@ -362,13 +368,22 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st } searchRequests := make([]*elastic.SearchRequest, len(traceIDs)) for i, traceID := range traceIDs { - query := buildTraceByIDQuery(traceID) + traceQuery := buildTraceByIDQuery(traceID) + var query *elastic.BoolQuery + if s.useReadWriteAliases { + query = elastic.NewBoolQuery(). + Must(startTimeRangeQuery). + Must(traceQuery) + } else { + query = elastic.NewBoolQuery(). + Must(traceQuery) + } + if val, ok := searchAfterTime[traceID]; ok { nextTime = val } s := s.sourceFn(query, nextTime) - searchRequests[i] = elastic.NewSearchRequest(). IgnoreUnavailable(true). Source(s) @@ -617,7 +632,10 @@ func (s *SpanReader) buildDurationQuery(durationMin time.Duration, durationMax t func (s *SpanReader) buildStartTimeQuery(startTimeMin time.Time, startTimeMax time.Time) elastic.Query { minStartTimeMicros := model.TimeAsEpochMicroseconds(startTimeMin) maxStartTimeMicros := model.TimeAsEpochMicroseconds(startTimeMax) - return elastic.NewRangeQuery(startTimeField).Gte(minStartTimeMicros).Lte(maxStartTimeMicros) + // startTimeMillisField is date field in ES mapping. + // Using date field in range queries helps to skip search on unnecessary shards at Elasticsearch side. + // https://discuss.elastic.co/t/timeline-query-on-timestamped-indices/129328/2 + return elastic.NewRangeQuery(startTimeMillisField).Gte(minStartTimeMicros / 1000).Lte(maxStartTimeMicros / 1000) } func (s *SpanReader) buildServiceNameQuery(serviceName string) elastic.Query { From 132bfe99d475bc2849995dd1ce044bb9da0a4f97 Mon Sep 17 00:00:00 2001 From: Sreevani871 Date: Sat, 8 May 2021 12:11:15 +0530 Subject: [PATCH 3/5] tests fail fixes Signed-off-by: Sreevani871 --- plugin/storage/es/spanstore/reader_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 4668a00ff2d..5f335509932 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -243,15 +243,17 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { spanBytesID2, err := json.Marshal(spanID2) require.NoError(t, err) - id1Query := elastic.NewBoolQuery().Should( + traceId1Query := elastic.NewBoolQuery().Should( elastic.NewTermQuery(traceIDField, model.TraceID{High: 0, Low: 1}.String()).Boost(2), elastic.NewTermQuery(traceIDField, fmt.Sprintf("%x", 1))) + id1Query := elastic.NewBoolQuery().Must(traceId1Query) id1Search := elastic.NewSearchRequest(). IgnoreUnavailable(true). Source(r.reader.sourceFn(id1Query, model.TimeAsEpochMicroseconds(date.Add(-time.Hour)))) - id2Query := elastic.NewBoolQuery().Should( + traceId2Query := elastic.NewBoolQuery().Should( elastic.NewTermQuery(traceIDField, model.TraceID{High: 0, Low: 2}.String()).Boost(2), elastic.NewTermQuery(traceIDField, fmt.Sprintf("%x", 2))) + id2Query := elastic.NewBoolQuery().Must(traceId2Query) id2Search := elastic.NewSearchRequest(). IgnoreUnavailable(true). Source(r.reader.sourceFn(id2Query, model.TimeAsEpochMicroseconds(date.Add(-time.Hour)))) @@ -1020,7 +1022,7 @@ func TestSpanReader_buildDurationQuery(t *testing.T) { func TestSpanReader_buildStartTimeQuery(t *testing.T) { expectedStr := `{ "range": - { "startTime": { "include_lower": true, + { "startTimeMillis": { "include_lower": true, "include_upper": true, "from": 1000000, "to": 2000000 } @@ -1036,8 +1038,8 @@ func TestSpanReader_buildStartTimeQuery(t *testing.T) { expected := make(map[string]interface{}) json.Unmarshal([]byte(expectedStr), &expected) // We need to do this because we cannot process a json into uint64. - expected["range"].(map[string]interface{})["startTime"].(map[string]interface{})["from"] = model.TimeAsEpochMicroseconds(startTimeMin) - expected["range"].(map[string]interface{})["startTime"].(map[string]interface{})["to"] = model.TimeAsEpochMicroseconds(startTimeMax) + expected["range"].(map[string]interface{})["startTimeMillis"].(map[string]interface{})["from"] = model.TimeAsEpochMicroseconds(startTimeMin) / 1000 + expected["range"].(map[string]interface{})["startTimeMillis"].(map[string]interface{})["to"] = model.TimeAsEpochMicroseconds(startTimeMax) / 1000 assert.EqualValues(t, expected, actual) }) From f36a9fc42726ad85fc9797896d1e1a35d022342c Mon Sep 17 00:00:00 2001 From: Sreevani871 Date: Sat, 8 May 2021 12:26:04 +0530 Subject: [PATCH 4/5] go-lint errors fix Signed-off-by: Sreevani871 --- plugin/storage/es/spanstore/reader_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index 5f335509932..220445dc7ab 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -243,17 +243,17 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { spanBytesID2, err := json.Marshal(spanID2) require.NoError(t, err) - traceId1Query := elastic.NewBoolQuery().Should( + traceID1Query := elastic.NewBoolQuery().Should( elastic.NewTermQuery(traceIDField, model.TraceID{High: 0, Low: 1}.String()).Boost(2), elastic.NewTermQuery(traceIDField, fmt.Sprintf("%x", 1))) - id1Query := elastic.NewBoolQuery().Must(traceId1Query) + id1Query := elastic.NewBoolQuery().Must(traceID1Query) id1Search := elastic.NewSearchRequest(). IgnoreUnavailable(true). Source(r.reader.sourceFn(id1Query, model.TimeAsEpochMicroseconds(date.Add(-time.Hour)))) - traceId2Query := elastic.NewBoolQuery().Should( + traceID2Query := elastic.NewBoolQuery().Should( elastic.NewTermQuery(traceIDField, model.TraceID{High: 0, Low: 2}.String()).Boost(2), elastic.NewTermQuery(traceIDField, fmt.Sprintf("%x", 2))) - id2Query := elastic.NewBoolQuery().Must(traceId2Query) + id2Query := elastic.NewBoolQuery().Must(traceID2Query) id2Search := elastic.NewSearchRequest(). IgnoreUnavailable(true). Source(r.reader.sourceFn(id2Query, model.TimeAsEpochMicroseconds(date.Add(-time.Hour)))) From e1a73e8ebe4494d3a47db5e10a226b814c85f087 Mon Sep 17 00:00:00 2001 From: Sreevani871 Date: Tue, 11 May 2021 09:23:13 +0530 Subject: [PATCH 5/5] simplified code changes Signed-off-by: Sreevani871 --- plugin/storage/es/spanstore/reader.go | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 21dcbb53433..4688f6c6ea7 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -355,10 +355,6 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st // i.e starts in one and ends in another. indices := s.timeRangeIndices(s.spanIndexPrefix, s.indexDateLayout, startTime.Add(-time.Hour), endTime.Add(time.Hour)) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) - var startTimeRangeQuery elastic.Query - if s.useReadWriteAliases { - startTimeRangeQuery = s.buildStartTimeQuery(startTime.Add(-time.Hour*24), endTime.Add(time.Hour*24)) - } searchAfterTime := make(map[model.TraceID]uint64) totalDocumentsFetched := make(map[model.TraceID]int) tracesMap := make(map[model.TraceID]*model.Trace) @@ -369,14 +365,11 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st searchRequests := make([]*elastic.SearchRequest, len(traceIDs)) for i, traceID := range traceIDs { traceQuery := buildTraceByIDQuery(traceID) - var query *elastic.BoolQuery + query := elastic.NewBoolQuery(). + Must(traceQuery) if s.useReadWriteAliases { - query = elastic.NewBoolQuery(). - Must(startTimeRangeQuery). - Must(traceQuery) - } else { - query = elastic.NewBoolQuery(). - Must(traceQuery) + startTimeRangeQuery := s.buildStartTimeQuery(startTime.Add(-time.Hour*24), endTime.Add(time.Hour*24)) + query = query.Must(startTimeRangeQuery) } if val, ok := searchAfterTime[traceID]; ok {