From ba4efcf1a479e0c01ec2729d31746d5152ca5921 Mon Sep 17 00:00:00 2001 From: Jude Wang Date: Mon, 25 Mar 2019 13:06:58 -0400 Subject: [PATCH 01/13] adding endpoint metrics Signed-off-by: Jude Wang --- cmd/collector/app/metrics.go | 34 +++++++++++++++--------- cmd/collector/app/span_processor.go | 3 ++- cmd/collector/app/span_processor_test.go | 1 - 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index dceb441dc11..162e0ac53cb 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -25,6 +25,8 @@ import ( const ( maxServiceNames = 2000 otherServices = "other-services" + GrpcEndpoint = "gRPC" + HttpEndpoint = "HTTP" ) // SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor @@ -41,10 +43,11 @@ type SpanProcessorMetrics struct { // QueueLength measures the size of the internal span queue QueueLength metrics.Gauge // SavedOkBySvc contains span and trace counts by service - SavedOkBySvc metricsBySvc // spans actually saved - SavedErrBySvc metricsBySvc // spans failed to save - serviceNames metrics.Gauge // total number of unique service name metrics reported by this collector - spanCounts map[string]CountsBySpanType + SavedOkBySvc metricsBySvc // spans actually saved + SavedErrBySvc metricsBySvc // spans failed to save + serviceNames metrics.Gauge // total number of unique service name metrics reported by this collector + spanCounts map[string]CountsBySpanType + countsByEndpoints map[string]metrics.Counter // count of spans processed from different endpoints (Http, TChannel, gRPC) } type countsBySvc struct { @@ -79,16 +82,21 @@ func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics for _, otherFormatType := range otherFormatTypes { spanCounts[otherFormatType] = newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": otherFormatType}})) } + countsByEndpoints := map[string]metrics.Counter{ + GrpcEndpoint: serviceMetrics.Counter(metrics.Options{Name: "grpc-endpoint", Tags: nil}), + HttpEndpoint: serviceMetrics.Counter(metrics.Options{Name: "http-endpoint", Tags: nil}), + } m := &SpanProcessorMetrics{ - SaveLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "save-latency", Tags: nil}), - InQueueLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "in-queue-latency", Tags: nil}), - SpansDropped: hostMetrics.Counter(metrics.Options{Name: "spans.dropped", Tags: nil}), - BatchSize: hostMetrics.Gauge(metrics.Options{Name: "batch-size", Tags: nil}), - QueueLength: hostMetrics.Gauge(metrics.Options{Name: "queue-length", Tags: nil}), - SavedOkBySvc: newMetricsBySvc(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"result": "ok"}}), "saved-by-svc"), - SavedErrBySvc: newMetricsBySvc(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"result": "err"}}), "saved-by-svc"), - spanCounts: spanCounts, - serviceNames: hostMetrics.Gauge(metrics.Options{Name: "spans.serviceNames", Tags: nil}), + SaveLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "save-latency", Tags: nil}), + InQueueLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "in-queue-latency", Tags: nil}), + SpansDropped: hostMetrics.Counter(metrics.Options{Name: "spans.dropped", Tags: nil}), + BatchSize: hostMetrics.Gauge(metrics.Options{Name: "batch-size", Tags: nil}), + QueueLength: hostMetrics.Gauge(metrics.Options{Name: "queue-length", Tags: nil}), + SavedOkBySvc: newMetricsBySvc(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"result": "ok"}}), "saved-by-svc"), + SavedErrBySvc: newMetricsBySvc(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"result": "err"}}), "saved-by-svc"), + spanCounts: spanCounts, + serviceNames: hostMetrics.Gauge(metrics.Options{Name: "spans.serviceNames", Tags: nil}), + countsByEndpoints: countsByEndpoints, } return m diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index c0bfa7b404c..9b41ce97458 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -17,7 +17,7 @@ package app import ( "time" - "github.com/uber/tchannel-go" + tchannel "github.com/uber/tchannel-go" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" @@ -124,6 +124,7 @@ func (sp *spanProcessor) saveSpan(span *model.Span) { func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) { sp.preProcessSpans(mSpans) sp.metrics.BatchSize.Update(int64(len(mSpans))) + sp.metrics.countsByEndpoints[options.InboundTransport].Inc(int64(len(mSpans))) retMe := make([]bool, len(mSpans)) for i, mSpan := range mSpans { ok := sp.enqueueSpan(mSpan, options.SpanFormat) diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 4891bfbcf1f..3a4185b6c6c 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -237,7 +237,6 @@ func TestSpanProcessorErrors(t *testing.T) { }, }, }, ProcessSpansOptions{SpanFormat: JaegerFormatType}) - assert.NoError(t, err) assert.Equal(t, []bool{true}, res) From 1403e0e5480a5239e77d0d53e3ac68cac81c0c28 Mon Sep 17 00:00:00 2001 From: Jude Wang Date: Tue, 26 Mar 2019 12:27:33 -0400 Subject: [PATCH 02/13] fix linter issue Signed-off-by: Jude Wang --- cmd/collector/app/metrics.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index 162e0ac53cb..2677a004437 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -25,8 +25,10 @@ import ( const ( maxServiceNames = 2000 otherServices = "other-services" - GrpcEndpoint = "gRPC" - HttpEndpoint = "HTTP" + // GrpcEndpoint is the key name for gRPC endpoint metric + GrpcEndpoint = "gRPC" + // HTTPEndpoint is the key name for HTTP endpoint metric + HTTPEndpoint = "HTTP" ) // SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor @@ -84,7 +86,7 @@ func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics } countsByEndpoints := map[string]metrics.Counter{ GrpcEndpoint: serviceMetrics.Counter(metrics.Options{Name: "grpc-endpoint", Tags: nil}), - HttpEndpoint: serviceMetrics.Counter(metrics.Options{Name: "http-endpoint", Tags: nil}), + HTTPEndpoint: serviceMetrics.Counter(metrics.Options{Name: "http-endpoint", Tags: nil}), } m := &SpanProcessorMetrics{ SaveLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "save-latency", Tags: nil}), From 667384396d6da85d309e8dd3ddab19a975170270 Mon Sep 17 00:00:00 2001 From: Jude Wang Date: Mon, 1 Apr 2019 14:19:36 -0400 Subject: [PATCH 03/13] refactor span_handler to emit endpoint metrics Signed-off-by: Jude Wang --- cmd/collector/app/grpc_handler.go | 2 +- cmd/collector/app/http_handler.go | 2 +- cmd/collector/app/metrics.go | 54 +++++++++---------- cmd/collector/app/metrics_test.go | 36 ++++++------- cmd/collector/app/span_processor.go | 13 +++-- cmd/collector/app/span_processor_test.go | 10 ++-- cmd/collector/app/tchannel_handler.go | 4 +- cmd/collector/app/thrift_span_handler.go | 7 +++ cmd/collector/app/thrift_span_handler_test.go | 29 ++++++++++ cmd/collector/app/zipkin/http_handler.go | 2 +- 10 files changed, 96 insertions(+), 63 deletions(-) diff --git a/cmd/collector/app/grpc_handler.go b/cmd/collector/app/grpc_handler.go index b3496bc2a18..1726a320307 100644 --- a/cmd/collector/app/grpc_handler.go +++ b/cmd/collector/app/grpc_handler.go @@ -44,7 +44,7 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) } } _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{ - InboundTransport: "grpc", // TODO do we have a constant? + InboundTransport: GrpcEndpoint, SpanFormat: JaegerFormatType, }) if err != nil { diff --git a/cmd/collector/app/http_handler.go b/cmd/collector/app/http_handler.go index a12527d7b82..ce35f8df4d6 100644 --- a/cmd/collector/app/http_handler.go +++ b/cmd/collector/app/http_handler.go @@ -85,7 +85,7 @@ func (aH *APIHandler) saveSpan(w http.ResponseWriter, r *http.Request) { return } batches := []*tJaeger.Batch{batch} - opts := SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant? + opts := SubmitBatchOptions{InboundTransport: HTTPEndpoint} if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil { http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError) return diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index 2677a004437..585b2ee78b0 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -45,11 +45,10 @@ type SpanProcessorMetrics struct { // QueueLength measures the size of the internal span queue QueueLength metrics.Gauge // SavedOkBySvc contains span and trace counts by service - SavedOkBySvc metricsBySvc // spans actually saved - SavedErrBySvc metricsBySvc // spans failed to save - serviceNames metrics.Gauge // total number of unique service name metrics reported by this collector - spanCounts map[string]CountsBySpanType - countsByEndpoints map[string]metrics.Counter // count of spans processed from different endpoints (Http, TChannel, gRPC) + SavedOkBySvc metricsBySvc // spans actually saved + SavedErrBySvc metricsBySvc // spans failed to save + serviceNames metrics.Gauge // total number of unique service name metrics reported by this collector + spanCounts map[string]CountsBySpanType } type countsBySvc struct { @@ -84,21 +83,16 @@ func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics for _, otherFormatType := range otherFormatTypes { spanCounts[otherFormatType] = newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": otherFormatType}})) } - countsByEndpoints := map[string]metrics.Counter{ - GrpcEndpoint: serviceMetrics.Counter(metrics.Options{Name: "grpc-endpoint", Tags: nil}), - HTTPEndpoint: serviceMetrics.Counter(metrics.Options{Name: "http-endpoint", Tags: nil}), - } m := &SpanProcessorMetrics{ - SaveLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "save-latency", Tags: nil}), - InQueueLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "in-queue-latency", Tags: nil}), - SpansDropped: hostMetrics.Counter(metrics.Options{Name: "spans.dropped", Tags: nil}), - BatchSize: hostMetrics.Gauge(metrics.Options{Name: "batch-size", Tags: nil}), - QueueLength: hostMetrics.Gauge(metrics.Options{Name: "queue-length", Tags: nil}), - SavedOkBySvc: newMetricsBySvc(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"result": "ok"}}), "saved-by-svc"), - SavedErrBySvc: newMetricsBySvc(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"result": "err"}}), "saved-by-svc"), - spanCounts: spanCounts, - serviceNames: hostMetrics.Gauge(metrics.Options{Name: "spans.serviceNames", Tags: nil}), - countsByEndpoints: countsByEndpoints, + SaveLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "save-latency", Tags: nil}), + InQueueLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "in-queue-latency", Tags: nil}), + SpansDropped: hostMetrics.Counter(metrics.Options{Name: "spans.dropped", Tags: nil}), + BatchSize: hostMetrics.Gauge(metrics.Options{Name: "batch-size", Tags: nil}), + QueueLength: hostMetrics.Gauge(metrics.Options{Name: "queue-length", Tags: nil}), + SavedOkBySvc: newMetricsBySvc(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"result": "ok"}}), "saved-by-svc"), + SavedErrBySvc: newMetricsBySvc(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"result": "err"}}), "saved-by-svc"), + spanCounts: spanCounts, + serviceNames: hostMetrics.Gauge(metrics.Options{Name: "spans.serviceNames", Tags: nil}), } return m @@ -146,26 +140,26 @@ func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat string) CountsBySpa // reportServiceNameForSpan determines the name of the service that emitted // the span and reports a counter stat. -func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span) { +func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span, endpoint string) { serviceName := span.Process.ServiceName if serviceName == "" { return } - m.countSpansByServiceName(serviceName, span.Flags.IsDebug()) + m.countSpansByServiceName(serviceName, span.Flags.IsDebug(), endpoint) if span.ParentSpanID() == 0 { - m.countTracesByServiceName(serviceName, span.Flags.IsDebug()) + m.countTracesByServiceName(serviceName, span.Flags.IsDebug(), endpoint) } } // countSpansByServiceName counts how many spans are received per service. -func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool) { - m.spans.countByServiceName(serviceName, isDebug) +func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool, endpoint string) { + m.spans.countByServiceName(serviceName, isDebug, endpoint) } // countTracesByServiceName counts how many traces are received per service, // i.e. the counter is only incremented for the root spans. -func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool) { - m.traces.countByServiceName(serviceName, isDebug) +func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool, endpoint string) { + m.traces.countByServiceName(serviceName, isDebug, endpoint) } // countByServiceName maintains a map of counters for each service name it's @@ -178,7 +172,7 @@ func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool) // total number of stored counters, so if it exceeds say the 90% threshold // an alert should be raised to investigate what's causing so many unique // service names. -func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) { +func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool, endpointType string) { serviceName = NormalizeServiceName(serviceName) counts := m.counts if isDebug { @@ -193,7 +187,11 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) { if isDebug { debugStr = "true" } - c := m.factory.Counter(metrics.Options{Name: m.category, Tags: map[string]string{"svc": serviceName, "debug": debugStr}}) + tags := map[string]string{"svc": serviceName, "debug": debugStr} + if endpointType != "" { + tags["transport"] = endpointType + } + c := m.factory.Counter(metrics.Options{Name: m.category, Tags: tags}) counts[serviceName] = c counter = c } else { diff --git a/cmd/collector/app/metrics_test.go b/cmd/collector/app/metrics_test.go index 1b3ac468e5c..da45e58b77c 100644 --- a/cmd/collector/app/metrics_test.go +++ b/cmd/collector/app/metrics_test.go @@ -36,23 +36,23 @@ func TestProcessorMetrics(t *testing.T) { assert.NotNil(t, jFormat) jFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{ Process: &model.Process{}, - }) + }, "HTTP") mSpan := model.Span{ Process: &model.Process{ ServiceName: "fry", }, } - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) + jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, "HTTP") mSpan.Flags.SetDebug() - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) + jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, "HTTP") mSpan.ReplaceParentID(1234) - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) + jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, "HTTP") counters, gauges := baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry"]) - assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry"]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry"]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry"]) + assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=HTTP"]) + assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport=HTTP"]) + assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport=HTTP"]) + assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport=HTTP"]) assert.Empty(t, gauges) } @@ -60,23 +60,23 @@ func TestNewCountsBySvc(t *testing.T) { baseMetrics := metricstest.NewFactory(time.Hour) metrics := newCountsBySvc(baseMetrics, "not_on_my_level", 3) - metrics.countByServiceName("fry", false) - metrics.countByServiceName("leela", false) - metrics.countByServiceName("bender", false) - metrics.countByServiceName("zoidberg", false) + metrics.countByServiceName("fry", false, "") + metrics.countByServiceName("leela", false, "") + metrics.countByServiceName("bender", false, "") + metrics.countByServiceName("zoidberg", false, "") counters, _ := baseMetrics.Backend.Snapshot() assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=fry"]) assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=leela"]) assert.EqualValues(t, 2, counters["not_on_my_level|debug=false|svc=other-services"]) - metrics.countByServiceName("zoidberg", true) - metrics.countByServiceName("bender", true) - metrics.countByServiceName("leela", true) - metrics.countByServiceName("fry", true) + metrics.countByServiceName("zoidberg", true, "gRPC") + metrics.countByServiceName("bender", true, "gRPC") + metrics.countByServiceName("leela", true, "gRPC") + metrics.countByServiceName("fry", true, "gRPC") counters, _ = baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg"]) - assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender"]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg|transport=gRPC"]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender|transport=gRPC"]) assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services"]) } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 9b41ce97458..a389ecc4075 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -112,11 +112,11 @@ func (sp *spanProcessor) saveSpan(span *model.Span) { startTime := time.Now() if err := sp.spanWriter.WriteSpan(span); err != nil { sp.logger.Error("Failed to save span", zap.Error(err)) - sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span) + sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span, "") } else { sp.logger.Debug("Span written to the storage by the collector", zap.Stringer("trace-id", span.TraceID), zap.Stringer("span-id", span.SpanID)) - sp.metrics.SavedOkBySvc.ReportServiceNameForSpan(span) + sp.metrics.SavedOkBySvc.ReportServiceNameForSpan(span, "") } sp.metrics.SaveLatency.Record(time.Since(startTime)) } @@ -124,10 +124,9 @@ func (sp *spanProcessor) saveSpan(span *model.Span) { func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) { sp.preProcessSpans(mSpans) sp.metrics.BatchSize.Update(int64(len(mSpans))) - sp.metrics.countsByEndpoints[options.InboundTransport].Inc(int64(len(mSpans))) retMe := make([]bool, len(mSpans)) for i, mSpan := range mSpans { - ok := sp.enqueueSpan(mSpan, options.SpanFormat) + ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport) if !ok && sp.reportBusy { return nil, tchannel.ErrServerBusy } @@ -141,12 +140,12 @@ func (sp *spanProcessor) processItemFromQueue(item *queueItem) { sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime)) } -func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat string) bool { +func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat, endpointType string) bool { spanCounts := sp.metrics.GetCountsForFormat(originalFormat) - spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span) + spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span, endpointType) if !sp.filterSpan(span) { - spanCounts.RejectedBySvc.ReportServiceNameForSpan(span) + spanCounts.RejectedBySvc.ReportServiceNameForSpan(span, endpointType) return true // as in "not dropped", because it's actively rejected } item := &queueItem{ diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 3a4185b6c6c..20f4dc9e071 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -109,21 +109,21 @@ func TestBySvcMetrics(t *testing.T) { expected := []metricstest.ExpectedMetric{} if test.debug { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName, Value: 2, + Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=TChannel", Value: 2, }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2, + Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=TChannel", Value: 2, }) } if test.rootSpan { if test.debug { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName, Value: 2, + Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=TChannel", Value: 2, }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2, + Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=TChannel", Value: 2, }) } } @@ -137,7 +137,7 @@ func TestBySvcMetrics(t *testing.T) { }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2, + Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=TChannel", Value: 2, }) } mb.AssertCounterMetrics(t, expected...) diff --git a/cmd/collector/app/tchannel_handler.go b/cmd/collector/app/tchannel_handler.go index 6a9ca42d2b5..af8a2f542a5 100644 --- a/cmd/collector/app/tchannel_handler.go +++ b/cmd/collector/app/tchannel_handler.go @@ -43,7 +43,7 @@ func (h *TChannelHandler) SubmitZipkinBatch( _ thrift.Context, spans []*zipkincore.Span, ) ([]*zipkincore.Response, error) { - return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: "tchannel"}) + return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: tchannelEndpoint}) } // SubmitBatches implements jaeger.TChanCollector. @@ -51,5 +51,5 @@ func (h *TChannelHandler) SubmitBatches( _ thrift.Context, batches []*jaeger.Batch, ) ([]*jaeger.BatchSubmitResponse, error) { - return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: "tchannel"}) + return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: tchannelEndpoint}) } diff --git a/cmd/collector/app/thrift_span_handler.go b/cmd/collector/app/thrift_span_handler.go index 1d44359d9e3..81b8eb0a6a2 100644 --- a/cmd/collector/app/thrift_span_handler.go +++ b/cmd/collector/app/thrift_span_handler.go @@ -32,6 +32,13 @@ const ( ZipkinFormatType = "zipkin" // UnknownFormatType is for spans that do not have a widely defined/well-known format type UnknownFormatType = "unknown" + + // TChannelEndpoint is for TChannel endpoint + tchannelEndpoint = "TChannel" + // HTTPEndpoint is for HTTP endpoint + httpEndpoint = "HTTP" + // GRPCEndpoint is for gRPC endpoint + grpcEndpoint = "GRPC" ) // SubmitBatchOptions are passed to Submit methods of the handlers. diff --git a/cmd/collector/app/thrift_span_handler_test.go b/cmd/collector/app/thrift_span_handler_test.go index 99bdebde60f..3dfc48c2710 100644 --- a/cmd/collector/app/thrift_span_handler_test.go +++ b/cmd/collector/app/thrift_span_handler_test.go @@ -55,6 +55,21 @@ func TestJaegerSpanHandler(t *testing.T) { assert.NoError(t, err) assert.True(t, res[0].Ok) } + + res, err = h.SubmitHTTPBatches(ctx, []*jaeger.Batch{ + { + Process: &jaeger.Process{ServiceName: "someServiceName"}, + Spans: []*jaeger.Span{{SpanId: 21345}}, + }, + }) + if tc.expectedErr != nil { + assert.Nil(t, res) + assert.Equal(t, tc.expectedErr, err) + } else { + assert.Len(t, res, 1) + assert.NoError(t, err) + assert.True(t, res[0].Ok) + } } } @@ -102,5 +117,19 @@ func TestZipkinSpanHandler(t *testing.T) { assert.NoError(t, err) assert.True(t, res[0].Ok) } + + res, err = h.SubmitHTTPZipkinBatch(ctx, []*zipkincore.Span{ + { + ID: 12345, + }, + }) + if tc.expectedErr != nil { + assert.Nil(t, res) + assert.Equal(t, tc.expectedErr, err) + } else { + assert.Len(t, res, 1) + assert.NoError(t, err) + assert.True(t, res[0].Ok) + } } } diff --git a/cmd/collector/app/zipkin/http_handler.go b/cmd/collector/app/zipkin/http_handler.go index 8085c7ee06c..1628523ea2b 100644 --- a/cmd/collector/app/zipkin/http_handler.go +++ b/cmd/collector/app/zipkin/http_handler.go @@ -173,7 +173,7 @@ func gunzip(r io.ReadCloser) (*gzip.Reader, error) { func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error { if len(tSpans) > 0 { - opts := app.SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant? + opts := app.SubmitBatchOptions{InboundTransport: app.HTTPEndpoint} if _, err := aH.zipkinSpansHandler.SubmitZipkinBatch(tSpans, opts); err != nil { return err } From 5cd7c1b1f3edd3193a21fcbecc83b2ecfe6a50df Mon Sep 17 00:00:00 2001 From: Jude Wang Date: Mon, 1 Apr 2019 16:12:27 -0400 Subject: [PATCH 04/13] minor update Signed-off-by: Jude Wang --- cmd/collector/app/metrics_test.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/cmd/collector/app/metrics_test.go b/cmd/collector/app/metrics_test.go index da45e58b77c..f807649da67 100644 --- a/cmd/collector/app/metrics_test.go +++ b/cmd/collector/app/metrics_test.go @@ -42,17 +42,17 @@ func TestProcessorMetrics(t *testing.T) { ServiceName: "fry", }, } - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, "HTTP") + jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, tchannelEndpoint) mSpan.Flags.SetDebug() - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, "HTTP") + jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, tchannelEndpoint) mSpan.ReplaceParentID(1234) - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, "HTTP") + jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, tchannelEndpoint) counters, gauges := baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=HTTP"]) - assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport=HTTP"]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport=HTTP"]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport=HTTP"]) + assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport="+tchannelEndpoint]) + assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport="+tchannelEndpoint]) + assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport="+tchannelEndpoint]) + assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport="+tchannelEndpoint]) assert.Empty(t, gauges) } @@ -70,13 +70,13 @@ func TestNewCountsBySvc(t *testing.T) { assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=leela"]) assert.EqualValues(t, 2, counters["not_on_my_level|debug=false|svc=other-services"]) - metrics.countByServiceName("zoidberg", true, "gRPC") - metrics.countByServiceName("bender", true, "gRPC") - metrics.countByServiceName("leela", true, "gRPC") - metrics.countByServiceName("fry", true, "gRPC") + metrics.countByServiceName("zoidberg", true, grpcEndpoint) + metrics.countByServiceName("bender", true, grpcEndpoint) + metrics.countByServiceName("leela", true, grpcEndpoint) + metrics.countByServiceName("fry", true, grpcEndpoint) counters, _ = baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg|transport=gRPC"]) - assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender|transport=gRPC"]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg|transport="+grpcEndpoint]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender|transport="+grpcEndpoint]) assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services"]) } From 8cd79cd4c416be8f1e2bd86fb6ce19632a17416b Mon Sep 17 00:00:00 2001 From: Jude Wang Date: Tue, 2 Apr 2019 12:34:12 -0400 Subject: [PATCH 05/13] replace string Signed-off-by: Jude Wang --- cmd/collector/app/metrics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/collector/app/metrics_test.go b/cmd/collector/app/metrics_test.go index f807649da67..80b68844135 100644 --- a/cmd/collector/app/metrics_test.go +++ b/cmd/collector/app/metrics_test.go @@ -36,7 +36,7 @@ func TestProcessorMetrics(t *testing.T) { assert.NotNil(t, jFormat) jFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{ Process: &model.Process{}, - }, "HTTP") + }, tchannelEndpoint) mSpan := model.Span{ Process: &model.Process{ ServiceName: "fry", From abd55e5ce56b9977abd5797519abee14d244a961 Mon Sep 17 00:00:00 2001 From: Jude Wang Date: Wed, 3 Apr 2019 10:02:37 -0400 Subject: [PATCH 06/13] default transport type is undefined Signed-off-by: Jude Wang --- cmd/collector/app/metrics.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index 585b2ee78b0..5becbd455b2 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -23,12 +23,9 @@ import ( ) const ( - maxServiceNames = 2000 - otherServices = "other-services" - // GrpcEndpoint is the key name for gRPC endpoint metric - GrpcEndpoint = "gRPC" - // HTTPEndpoint is the key name for HTTP endpoint metric - HTTPEndpoint = "HTTP" + maxServiceNames = 2000 + otherServices = "other-services" + defaultTransportType = "undefined" ) // SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor @@ -187,7 +184,7 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool, endpo if isDebug { debugStr = "true" } - tags := map[string]string{"svc": serviceName, "debug": debugStr} + tags := map[string]string{"svc": serviceName, "debug": debugStr, "transport": defaultTransportType} if endpointType != "" { tags["transport"] = endpointType } From 2eb7f0a5474e8bb9c749f6c4fbb3d24c150bc4e1 Mon Sep 17 00:00:00 2001 From: Jude Wang Date: Thu, 4 Apr 2019 13:41:12 -0400 Subject: [PATCH 07/13] fix merge conflict and test Signed-off-by: Jude Wang --- cmd/collector/app/grpc_handler.go | 7 +++- cmd/collector/app/http_handler.go | 2 + cmd/collector/app/metrics.go | 40 ++++++++++++++----- cmd/collector/app/metrics_test.go | 39 +++++++++--------- cmd/collector/app/span_processor_test.go | 12 +++--- cmd/collector/app/tchannel_handler.go | 9 ++++- cmd/collector/app/thrift_span_handler.go | 7 ---- cmd/collector/app/thrift_span_handler_test.go | 8 ++-- 8 files changed, 76 insertions(+), 48 deletions(-) diff --git a/cmd/collector/app/grpc_handler.go b/cmd/collector/app/grpc_handler.go index 1726a320307..d4b001f7fe1 100644 --- a/cmd/collector/app/grpc_handler.go +++ b/cmd/collector/app/grpc_handler.go @@ -22,6 +22,11 @@ import ( "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) +const ( + // GRPCEndpoint is for gRPC endpoint + GRPCEndpoint = "GRPC" +) + // GRPCHandler implements gRPC CollectorService. type GRPCHandler struct { logger *zap.Logger @@ -44,7 +49,7 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) } } _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{ - InboundTransport: GrpcEndpoint, + InboundTransport: GRPCEndpoint, SpanFormat: JaegerFormatType, }) if err != nil { diff --git a/cmd/collector/app/http_handler.go b/cmd/collector/app/http_handler.go index ce35f8df4d6..3a93cd7f65a 100644 --- a/cmd/collector/app/http_handler.go +++ b/cmd/collector/app/http_handler.go @@ -29,6 +29,8 @@ import ( const ( // UnableToReadBodyErrFormat is an error message for invalid requests UnableToReadBodyErrFormat = "Unable to process request body: %v" + // HTTPEndpoint is for HTTP endpoint + HTTPEndpoint = "HTTP" ) var ( diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index 5becbd455b2..08c813c69a6 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -23,9 +23,13 @@ import ( ) const ( - maxServiceNames = 2000 - otherServices = "other-services" - defaultTransportType = "undefined" + maxServiceNames = 2000 + defaultTransportType = "Undefined" + otherServices = "other-services" + otherServicesViaHTTP = "other-services-via-http" + otherServicesViaTChannel = "other-services-via-tchannel" + otherServicesViaGRPC = "other-services-via-grpc" + otherServicesViaDefault = "other-services-via-undefined" ) // SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor @@ -105,13 +109,11 @@ func newMetricsBySvc(factory metrics.Factory, category string) metricsBySvc { } func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames int) countsBySvc { + // Add 3 to maxServiceNames threshold to compensate for extra slots taken by transport types + maxServiceNames = maxServiceNames + 3 return countsBySvc{ - counts: map[string]metrics.Counter{ - otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "false"}}), - }, - debugCounts: map[string]metrics.Counter{ - otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "true"}}), - }, + counts: newCountsByTransport(factory, category, "false"), + debugCounts: newCountsByTransport(factory, category, "true"), factory: factory, lock: &sync.Mutex{}, maxServiceNames: maxServiceNames, @@ -119,6 +121,15 @@ func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames in } } +func newCountsByTransport(factory metrics.Factory, category string, debugFlag string) map[string]metrics.Counter { + return map[string]metrics.Counter{ + otherServicesViaHTTP: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": HTTPEndpoint}}), + otherServicesViaTChannel: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": TChannelEndpoint}}), + otherServicesViaGRPC: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": GRPCEndpoint}}), + otherServicesViaDefault: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": defaultTransportType}}), + } +} + func newCountsBySpanType(factory metrics.Factory) CountsBySpanType { return CountsBySpanType{ RejectedBySvc: newMetricsBySvc(factory, "rejected"), @@ -192,7 +203,16 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool, endpo counts[serviceName] = c counter = c } else { - counter = counts[otherServices] + switch endpointType { + case HTTPEndpoint: + counter = counts[otherServicesViaHTTP] + case TChannelEndpoint: + counter = counts[otherServicesViaTChannel] + case GRPCEndpoint: + counter = counts[otherServicesViaGRPC] + default: + counter = counts[otherServicesViaDefault] + } } m.lock.Unlock() counter.Inc(1) diff --git a/cmd/collector/app/metrics_test.go b/cmd/collector/app/metrics_test.go index 80b68844135..3e5783b92ec 100644 --- a/cmd/collector/app/metrics_test.go +++ b/cmd/collector/app/metrics_test.go @@ -36,23 +36,23 @@ func TestProcessorMetrics(t *testing.T) { assert.NotNil(t, jFormat) jFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{ Process: &model.Process{}, - }, tchannelEndpoint) + }, TChannelEndpoint) mSpan := model.Span{ Process: &model.Process{ ServiceName: "fry", }, } - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, tchannelEndpoint) + jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint) mSpan.Flags.SetDebug() - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, tchannelEndpoint) + jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint) mSpan.ReplaceParentID(1234) - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, tchannelEndpoint) + jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint) counters, gauges := baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport="+tchannelEndpoint]) - assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport="+tchannelEndpoint]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport="+tchannelEndpoint]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport="+tchannelEndpoint]) + assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport="+TChannelEndpoint]) + assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport="+TChannelEndpoint]) + assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport="+TChannelEndpoint]) + assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport="+TChannelEndpoint]) assert.Empty(t, gauges) } @@ -64,19 +64,22 @@ func TestNewCountsBySvc(t *testing.T) { metrics.countByServiceName("leela", false, "") metrics.countByServiceName("bender", false, "") metrics.countByServiceName("zoidberg", false, "") + metrics.countByServiceName("zoidberg2", false, TChannelEndpoint) + metrics.countByServiceName("zoidberg3", false, HTTPEndpoint) counters, _ := baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=fry"]) - assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=leela"]) - assert.EqualValues(t, 2, counters["not_on_my_level|debug=false|svc=other-services"]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=fry|transport="+defaultTransportType]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=leela|transport="+defaultTransportType]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=other-services|transport="+TChannelEndpoint]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=other-services|transport="+HTTPEndpoint]) - metrics.countByServiceName("zoidberg", true, grpcEndpoint) - metrics.countByServiceName("bender", true, grpcEndpoint) - metrics.countByServiceName("leela", true, grpcEndpoint) - metrics.countByServiceName("fry", true, grpcEndpoint) + metrics.countByServiceName("zoidberg", true, GRPCEndpoint) + metrics.countByServiceName("bender", true, GRPCEndpoint) + metrics.countByServiceName("leela", true, GRPCEndpoint) + metrics.countByServiceName("fry", true, GRPCEndpoint) counters, _ = baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg|transport="+grpcEndpoint]) - assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender|transport="+grpcEndpoint]) - assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services"]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg|transport="+GRPCEndpoint]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender|transport="+GRPCEndpoint]) + assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services|transport="+GRPCEndpoint]) } diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 20f4dc9e071..ae7adc53717 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -109,21 +109,21 @@ func TestBySvcMetrics(t *testing.T) { expected := []metricstest.ExpectedMetric{} if test.debug { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=TChannel", Value: 2, + Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2, }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=TChannel", Value: 2, + Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2, }) } if test.rootSpan { if test.debug { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=TChannel", Value: 2, + Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2, }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=TChannel", Value: 2, + Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2, }) } } @@ -137,7 +137,7 @@ func TestBySvcMetrics(t *testing.T) { }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=TChannel", Value: 2, + Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2, }) } mb.AssertCounterMetrics(t, expected...) @@ -249,7 +249,7 @@ func TestSpanProcessorErrors(t *testing.T) { }, logBuf.JSONLine(0)) expected := []metricstest.ExpectedMetric{{ - Name: "service.spans.saved-by-svc|debug=false|result=err|svc=x", Value: 1, + Name: "service.spans.saved-by-svc|debug=false|result=err|svc=x|transport=" + defaultTransportType, Value: 1, }} mb.AssertCounterMetrics(t, expected...) } diff --git a/cmd/collector/app/tchannel_handler.go b/cmd/collector/app/tchannel_handler.go index af8a2f542a5..2b3787739b4 100644 --- a/cmd/collector/app/tchannel_handler.go +++ b/cmd/collector/app/tchannel_handler.go @@ -21,6 +21,11 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) +const ( + // TChannelEndpoint is for TChannel endpoint + TChannelEndpoint = "TChannel" +) + // TChannelHandler implements jaeger.TChanCollector and zipkincore.TChanZipkinCollector. type TChannelHandler struct { jaegerHandler JaegerBatchesHandler @@ -43,7 +48,7 @@ func (h *TChannelHandler) SubmitZipkinBatch( _ thrift.Context, spans []*zipkincore.Span, ) ([]*zipkincore.Response, error) { - return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: tchannelEndpoint}) + return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: TChannelEndpoint}) } // SubmitBatches implements jaeger.TChanCollector. @@ -51,5 +56,5 @@ func (h *TChannelHandler) SubmitBatches( _ thrift.Context, batches []*jaeger.Batch, ) ([]*jaeger.BatchSubmitResponse, error) { - return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: tchannelEndpoint}) + return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: TChannelEndpoint}) } diff --git a/cmd/collector/app/thrift_span_handler.go b/cmd/collector/app/thrift_span_handler.go index 81b8eb0a6a2..1d44359d9e3 100644 --- a/cmd/collector/app/thrift_span_handler.go +++ b/cmd/collector/app/thrift_span_handler.go @@ -32,13 +32,6 @@ const ( ZipkinFormatType = "zipkin" // UnknownFormatType is for spans that do not have a widely defined/well-known format type UnknownFormatType = "unknown" - - // TChannelEndpoint is for TChannel endpoint - tchannelEndpoint = "TChannel" - // HTTPEndpoint is for HTTP endpoint - httpEndpoint = "HTTP" - // GRPCEndpoint is for gRPC endpoint - grpcEndpoint = "GRPC" ) // SubmitBatchOptions are passed to Submit methods of the handlers. diff --git a/cmd/collector/app/thrift_span_handler_test.go b/cmd/collector/app/thrift_span_handler_test.go index 3dfc48c2710..5857d942a1c 100644 --- a/cmd/collector/app/thrift_span_handler_test.go +++ b/cmd/collector/app/thrift_span_handler_test.go @@ -56,12 +56,12 @@ func TestJaegerSpanHandler(t *testing.T) { assert.True(t, res[0].Ok) } - res, err = h.SubmitHTTPBatches(ctx, []*jaeger.Batch{ + res, err = h.SubmitBatches([]*jaeger.Batch{ { Process: &jaeger.Process{ServiceName: "someServiceName"}, Spans: []*jaeger.Span{{SpanId: 21345}}, }, - }) + }, SubmitBatchOptions{}) if tc.expectedErr != nil { assert.Nil(t, res) assert.Equal(t, tc.expectedErr, err) @@ -118,11 +118,11 @@ func TestZipkinSpanHandler(t *testing.T) { assert.True(t, res[0].Ok) } - res, err = h.SubmitHTTPZipkinBatch(ctx, []*zipkincore.Span{ + res, err = h.SubmitZipkinBatch([]*zipkincore.Span{ { ID: 12345, }, - }) + }, SubmitBatchOptions{}) if tc.expectedErr != nil { assert.Nil(t, res) assert.Equal(t, tc.expectedErr, err) From f40afbb0701c359b862390439b3b550eeeccae01 Mon Sep 17 00:00:00 2001 From: Jude Wang Date: Fri, 5 Apr 2019 13:55:24 -0400 Subject: [PATCH 08/13] refactor Signed-off-by: Jude Wang --- cmd/collector/app/grpc_handler.go | 5 -- cmd/collector/app/http_handler.go | 2 - cmd/collector/app/metrics.go | 93 +++++++++++--------- cmd/collector/app/metrics_test.go | 52 +++++------ cmd/collector/app/span_processor.go | 10 +-- cmd/collector/app/span_processor_test.go | 12 +-- cmd/collector/app/tchannel_handler.go | 5 -- cmd/collector/app/transport_handler_const.go | 12 +++ 8 files changed, 98 insertions(+), 93 deletions(-) create mode 100644 cmd/collector/app/transport_handler_const.go diff --git a/cmd/collector/app/grpc_handler.go b/cmd/collector/app/grpc_handler.go index d4b001f7fe1..bd2693ce1b9 100644 --- a/cmd/collector/app/grpc_handler.go +++ b/cmd/collector/app/grpc_handler.go @@ -22,11 +22,6 @@ import ( "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) -const ( - // GRPCEndpoint is for gRPC endpoint - GRPCEndpoint = "GRPC" -) - // GRPCHandler implements gRPC CollectorService. type GRPCHandler struct { logger *zap.Logger diff --git a/cmd/collector/app/http_handler.go b/cmd/collector/app/http_handler.go index 3a93cd7f65a..ce35f8df4d6 100644 --- a/cmd/collector/app/http_handler.go +++ b/cmd/collector/app/http_handler.go @@ -29,8 +29,6 @@ import ( const ( // UnableToReadBodyErrFormat is an error message for invalid requests UnableToReadBodyErrFormat = "Unable to process request body: %v" - // HTTPEndpoint is for HTTP endpoint - HTTPEndpoint = "HTTP" ) var ( diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index 08c813c69a6..305aff2fbdc 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -23,13 +23,9 @@ import ( ) const ( - maxServiceNames = 2000 - defaultTransportType = "Undefined" - otherServices = "other-services" - otherServicesViaHTTP = "other-services-via-http" - otherServicesViaTChannel = "other-services-via-tchannel" - otherServicesViaGRPC = "other-services-via-grpc" - otherServicesViaDefault = "other-services-via-undefined" + maxServiceNames = 2000 + unknownTransportType = "Undefined" + otherServices = "other-services" ) // SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor @@ -68,6 +64,13 @@ type metricsBySvc struct { // CountsBySpanType measures received, rejected, and receivedByService metrics for a format type type CountsBySpanType struct { + HTTPEndpoint CountsByTransportType + TChannelEndpoint CountsByTransportType + GRPCEndpoint CountsByTransportType + UnknownEndpoint CountsByTransportType +} + +type CountsByTransportType struct { // ReceivedBySvc maintain by-service metrics for a format type ReceivedBySvc metricsBySvc // RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service @@ -109,11 +112,13 @@ func newMetricsBySvc(factory metrics.Factory, category string) metricsBySvc { } func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames int) countsBySvc { - // Add 3 to maxServiceNames threshold to compensate for extra slots taken by transport types - maxServiceNames = maxServiceNames + 3 return countsBySvc{ - counts: newCountsByTransport(factory, category, "false"), - debugCounts: newCountsByTransport(factory, category, "true"), + counts: map[string]metrics.Counter{ + otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "false"}}), + }, + debugCounts: map[string]metrics.Counter{ + otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "true"}}), + }, factory: factory, lock: &sync.Mutex{}, maxServiceNames: maxServiceNames, @@ -121,53 +126,65 @@ func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames in } } -func newCountsByTransport(factory metrics.Factory, category string, debugFlag string) map[string]metrics.Counter { - return map[string]metrics.Counter{ - otherServicesViaHTTP: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": HTTPEndpoint}}), - otherServicesViaTChannel: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": TChannelEndpoint}}), - otherServicesViaGRPC: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": GRPCEndpoint}}), - otherServicesViaDefault: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": defaultTransportType}}), +func newCountsBySpanType(factory metrics.Factory) CountsBySpanType { + return CountsBySpanType{ + HTTPEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": HTTPEndpoint}})), + TChannelEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": TChannelEndpoint}})), + GRPCEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": GRPCEndpoint}})), + UnknownEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": unknownTransportType}})), } } -func newCountsBySpanType(factory metrics.Factory) CountsBySpanType { - return CountsBySpanType{ +func newCountsByTransport(factory metrics.Factory) CountsByTransportType { + return CountsByTransportType{ RejectedBySvc: newMetricsBySvc(factory, "rejected"), ReceivedBySvc: newMetricsBySvc(factory, "received"), } } // GetCountsForFormat gets the countsBySpanType for a given format. If none exists, we use the Unknown format. -func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat string) CountsBySpanType { +func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat, endpointType string) CountsByTransportType { c, ok := m.spanCounts[spanFormat] if !ok { - return m.spanCounts[UnknownFormatType] + c = m.spanCounts[UnknownFormatType] + } + + var counter CountsByTransportType + switch endpointType { + case HTTPEndpoint: + counter = c.HTTPEndpoint + case TChannelEndpoint: + counter = c.TChannelEndpoint + case GRPCEndpoint: + counter = c.GRPCEndpoint + default: + counter = c.UnknownEndpoint } - return c + return counter } // reportServiceNameForSpan determines the name of the service that emitted // the span and reports a counter stat. -func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span, endpoint string) { +func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span) { serviceName := span.Process.ServiceName if serviceName == "" { return } - m.countSpansByServiceName(serviceName, span.Flags.IsDebug(), endpoint) + m.countSpansByServiceName(serviceName, span.Flags.IsDebug()) if span.ParentSpanID() == 0 { - m.countTracesByServiceName(serviceName, span.Flags.IsDebug(), endpoint) + m.countTracesByServiceName(serviceName, span.Flags.IsDebug()) } } // countSpansByServiceName counts how many spans are received per service. -func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool, endpoint string) { - m.spans.countByServiceName(serviceName, isDebug, endpoint) +func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool) { + m.spans.countByServiceName(serviceName, isDebug) } // countTracesByServiceName counts how many traces are received per service, // i.e. the counter is only incremented for the root spans. -func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool, endpoint string) { - m.traces.countByServiceName(serviceName, isDebug, endpoint) +func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool) { + m.traces.countByServiceName(serviceName, isDebug) } // countByServiceName maintains a map of counters for each service name it's @@ -180,7 +197,7 @@ func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool, // total number of stored counters, so if it exceeds say the 90% threshold // an alert should be raised to investigate what's causing so many unique // service names. -func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool, endpointType string) { +func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) { serviceName = NormalizeServiceName(serviceName) counts := m.counts if isDebug { @@ -195,24 +212,12 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool, endpo if isDebug { debugStr = "true" } - tags := map[string]string{"svc": serviceName, "debug": debugStr, "transport": defaultTransportType} - if endpointType != "" { - tags["transport"] = endpointType - } + tags := map[string]string{"svc": serviceName, "debug": debugStr} c := m.factory.Counter(metrics.Options{Name: m.category, Tags: tags}) counts[serviceName] = c counter = c } else { - switch endpointType { - case HTTPEndpoint: - counter = counts[otherServicesViaHTTP] - case TChannelEndpoint: - counter = counts[otherServicesViaTChannel] - case GRPCEndpoint: - counter = counts[otherServicesViaGRPC] - default: - counter = counts[otherServicesViaDefault] - } + counter = counts[otherServices] } m.lock.Unlock() counter.Inc(1) diff --git a/cmd/collector/app/metrics_test.go b/cmd/collector/app/metrics_test.go index 3e5783b92ec..3d420d1cfc9 100644 --- a/cmd/collector/app/metrics_test.go +++ b/cmd/collector/app/metrics_test.go @@ -30,23 +30,26 @@ func TestProcessorMetrics(t *testing.T) { serviceMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "service", Tags: nil}) hostMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "host", Tags: nil}) spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []string{"scruffy"}) - benderFormatMetrics := spm.GetCountsForFormat("bender") - assert.NotNil(t, benderFormatMetrics) - jFormat := spm.GetCountsForFormat(JaegerFormatType) - assert.NotNil(t, jFormat) - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{ + benderFormatHTTPMetrics := spm.GetCountsForFormat("bender", HTTPEndpoint) + assert.NotNil(t, benderFormatHTTPMetrics) + benderFormatGRPCMetrics := spm.GetCountsForFormat("bender", GRPCEndpoint) + assert.NotNil(t, benderFormatGRPCMetrics) + + jTChannelFormat := spm.GetCountsForFormat(JaegerFormatType, TChannelEndpoint) + assert.NotNil(t, jTChannelFormat) + jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{ Process: &model.Process{}, - }, TChannelEndpoint) + }) mSpan := model.Span{ Process: &model.Process{ ServiceName: "fry", }, } - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint) + jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) mSpan.Flags.SetDebug() - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint) + jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) mSpan.ReplaceParentID(1234) - jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint) + jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) counters, gauges := baseMetrics.Backend.Snapshot() assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport="+TChannelEndpoint]) @@ -60,26 +63,23 @@ func TestNewCountsBySvc(t *testing.T) { baseMetrics := metricstest.NewFactory(time.Hour) metrics := newCountsBySvc(baseMetrics, "not_on_my_level", 3) - metrics.countByServiceName("fry", false, "") - metrics.countByServiceName("leela", false, "") - metrics.countByServiceName("bender", false, "") - metrics.countByServiceName("zoidberg", false, "") - metrics.countByServiceName("zoidberg2", false, TChannelEndpoint) - metrics.countByServiceName("zoidberg3", false, HTTPEndpoint) + metrics.countByServiceName("fry", false) + metrics.countByServiceName("leela", false) + metrics.countByServiceName("bender", false) + metrics.countByServiceName("zoidberg", false) counters, _ := baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=fry|transport="+defaultTransportType]) - assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=leela|transport="+defaultTransportType]) - assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=other-services|transport="+TChannelEndpoint]) - assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=other-services|transport="+HTTPEndpoint]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=fry"]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=leela"]) + assert.EqualValues(t, 2, counters["not_on_my_level|debug=false|svc=other-services"]) - metrics.countByServiceName("zoidberg", true, GRPCEndpoint) - metrics.countByServiceName("bender", true, GRPCEndpoint) - metrics.countByServiceName("leela", true, GRPCEndpoint) - metrics.countByServiceName("fry", true, GRPCEndpoint) + metrics.countByServiceName("zoidberg", true) + metrics.countByServiceName("bender", true) + metrics.countByServiceName("leela", true) + metrics.countByServiceName("fry", true) counters, _ = baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg|transport="+GRPCEndpoint]) - assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender|transport="+GRPCEndpoint]) - assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services|transport="+GRPCEndpoint]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg"]) + assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender"]) + assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services"]) } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index a389ecc4075..0bf8d14335c 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -112,11 +112,11 @@ func (sp *spanProcessor) saveSpan(span *model.Span) { startTime := time.Now() if err := sp.spanWriter.WriteSpan(span); err != nil { sp.logger.Error("Failed to save span", zap.Error(err)) - sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span, "") + sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span) } else { sp.logger.Debug("Span written to the storage by the collector", zap.Stringer("trace-id", span.TraceID), zap.Stringer("span-id", span.SpanID)) - sp.metrics.SavedOkBySvc.ReportServiceNameForSpan(span, "") + sp.metrics.SavedOkBySvc.ReportServiceNameForSpan(span) } sp.metrics.SaveLatency.Record(time.Since(startTime)) } @@ -141,11 +141,11 @@ func (sp *spanProcessor) processItemFromQueue(item *queueItem) { } func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat, endpointType string) bool { - spanCounts := sp.metrics.GetCountsForFormat(originalFormat) - spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span, endpointType) + spanCounts := sp.metrics.GetCountsForFormat(originalFormat, endpointType) + spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span) if !sp.filterSpan(span) { - spanCounts.RejectedBySvc.ReportServiceNameForSpan(span, endpointType) + spanCounts.RejectedBySvc.ReportServiceNameForSpan(span) return true // as in "not dropped", because it's actively rejected } item := &queueItem{ diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index ae7adc53717..1e93f9b3b66 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -109,21 +109,21 @@ func TestBySvcMetrics(t *testing.T) { expected := []metricstest.ExpectedMetric{} if test.debug { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2, + Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2, }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2, + Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2, }) } if test.rootSpan { if test.debug { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2, + Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2, }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2, + Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2, }) } } @@ -137,7 +137,7 @@ func TestBySvcMetrics(t *testing.T) { }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2, + Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2, }) } mb.AssertCounterMetrics(t, expected...) @@ -249,7 +249,7 @@ func TestSpanProcessorErrors(t *testing.T) { }, logBuf.JSONLine(0)) expected := []metricstest.ExpectedMetric{{ - Name: "service.spans.saved-by-svc|debug=false|result=err|svc=x|transport=" + defaultTransportType, Value: 1, + Name: "service.spans.saved-by-svc|debug=false|result=err|svc=x", Value: 1, }} mb.AssertCounterMetrics(t, expected...) } diff --git a/cmd/collector/app/tchannel_handler.go b/cmd/collector/app/tchannel_handler.go index 2b3787739b4..806ff1bb062 100644 --- a/cmd/collector/app/tchannel_handler.go +++ b/cmd/collector/app/tchannel_handler.go @@ -21,11 +21,6 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -const ( - // TChannelEndpoint is for TChannel endpoint - TChannelEndpoint = "TChannel" -) - // TChannelHandler implements jaeger.TChanCollector and zipkincore.TChanZipkinCollector. type TChannelHandler struct { jaegerHandler JaegerBatchesHandler diff --git a/cmd/collector/app/transport_handler_const.go b/cmd/collector/app/transport_handler_const.go new file mode 100644 index 00000000000..9e93ced7cb8 --- /dev/null +++ b/cmd/collector/app/transport_handler_const.go @@ -0,0 +1,12 @@ +package app + +const ( + // GRPCEndpoint is for gRPC endpoint + GRPCEndpoint = "GRPC" + + // TChannelEndpoint is for TChannel endpoint + TChannelEndpoint = "TChannel" + + // HTTPEndpoint is for HTTP endpoint + HTTPEndpoint = "HTTP" +) From 80396a124f80d3c559e7ed55bd42b01f11ed9d61 Mon Sep 17 00:00:00 2001 From: Jude Wang Date: Fri, 5 Apr 2019 15:14:18 -0400 Subject: [PATCH 09/13] lint & fmt Signed-off-by: Jude Wang --- cmd/collector/app/metrics.go | 3 ++- cmd/collector/app/transport_handler_const.go | 14 ++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index 305aff2fbdc..caa4918e489 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -62,7 +62,7 @@ type metricsBySvc struct { traces countsBySvc // number of traces originated per service } -// CountsBySpanType measures received, rejected, and receivedByService metrics for a format type +// CountsBySpanType measures metrics by different endpoint types (e.g http, grpc, tchannel) type CountsBySpanType struct { HTTPEndpoint CountsByTransportType TChannelEndpoint CountsByTransportType @@ -70,6 +70,7 @@ type CountsBySpanType struct { UnknownEndpoint CountsByTransportType } +// CountsByTransportType measures received, rejected, and receivedByService metrics for a format type type CountsByTransportType struct { // ReceivedBySvc maintain by-service metrics for a format type ReceivedBySvc metricsBySvc diff --git a/cmd/collector/app/transport_handler_const.go b/cmd/collector/app/transport_handler_const.go index 9e93ced7cb8..6ab3f311c58 100644 --- a/cmd/collector/app/transport_handler_const.go +++ b/cmd/collector/app/transport_handler_const.go @@ -1,3 +1,17 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package app const ( From 5d3d623a08a67cca9d20939b77154a6391e5c2dd Mon Sep 17 00:00:00 2001 From: Jude Wang Date: Fri, 5 Apr 2019 16:08:46 -0400 Subject: [PATCH 10/13] update switch statement to be map due to twice faster benchmark Signed-off-by: Jude Wang --- cmd/collector/app/metrics.go | 27 ++++---------------- cmd/collector/app/transport_handler_const.go | 3 +++ 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index caa4918e489..67289581f47 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -63,14 +63,9 @@ type metricsBySvc struct { } // CountsBySpanType measures metrics by different endpoint types (e.g http, grpc, tchannel) -type CountsBySpanType struct { - HTTPEndpoint CountsByTransportType - TChannelEndpoint CountsByTransportType - GRPCEndpoint CountsByTransportType - UnknownEndpoint CountsByTransportType -} +type CountsBySpanType map[string]CountsByTransportType -// CountsByTransportType measures received, rejected, and receivedByService metrics for a format type +// CountsByTransportType measures received, rejected metrics for a format type type CountsByTransportType struct { // ReceivedBySvc maintain by-service metrics for a format type ReceivedBySvc metricsBySvc @@ -128,11 +123,11 @@ func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames in } func newCountsBySpanType(factory metrics.Factory) CountsBySpanType { - return CountsBySpanType{ + return map[string]CountsByTransportType{ HTTPEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": HTTPEndpoint}})), TChannelEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": TChannelEndpoint}})), GRPCEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": GRPCEndpoint}})), - UnknownEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": unknownTransportType}})), + EmptyEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": unknownTransportType}})), } } @@ -149,19 +144,7 @@ func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat, endpointType strin if !ok { c = m.spanCounts[UnknownFormatType] } - - var counter CountsByTransportType - switch endpointType { - case HTTPEndpoint: - counter = c.HTTPEndpoint - case TChannelEndpoint: - counter = c.TChannelEndpoint - case GRPCEndpoint: - counter = c.GRPCEndpoint - default: - counter = c.UnknownEndpoint - } - return counter + return c[endpointType] } // reportServiceNameForSpan determines the name of the service that emitted diff --git a/cmd/collector/app/transport_handler_const.go b/cmd/collector/app/transport_handler_const.go index 6ab3f311c58..daab29af68f 100644 --- a/cmd/collector/app/transport_handler_const.go +++ b/cmd/collector/app/transport_handler_const.go @@ -23,4 +23,7 @@ const ( // HTTPEndpoint is for HTTP endpoint HTTPEndpoint = "HTTP" + + // EmptyEndpoint is for unknown endpoint + EmptyEndpoint = "" ) From 5af0d8017e0c183df8564cb223243ea6b4ee2b94 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 6 Apr 2019 00:17:22 -0400 Subject: [PATCH 11/13] Use typed constants, rename types Signed-off-by: Yuri Shkuro --- cmd/collector/app/grpc_handler.go | 4 +- cmd/collector/app/http_handler.go | 2 +- cmd/collector/app/metrics.go | 91 ++++++++++++++------ cmd/collector/app/metrics_test.go | 16 ++-- cmd/collector/app/options.go | 4 +- cmd/collector/app/options_test.go | 2 +- cmd/collector/app/span_processor.go | 8 +- cmd/collector/app/span_processor_test.go | 24 +++--- cmd/collector/app/tchannel_handler.go | 8 +- cmd/collector/app/thrift_span_handler.go | 15 +--- cmd/collector/app/transport_handler_const.go | 29 ------- cmd/collector/app/zipkin/http_handler.go | 2 +- 12 files changed, 104 insertions(+), 101 deletions(-) delete mode 100644 cmd/collector/app/transport_handler_const.go diff --git a/cmd/collector/app/grpc_handler.go b/cmd/collector/app/grpc_handler.go index bd2693ce1b9..c4d05859456 100644 --- a/cmd/collector/app/grpc_handler.go +++ b/cmd/collector/app/grpc_handler.go @@ -44,8 +44,8 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) } } _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{ - InboundTransport: GRPCEndpoint, - SpanFormat: JaegerFormatType, + InboundTransport: GRPCTransport, + SpanFormat: JaegerSpanFormat, }) if err != nil { g.logger.Error("cannot process spans", zap.Error(err)) diff --git a/cmd/collector/app/http_handler.go b/cmd/collector/app/http_handler.go index ce35f8df4d6..9c10dc3308a 100644 --- a/cmd/collector/app/http_handler.go +++ b/cmd/collector/app/http_handler.go @@ -85,7 +85,7 @@ func (aH *APIHandler) saveSpan(w http.ResponseWriter, r *http.Request) { return } batches := []*tJaeger.Batch{batch} - opts := SubmitBatchOptions{InboundTransport: HTTPEndpoint} + opts := SubmitBatchOptions{InboundTransport: HTTPTransport} if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil { http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError) return diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index 67289581f47..2d239f7ba45 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -23,9 +23,8 @@ import ( ) const ( - maxServiceNames = 2000 - unknownTransportType = "Undefined" - otherServices = "other-services" + maxServiceNames = 2000 + otherServices = "other-services" ) // SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor @@ -45,7 +44,7 @@ type SpanProcessorMetrics struct { SavedOkBySvc metricsBySvc // spans actually saved SavedErrBySvc metricsBySvc // spans failed to save serviceNames metrics.Gauge // total number of unique service name metrics reported by this collector - spanCounts map[string]CountsBySpanType + spanCounts SpanCountsByFormat } type countsBySvc struct { @@ -62,26 +61,58 @@ type metricsBySvc struct { traces countsBySvc // number of traces originated per service } -// CountsBySpanType measures metrics by different endpoint types (e.g http, grpc, tchannel) -type CountsBySpanType map[string]CountsByTransportType +// InboundTransport identifies the transport used to receive spans. +type InboundTransport string -// CountsByTransportType measures received, rejected metrics for a format type -type CountsByTransportType struct { - // ReceivedBySvc maintain by-service metrics for a format type +const ( + // GRPCTransport indicates spans received over gRPC + GRPCTransport InboundTransport = "grpc" + // TChannelTransport indicates spans received over TChannel + TChannelTransport InboundTransport = "tchannel" + // HTTPTransport indicates spans received over HTTP + HTTPTransport InboundTransport = "http" + // UnknownTransport is for unknown transport + UnknownTransport InboundTransport = "unknown" +) + +// SpanFormat identifies the data format in which the span was originally received. +type SpanFormat string + +const ( + // JaegerSpanFormat is for Jaeger Thrift spans + JaegerSpanFormat SpanFormat = "jaeger" + // ZipkinSpanFormat is for Zipkin Thrift spans + ZipkinSpanFormat SpanFormat = "zipkin" + // ProtoSpanFormat is for Jaeger protobuf Spans + ProtoSpanFormat SpanFormat = "proto" + // UnknownSpanFormat is for spans that do not have a widely defined/well-known format type + UnknownSpanFormat SpanFormat = "unknown" +) + +// SpanCountsByFormat measures metrics by different span formats (thrift, proto, etc.) +type SpanCountsByFormat map[SpanFormat]SpanCountsByTransport + +// SpanCountsByTransport measures metrics by different endpoint types (e.g http, grpc, tchannel) +type SpanCountsByTransport map[InboundTransport]SpanCounts + +// SpanCounts measures received, rejected metrics +type SpanCounts struct { + // ReceivedBySvc maintain by-service metrics ReceivedBySvc metricsBySvc // RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service RejectedBySvc metricsBySvc } // NewSpanProcessorMetrics returns a SpanProcessorMetrics -func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics.Factory, otherFormatTypes []string) *SpanProcessorMetrics { - spanCounts := map[string]CountsBySpanType{ - ZipkinFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": ZipkinFormatType}})), - JaegerFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": JaegerFormatType}})), - UnknownFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": UnknownFormatType}})), +func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics.Factory, otherFormatTypes []SpanFormat) *SpanProcessorMetrics { + spanCounts := SpanCountsByFormat{ + ZipkinSpanFormat: newCountsByTransport(serviceMetrics, ZipkinSpanFormat), + JaegerSpanFormat: newCountsByTransport(serviceMetrics, JaegerSpanFormat), + ProtoSpanFormat: newCountsByTransport(serviceMetrics, ProtoSpanFormat), + UnknownSpanFormat: newCountsByTransport(serviceMetrics, UnknownSpanFormat), } for _, otherFormatType := range otherFormatTypes { - spanCounts[otherFormatType] = newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": otherFormatType}})) + spanCounts[otherFormatType] = newCountsByTransport(serviceMetrics, otherFormatType) } m := &SpanProcessorMetrics{ SaveLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "save-latency", Tags: nil}), @@ -122,29 +153,35 @@ func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames in } } -func newCountsBySpanType(factory metrics.Factory) CountsBySpanType { - return map[string]CountsByTransportType{ - HTTPEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": HTTPEndpoint}})), - TChannelEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": TChannelEndpoint}})), - GRPCEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": GRPCEndpoint}})), - EmptyEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": unknownTransportType}})), +func newCountsByTransport(factory metrics.Factory, format SpanFormat) SpanCountsByTransport { + factory = factory.Namespace(metrics.NSOptions{Tags: map[string]string{"format": string(format)}}) + return SpanCountsByTransport{ + HTTPTransport: newCounts(factory, HTTPTransport), + TChannelTransport: newCounts(factory, TChannelTransport), + GRPCTransport: newCounts(factory, GRPCTransport), + UnknownTransport: newCounts(factory, UnknownTransport), } } -func newCountsByTransport(factory metrics.Factory) CountsByTransportType { - return CountsByTransportType{ +func newCounts(factory metrics.Factory, transport InboundTransport) SpanCounts { + factory = factory.Namespace(metrics.NSOptions{Tags: map[string]string{"transport": string(transport)}}) + return SpanCounts{ RejectedBySvc: newMetricsBySvc(factory, "rejected"), ReceivedBySvc: newMetricsBySvc(factory, "received"), } } -// GetCountsForFormat gets the countsBySpanType for a given format. If none exists, we use the Unknown format. -func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat, endpointType string) CountsByTransportType { +// GetCountsForFormat gets the SpanCounts for a given format and transport. If none exists, we use the Unknown format. +func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat SpanFormat, transport InboundTransport) SpanCounts { c, ok := m.spanCounts[spanFormat] if !ok { - c = m.spanCounts[UnknownFormatType] + c = m.spanCounts[UnknownSpanFormat] + } + t, ok := c[transport] + if !ok { + t = c[UnknownTransport] } - return c[endpointType] + return t } // reportServiceNameForSpan determines the name of the service that emitted diff --git a/cmd/collector/app/metrics_test.go b/cmd/collector/app/metrics_test.go index 3d420d1cfc9..afcd3a7c15b 100644 --- a/cmd/collector/app/metrics_test.go +++ b/cmd/collector/app/metrics_test.go @@ -29,13 +29,13 @@ func TestProcessorMetrics(t *testing.T) { baseMetrics := metricstest.NewFactory(time.Hour) serviceMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "service", Tags: nil}) hostMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "host", Tags: nil}) - spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []string{"scruffy"}) - benderFormatHTTPMetrics := spm.GetCountsForFormat("bender", HTTPEndpoint) + spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []SpanFormat{SpanFormat("scruffy")}) + benderFormatHTTPMetrics := spm.GetCountsForFormat("bender", HTTPTransport) assert.NotNil(t, benderFormatHTTPMetrics) - benderFormatGRPCMetrics := spm.GetCountsForFormat("bender", GRPCEndpoint) + benderFormatGRPCMetrics := spm.GetCountsForFormat("bender", GRPCTransport) assert.NotNil(t, benderFormatGRPCMetrics) - jTChannelFormat := spm.GetCountsForFormat(JaegerFormatType, TChannelEndpoint) + jTChannelFormat := spm.GetCountsForFormat(JaegerSpanFormat, TChannelTransport) assert.NotNil(t, jTChannelFormat) jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{ Process: &model.Process{}, @@ -52,10 +52,10 @@ func TestProcessorMetrics(t *testing.T) { jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) counters, gauges := baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport="+TChannelEndpoint]) - assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport="+TChannelEndpoint]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport="+TChannelEndpoint]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport="+TChannelEndpoint]) + assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=tchannel"]) + assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport=tchannel"]) + assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport=tchannel"]) + assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport=tchannel"]) assert.Empty(t, gauges) } diff --git a/cmd/collector/app/options.go b/cmd/collector/app/options.go index 6591ce55ccc..625b3c14bd2 100644 --- a/cmd/collector/app/options.go +++ b/cmd/collector/app/options.go @@ -41,7 +41,7 @@ type options struct { blockingSubmit bool queueSize int reportBusy bool - extraFormatTypes []string + extraFormatTypes []SpanFormat } // Option is a function that sets some option on StorageBuilder. @@ -128,7 +128,7 @@ func (options) ReportBusy(reportBusy bool) Option { } // ExtraFormatTypes creates an Option that initializes the extra list of format types -func (options) ExtraFormatTypes(extraFormatTypes []string) Option { +func (options) ExtraFormatTypes(extraFormatTypes []SpanFormat) Option { return func(b *options) { b.extraFormatTypes = extraFormatTypes } diff --git a/cmd/collector/app/options_test.go b/cmd/collector/app/options_test.go index 4a44de0f23c..b4c372c0ca2 100644 --- a/cmd/collector/app/options_test.go +++ b/cmd/collector/app/options_test.go @@ -25,7 +25,7 @@ import ( ) func TestAllOptionSet(t *testing.T) { - types := []string{"sneh"} + types := []SpanFormat{SpanFormat("sneh")} opts := Options.apply( Options.ReportBusy(true), Options.BlockingSubmit(true), diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 0bf8d14335c..0ef20bf3040 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -28,8 +28,8 @@ import ( // ProcessSpansOptions additional options passed to processor along with the spans. type ProcessSpansOptions struct { - SpanFormat string - InboundTransport string + SpanFormat SpanFormat + InboundTransport InboundTransport } // SpanProcessor handles model spans @@ -140,8 +140,8 @@ func (sp *spanProcessor) processItemFromQueue(item *queueItem) { sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime)) } -func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat, endpointType string) bool { - spanCounts := sp.metrics.GetCountsForFormat(originalFormat, endpointType) +func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat SpanFormat, transport InboundTransport) bool { + spanCounts := sp.metrics.GetCountsForFormat(originalFormat, transport) spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span) if !sp.filterSpan(span) { diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 1e93f9b3b66..6b143774108 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -38,13 +38,13 @@ func TestBySvcMetrics(t *testing.T) { allowedService := "bender" type TestCase struct { - format string + format SpanFormat serviceName string rootSpan bool debug bool } - spanFormat := [2]string{ZipkinFormatType, JaegerFormatType} + spanFormat := [2]SpanFormat{ZipkinSpanFormat, JaegerSpanFormat} serviceNames := [2]string{allowedService, blackListedService} rootSpanEnabled := [2]bool{true, false} debugEnabled := [2]bool{true, false} @@ -83,13 +83,13 @@ func TestBySvcMetrics(t *testing.T) { ) var metricPrefix, format string switch test.format { - case ZipkinFormatType: + case ZipkinSpanFormat: span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug) zHandler := NewZipkinSpanHandler(logger, processor, zipkinSanitizer.NewParentIDSanitizer()) zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, SubmitBatchOptions{}) metricPrefix = "service" format = "zipkin" - case JaegerFormatType: + case JaegerSpanFormat: span, process := makeJaegerSpan(test.serviceName, test.rootSpan, test.debug) jHandler := NewJaegerSpanHandler(logger, processor) jHandler.SubmitBatches([]*jaeger.Batch{ @@ -109,21 +109,21 @@ func TestBySvcMetrics(t *testing.T) { expected := []metricstest.ExpectedMetric{} if test.debug { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2, + Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2, }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2, + Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2, }) } if test.rootSpan { if test.debug { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2, + Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2, }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2, + Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2, }) } } @@ -137,7 +137,7 @@ func TestBySvcMetrics(t *testing.T) { }) } else { expected = append(expected, metricstest.ExpectedMetric{ - Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2, + Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2, }) } mb.AssertCounterMetrics(t, expected...) @@ -213,7 +213,7 @@ func TestSpanProcessor(t *testing.T) { ServiceName: "x", }, }, - }, ProcessSpansOptions{SpanFormat: JaegerFormatType}) + }, ProcessSpansOptions{SpanFormat: JaegerSpanFormat}) assert.NoError(t, err) assert.Equal(t, []bool{true}, res) } @@ -236,7 +236,7 @@ func TestSpanProcessorErrors(t *testing.T) { ServiceName: "x", }, }, - }, ProcessSpansOptions{SpanFormat: JaegerFormatType}) + }, ProcessSpansOptions{SpanFormat: JaegerSpanFormat}) assert.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -294,7 +294,7 @@ func TestSpanProcessorBusy(t *testing.T) { ServiceName: "x", }, }, - }, ProcessSpansOptions{SpanFormat: JaegerFormatType}) + }, ProcessSpansOptions{SpanFormat: JaegerSpanFormat}) assert.Error(t, err, "expcting busy error") assert.Nil(t, res) diff --git a/cmd/collector/app/tchannel_handler.go b/cmd/collector/app/tchannel_handler.go index 806ff1bb062..80cdd798e77 100644 --- a/cmd/collector/app/tchannel_handler.go +++ b/cmd/collector/app/tchannel_handler.go @@ -43,7 +43,9 @@ func (h *TChannelHandler) SubmitZipkinBatch( _ thrift.Context, spans []*zipkincore.Span, ) ([]*zipkincore.Response, error) { - return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: TChannelEndpoint}) + return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{ + InboundTransport: TChannelTransport, + }) } // SubmitBatches implements jaeger.TChanCollector. @@ -51,5 +53,7 @@ func (h *TChannelHandler) SubmitBatches( _ thrift.Context, batches []*jaeger.Batch, ) ([]*jaeger.BatchSubmitResponse, error) { - return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: TChannelEndpoint}) + return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{ + InboundTransport: TChannelTransport, + }) } diff --git a/cmd/collector/app/thrift_span_handler.go b/cmd/collector/app/thrift_span_handler.go index 1d44359d9e3..d7d9173aa29 100644 --- a/cmd/collector/app/thrift_span_handler.go +++ b/cmd/collector/app/thrift_span_handler.go @@ -25,18 +25,9 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -const ( - // JaegerFormatType is for Jaeger Spans - JaegerFormatType = "jaeger" - // ZipkinFormatType is for zipkin Spans - ZipkinFormatType = "zipkin" - // UnknownFormatType is for spans that do not have a widely defined/well-known format type - UnknownFormatType = "unknown" -) - // SubmitBatchOptions are passed to Submit methods of the handlers. type SubmitBatchOptions struct { - InboundTransport string + InboundTransport InboundTransport } // ZipkinSpansHandler consumes and handles zipkin spans @@ -74,7 +65,7 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options } oks, err := jbh.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{ InboundTransport: options.InboundTransport, - SpanFormat: JaegerFormatType, + SpanFormat: JaegerSpanFormat, }) if err != nil { jbh.logger.Error("Collector failed to process span batch", zap.Error(err)) @@ -121,7 +112,7 @@ func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options } bools, err := h.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{ InboundTransport: options.InboundTransport, - SpanFormat: ZipkinFormatType, + SpanFormat: ZipkinSpanFormat, }) if err != nil { h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err)) diff --git a/cmd/collector/app/transport_handler_const.go b/cmd/collector/app/transport_handler_const.go deleted file mode 100644 index daab29af68f..00000000000 --- a/cmd/collector/app/transport_handler_const.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) 2019 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app - -const ( - // GRPCEndpoint is for gRPC endpoint - GRPCEndpoint = "GRPC" - - // TChannelEndpoint is for TChannel endpoint - TChannelEndpoint = "TChannel" - - // HTTPEndpoint is for HTTP endpoint - HTTPEndpoint = "HTTP" - - // EmptyEndpoint is for unknown endpoint - EmptyEndpoint = "" -) diff --git a/cmd/collector/app/zipkin/http_handler.go b/cmd/collector/app/zipkin/http_handler.go index 1628523ea2b..c94fe5ec97b 100644 --- a/cmd/collector/app/zipkin/http_handler.go +++ b/cmd/collector/app/zipkin/http_handler.go @@ -173,7 +173,7 @@ func gunzip(r io.ReadCloser) (*gzip.Reader, error) { func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error { if len(tSpans) > 0 { - opts := app.SubmitBatchOptions{InboundTransport: app.HTTPEndpoint} + opts := app.SubmitBatchOptions{InboundTransport: app.HTTPTransport} if _, err := aH.zipkinSpansHandler.SubmitZipkinBatch(tSpans, opts); err != nil { return err } From 6613f0aa557040dcf70c715c61fdbde9af699aee Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 6 Apr 2019 08:32:34 -0400 Subject: [PATCH 12/13] Clean-up comments Signed-off-by: Yuri Shkuro --- cmd/collector/app/grpc_handler.go | 2 +- cmd/collector/app/metrics.go | 33 +++++++++++++++++-------------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/cmd/collector/app/grpc_handler.go b/cmd/collector/app/grpc_handler.go index c4d05859456..dc989cb270d 100644 --- a/cmd/collector/app/grpc_handler.go +++ b/cmd/collector/app/grpc_handler.go @@ -45,7 +45,7 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) } _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{ InboundTransport: GRPCTransport, - SpanFormat: JaegerSpanFormat, + SpanFormat: ProtoSpanFormat, }) if err != nil { g.logger.Error("cannot process spans", zap.Error(err)) diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index 2d239f7ba45..6ceea8fdeab 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -23,8 +23,11 @@ import ( ) const ( - maxServiceNames = 2000 - otherServices = "other-services" + // TODO this needs to be configurable via CLI. + maxServiceNames = 4000 + + // otherServices is the catch-all label when number of services exceeds maxServiceNames + otherServices = "other-services" ) // SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor @@ -65,13 +68,13 @@ type metricsBySvc struct { type InboundTransport string const ( - // GRPCTransport indicates spans received over gRPC + // GRPCTransport indicates spans received over gRPC. GRPCTransport InboundTransport = "grpc" - // TChannelTransport indicates spans received over TChannel + // TChannelTransport indicates spans received over TChannel. TChannelTransport InboundTransport = "tchannel" - // HTTPTransport indicates spans received over HTTP + // HTTPTransport indicates spans received over HTTP. HTTPTransport InboundTransport = "http" - // UnknownTransport is for unknown transport + // UnknownTransport is the fallback/catch-all category. UnknownTransport InboundTransport = "unknown" ) @@ -79,27 +82,27 @@ const ( type SpanFormat string const ( - // JaegerSpanFormat is for Jaeger Thrift spans + // JaegerSpanFormat is for Jaeger Thrift spans. JaegerSpanFormat SpanFormat = "jaeger" - // ZipkinSpanFormat is for Zipkin Thrift spans + // ZipkinSpanFormat is for Zipkin Thrift spans. ZipkinSpanFormat SpanFormat = "zipkin" - // ProtoSpanFormat is for Jaeger protobuf Spans + // ProtoSpanFormat is for Jaeger protobuf Spans. ProtoSpanFormat SpanFormat = "proto" - // UnknownSpanFormat is for spans that do not have a widely defined/well-known format type + // UnknownSpanFormat is the fallback/catch-all category. UnknownSpanFormat SpanFormat = "unknown" ) -// SpanCountsByFormat measures metrics by different span formats (thrift, proto, etc.) +// SpanCountsByFormat groups metrics by different span formats (thrift, proto, etc.) type SpanCountsByFormat map[SpanFormat]SpanCountsByTransport -// SpanCountsByTransport measures metrics by different endpoint types (e.g http, grpc, tchannel) +// SpanCountsByTransport groups metrics by inbound transport (e.g http, grpc, tchannel) type SpanCountsByTransport map[InboundTransport]SpanCounts -// SpanCounts measures received, rejected metrics +// SpanCounts contains countrs for received and rejected spans. type SpanCounts struct { - // ReceivedBySvc maintain by-service metrics + // ReceivedBySvc maintain by-service metrics. ReceivedBySvc metricsBySvc - // RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service + // RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service. RejectedBySvc metricsBySvc } From 7ca10eefec7042b4537284f5020a3e0ca91ccdf8 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Sat, 6 Apr 2019 08:49:18 -0400 Subject: [PATCH 13/13] Remove copy-pasta Signed-off-by: Yuri Shkuro --- cmd/collector/app/thrift_span_handler_test.go | 29 ------------------- 1 file changed, 29 deletions(-) diff --git a/cmd/collector/app/thrift_span_handler_test.go b/cmd/collector/app/thrift_span_handler_test.go index 5857d942a1c..99bdebde60f 100644 --- a/cmd/collector/app/thrift_span_handler_test.go +++ b/cmd/collector/app/thrift_span_handler_test.go @@ -55,21 +55,6 @@ func TestJaegerSpanHandler(t *testing.T) { assert.NoError(t, err) assert.True(t, res[0].Ok) } - - res, err = h.SubmitBatches([]*jaeger.Batch{ - { - Process: &jaeger.Process{ServiceName: "someServiceName"}, - Spans: []*jaeger.Span{{SpanId: 21345}}, - }, - }, SubmitBatchOptions{}) - if tc.expectedErr != nil { - assert.Nil(t, res) - assert.Equal(t, tc.expectedErr, err) - } else { - assert.Len(t, res, 1) - assert.NoError(t, err) - assert.True(t, res[0].Ok) - } } } @@ -117,19 +102,5 @@ func TestZipkinSpanHandler(t *testing.T) { assert.NoError(t, err) assert.True(t, res[0].Ok) } - - res, err = h.SubmitZipkinBatch([]*zipkincore.Span{ - { - ID: 12345, - }, - }, SubmitBatchOptions{}) - if tc.expectedErr != nil { - assert.Nil(t, res) - assert.Equal(t, tc.expectedErr, err) - } else { - assert.Len(t, res, 1) - assert.NoError(t, err) - assert.True(t, res[0].Ok) - } } }