Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add inbound transport as label to collector metrics #1446

Merged
merged 14 commits into from
Apr 6, 2019
Prev Previous commit
Next Next commit
refactor
Signed-off-by: Jude Wang <[email protected]>
  • Loading branch information
Jude Wang committed Apr 5, 2019
commit f40afbb0701c359b862390439b3b550eeeccae01
5 changes: 0 additions & 5 deletions cmd/collector/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ import (
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

const (
// GRPCEndpoint is for gRPC endpoint
GRPCEndpoint = "GRPC"
)

// GRPCHandler implements gRPC CollectorService.
type GRPCHandler struct {
logger *zap.Logger
Expand Down
2 changes: 0 additions & 2 deletions cmd/collector/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
const (
// UnableToReadBodyErrFormat is an error message for invalid requests
UnableToReadBodyErrFormat = "Unable to process request body: %v"
// HTTPEndpoint is for HTTP endpoint
HTTPEndpoint = "HTTP"
)

var (
Expand Down
93 changes: 49 additions & 44 deletions cmd/collector/app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ import (
)

const (
maxServiceNames = 2000
defaultTransportType = "Undefined"
otherServices = "other-services"
otherServicesViaHTTP = "other-services-via-http"
otherServicesViaTChannel = "other-services-via-tchannel"
otherServicesViaGRPC = "other-services-via-grpc"
otherServicesViaDefault = "other-services-via-undefined"
maxServiceNames = 2000
unknownTransportType = "Undefined"
otherServices = "other-services"
)

// SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor
Expand Down Expand Up @@ -68,6 +64,13 @@ type metricsBySvc struct {

// CountsBySpanType measures received, rejected, and receivedByService metrics for a format type
type CountsBySpanType struct {
HTTPEndpoint CountsByTransportType
TChannelEndpoint CountsByTransportType
GRPCEndpoint CountsByTransportType
UnknownEndpoint CountsByTransportType
}

type CountsByTransportType struct {
// ReceivedBySvc maintain by-service metrics for a format type
ReceivedBySvc metricsBySvc
// RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service
Expand Down Expand Up @@ -109,65 +112,79 @@ func newMetricsBySvc(factory metrics.Factory, category string) metricsBySvc {
}

func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames int) countsBySvc {
// Add 3 to maxServiceNames threshold to compensate for extra slots taken by transport types
maxServiceNames = maxServiceNames + 3
return countsBySvc{
counts: newCountsByTransport(factory, category, "false"),
debugCounts: newCountsByTransport(factory, category, "true"),
counts: map[string]metrics.Counter{
otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "false"}}),
},
debugCounts: map[string]metrics.Counter{
otherServices: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": "true"}}),
},
factory: factory,
lock: &sync.Mutex{},
maxServiceNames: maxServiceNames,
category: category,
}
}

func newCountsByTransport(factory metrics.Factory, category string, debugFlag string) map[string]metrics.Counter {
return map[string]metrics.Counter{
otherServicesViaHTTP: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": HTTPEndpoint}}),
otherServicesViaTChannel: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": TChannelEndpoint}}),
otherServicesViaGRPC: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": GRPCEndpoint}}),
otherServicesViaDefault: factory.Counter(metrics.Options{Name: category, Tags: map[string]string{"svc": otherServices, "debug": debugFlag, "transport": defaultTransportType}}),
func newCountsBySpanType(factory metrics.Factory) CountsBySpanType {
return CountsBySpanType{
HTTPEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": HTTPEndpoint}})),
TChannelEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": TChannelEndpoint}})),
GRPCEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": GRPCEndpoint}})),
UnknownEndpoint: newCountsByTransport(factory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"transport": unknownTransportType}})),
}
}

func newCountsBySpanType(factory metrics.Factory) CountsBySpanType {
return CountsBySpanType{
func newCountsByTransport(factory metrics.Factory) CountsByTransportType {
return CountsByTransportType{
RejectedBySvc: newMetricsBySvc(factory, "rejected"),
ReceivedBySvc: newMetricsBySvc(factory, "received"),
}
}

// GetCountsForFormat gets the countsBySpanType for a given format. If none exists, we use the Unknown format.
func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat string) CountsBySpanType {
func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat, endpointType string) CountsByTransportType {
c, ok := m.spanCounts[spanFormat]
if !ok {
return m.spanCounts[UnknownFormatType]
c = m.spanCounts[UnknownFormatType]
}

var counter CountsByTransportType
switch endpointType {
case HTTPEndpoint:
counter = c.HTTPEndpoint
case TChannelEndpoint:
counter = c.TChannelEndpoint
case GRPCEndpoint:
counter = c.GRPCEndpoint
default:
counter = c.UnknownEndpoint
}
return c
return counter
}

// reportServiceNameForSpan determines the name of the service that emitted
// the span and reports a counter stat.
func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span, endpoint string) {
func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span) {
serviceName := span.Process.ServiceName
if serviceName == "" {
return
}
m.countSpansByServiceName(serviceName, span.Flags.IsDebug(), endpoint)
m.countSpansByServiceName(serviceName, span.Flags.IsDebug())
if span.ParentSpanID() == 0 {
m.countTracesByServiceName(serviceName, span.Flags.IsDebug(), endpoint)
m.countTracesByServiceName(serviceName, span.Flags.IsDebug())
}
}

// countSpansByServiceName counts how many spans are received per service.
func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool, endpoint string) {
m.spans.countByServiceName(serviceName, isDebug, endpoint)
func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool) {
m.spans.countByServiceName(serviceName, isDebug)
}

// countTracesByServiceName counts how many traces are received per service,
// i.e. the counter is only incremented for the root spans.
func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool, endpoint string) {
m.traces.countByServiceName(serviceName, isDebug, endpoint)
func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool) {
m.traces.countByServiceName(serviceName, isDebug)
}

// countByServiceName maintains a map of counters for each service name it's
Expand All @@ -180,7 +197,7 @@ func (m metricsBySvc) countTracesByServiceName(serviceName string, isDebug bool,
// total number of stored counters, so if it exceeds say the 90% threshold
// an alert should be raised to investigate what's causing so many unique
// service names.
func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool, endpointType string) {
func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) {
serviceName = NormalizeServiceName(serviceName)
counts := m.counts
if isDebug {
Expand All @@ -195,24 +212,12 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool, endpo
if isDebug {
debugStr = "true"
}
tags := map[string]string{"svc": serviceName, "debug": debugStr, "transport": defaultTransportType}
if endpointType != "" {
tags["transport"] = endpointType
}
tags := map[string]string{"svc": serviceName, "debug": debugStr}
c := m.factory.Counter(metrics.Options{Name: m.category, Tags: tags})
counts[serviceName] = c
counter = c
} else {
switch endpointType {
case HTTPEndpoint:
counter = counts[otherServicesViaHTTP]
case TChannelEndpoint:
counter = counts[otherServicesViaTChannel]
case GRPCEndpoint:
counter = counts[otherServicesViaGRPC]
default:
counter = counts[otherServicesViaDefault]
}
counter = counts[otherServices]
}
m.lock.Unlock()
counter.Inc(1)
Expand Down
52 changes: 26 additions & 26 deletions cmd/collector/app/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,26 @@ func TestProcessorMetrics(t *testing.T) {
serviceMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "service", Tags: nil})
hostMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "host", Tags: nil})
spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []string{"scruffy"})
benderFormatMetrics := spm.GetCountsForFormat("bender")
assert.NotNil(t, benderFormatMetrics)
jFormat := spm.GetCountsForFormat(JaegerFormatType)
assert.NotNil(t, jFormat)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{
benderFormatHTTPMetrics := spm.GetCountsForFormat("bender", HTTPEndpoint)
assert.NotNil(t, benderFormatHTTPMetrics)
benderFormatGRPCMetrics := spm.GetCountsForFormat("bender", GRPCEndpoint)
assert.NotNil(t, benderFormatGRPCMetrics)

jTChannelFormat := spm.GetCountsForFormat(JaegerFormatType, TChannelEndpoint)
assert.NotNil(t, jTChannelFormat)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{
Process: &model.Process{},
}, TChannelEndpoint)
})
mSpan := model.Span{
Process: &model.Process{
ServiceName: "fry",
},
}
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
mSpan.Flags.SetDebug()
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
mSpan.ReplaceParentID(1234)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan, TChannelEndpoint)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
counters, gauges := baseMetrics.Backend.Snapshot()

assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport="+TChannelEndpoint])
Expand All @@ -60,26 +63,23 @@ func TestNewCountsBySvc(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
metrics := newCountsBySvc(baseMetrics, "not_on_my_level", 3)

metrics.countByServiceName("fry", false, "")
metrics.countByServiceName("leela", false, "")
metrics.countByServiceName("bender", false, "")
metrics.countByServiceName("zoidberg", false, "")
metrics.countByServiceName("zoidberg2", false, TChannelEndpoint)
metrics.countByServiceName("zoidberg3", false, HTTPEndpoint)
metrics.countByServiceName("fry", false)
metrics.countByServiceName("leela", false)
metrics.countByServiceName("bender", false)
metrics.countByServiceName("zoidberg", false)

counters, _ := baseMetrics.Backend.Snapshot()
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=fry|transport="+defaultTransportType])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=leela|transport="+defaultTransportType])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=other-services|transport="+TChannelEndpoint])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=other-services|transport="+HTTPEndpoint])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=fry"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=false|svc=leela"])
assert.EqualValues(t, 2, counters["not_on_my_level|debug=false|svc=other-services"])

metrics.countByServiceName("zoidberg", true, GRPCEndpoint)
metrics.countByServiceName("bender", true, GRPCEndpoint)
metrics.countByServiceName("leela", true, GRPCEndpoint)
metrics.countByServiceName("fry", true, GRPCEndpoint)
metrics.countByServiceName("zoidberg", true)
metrics.countByServiceName("bender", true)
metrics.countByServiceName("leela", true)
metrics.countByServiceName("fry", true)

counters, _ = baseMetrics.Backend.Snapshot()
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg|transport="+GRPCEndpoint])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender|transport="+GRPCEndpoint])
assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services|transport="+GRPCEndpoint])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=zoidberg"])
assert.EqualValues(t, 1, counters["not_on_my_level|debug=true|svc=bender"])
assert.EqualValues(t, 2, counters["not_on_my_level|debug=true|svc=other-services"])
}
10 changes: 5 additions & 5 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func (sp *spanProcessor) saveSpan(span *model.Span) {
startTime := time.Now()
if err := sp.spanWriter.WriteSpan(span); err != nil {
sp.logger.Error("Failed to save span", zap.Error(err))
sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span, "")
sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span)
} else {
sp.logger.Debug("Span written to the storage by the collector",
zap.Stringer("trace-id", span.TraceID), zap.Stringer("span-id", span.SpanID))
sp.metrics.SavedOkBySvc.ReportServiceNameForSpan(span, "")
sp.metrics.SavedOkBySvc.ReportServiceNameForSpan(span)
}
sp.metrics.SaveLatency.Record(time.Since(startTime))
}
Expand All @@ -141,11 +141,11 @@ func (sp *spanProcessor) processItemFromQueue(item *queueItem) {
}

func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat, endpointType string) bool {
spanCounts := sp.metrics.GetCountsForFormat(originalFormat)
spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span, endpointType)
spanCounts := sp.metrics.GetCountsForFormat(originalFormat, endpointType)
spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span)

if !sp.filterSpan(span) {
spanCounts.RejectedBySvc.ReportServiceNameForSpan(span, endpointType)
spanCounts.RejectedBySvc.ReportServiceNameForSpan(span)
return true // as in "not dropped", because it's actively rejected
}
item := &queueItem{
Expand Down
12 changes: 6 additions & 6 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,21 +109,21 @@ func TestBySvcMetrics(t *testing.T) {
expected := []metricstest.ExpectedMetric{}
if test.debug {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2,
Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2,
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2,
Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2,
})
}
if test.rootSpan {
if test.debug {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2,
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2,
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2,
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2,
})
}
}
Expand All @@ -137,7 +137,7 @@ func TestBySvcMetrics(t *testing.T) {
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + defaultTransportType, Value: 2,
Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=" + unknownTransportType, Value: 2,
})
}
mb.AssertCounterMetrics(t, expected...)
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestSpanProcessorErrors(t *testing.T) {
}, logBuf.JSONLine(0))

expected := []metricstest.ExpectedMetric{{
Name: "service.spans.saved-by-svc|debug=false|result=err|svc=x|transport=" + defaultTransportType, Value: 1,
Name: "service.spans.saved-by-svc|debug=false|result=err|svc=x", Value: 1,
}}
mb.AssertCounterMetrics(t, expected...)
}
Expand Down
5 changes: 0 additions & 5 deletions cmd/collector/app/tchannel_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ import (
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

const (
// TChannelEndpoint is for TChannel endpoint
TChannelEndpoint = "TChannel"
)

// TChannelHandler implements jaeger.TChanCollector and zipkincore.TChanZipkinCollector.
type TChannelHandler struct {
jaegerHandler JaegerBatchesHandler
Expand Down
12 changes: 12 additions & 0 deletions cmd/collector/app/transport_handler_const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package app

const (
// GRPCEndpoint is for gRPC endpoint
GRPCEndpoint = "GRPC"

// TChannelEndpoint is for TChannel endpoint
TChannelEndpoint = "TChannel"

// HTTPEndpoint is for HTTP endpoint
HTTPEndpoint = "HTTP"
)