Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add inbound transport as label to collector metrics #1446

Merged
merged 14 commits into from
Apr 6, 2019
4 changes: 2 additions & 2 deletions cmd/collector/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
}
}
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{
InboundTransport: "grpc", // TODO do we have a constant?
SpanFormat: JaegerFormatType,
InboundTransport: GRPCTransport,
SpanFormat: ProtoSpanFormat,
})
if err != nil {
g.logger.Error("cannot process spans", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (aH *APIHandler) saveSpan(w http.ResponseWriter, r *http.Request) {
return
}
batches := []*tJaeger.Batch{batch}
opts := SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant?
opts := SubmitBatchOptions{InboundTransport: HTTPTransport}
if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError)
return
Expand Down
94 changes: 74 additions & 20 deletions cmd/collector/app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@ import (
)

const (
maxServiceNames = 2000
otherServices = "other-services"
// TODO this needs to be configurable via CLI.
maxServiceNames = 4000

// otherServices is the catch-all label when number of services exceeds maxServiceNames
otherServices = "other-services"
)

// SpanProcessorMetrics contains all the necessary metrics for the SpanProcessor
Expand All @@ -44,7 +47,7 @@ type SpanProcessorMetrics struct {
SavedOkBySvc metricsBySvc // spans actually saved
SavedErrBySvc metricsBySvc // spans failed to save
serviceNames metrics.Gauge // total number of unique service name metrics reported by this collector
spanCounts map[string]CountsBySpanType
spanCounts SpanCountsByFormat
}

type countsBySvc struct {
Expand All @@ -61,23 +64,58 @@ type metricsBySvc struct {
traces countsBySvc // number of traces originated per service
}

// CountsBySpanType measures received, rejected, and receivedByService metrics for a format type
type CountsBySpanType struct {
// ReceivedBySvc maintain by-service metrics for a format type
// InboundTransport identifies the transport used to receive spans.
// It is a distinct string type, so transport names are passed as one of the
// typed constants below rather than as free-form string literals.
type InboundTransport string

const (
// GRPCTransport indicates spans received over gRPC.
GRPCTransport InboundTransport = "grpc"
// TChannelTransport indicates spans received over TChannel.
TChannelTransport InboundTransport = "tchannel"
// HTTPTransport indicates spans received over HTTP.
HTTPTransport InboundTransport = "http"
// UnknownTransport is the fallback/catch-all category, used when the
// transport is not specified or has no dedicated counters.
UnknownTransport InboundTransport = "unknown"
)

// SpanFormat identifies the data format in which the span was originally received.
// It is a distinct string type, so format names are passed as one of the
// typed constants below (or an explicit SpanFormat conversion).
type SpanFormat string

const (
// JaegerSpanFormat is for Jaeger Thrift spans.
JaegerSpanFormat SpanFormat = "jaeger"
// ZipkinSpanFormat is for Zipkin Thrift spans.
ZipkinSpanFormat SpanFormat = "zipkin"
// ProtoSpanFormat is for Jaeger protobuf Spans.
ProtoSpanFormat SpanFormat = "proto"
// UnknownSpanFormat is the fallback/catch-all category, used when the
// format is not specified or has no dedicated counters.
UnknownSpanFormat SpanFormat = "unknown"
)

// SpanCountsByFormat groups metrics by span format (e.g. jaeger, zipkin, proto).
// Each format maps to the per-transport counters for that format.
type SpanCountsByFormat map[SpanFormat]SpanCountsByTransport

// SpanCountsByTransport groups metrics by inbound transport (e.g. http, grpc, tchannel).
// Each transport maps to its received/rejected span counters.
type SpanCountsByTransport map[InboundTransport]SpanCounts

// SpanCounts contains counters for received and rejected spans.
type SpanCounts struct {
// ReceivedBySvc maintain by-service metrics.
ReceivedBySvc metricsBySvc
// RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service
// RejectedBySvc is the number of spans we rejected (usually due to blacklisting) by-service.
RejectedBySvc metricsBySvc
}

// NewSpanProcessorMetrics returns a SpanProcessorMetrics
func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics.Factory, otherFormatTypes []string) *SpanProcessorMetrics {
spanCounts := map[string]CountsBySpanType{
ZipkinFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": ZipkinFormatType}})),
JaegerFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": JaegerFormatType}})),
UnknownFormatType: newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": UnknownFormatType}})),
func NewSpanProcessorMetrics(serviceMetrics metrics.Factory, hostMetrics metrics.Factory, otherFormatTypes []SpanFormat) *SpanProcessorMetrics {
spanCounts := SpanCountsByFormat{
ZipkinSpanFormat: newCountsByTransport(serviceMetrics, ZipkinSpanFormat),
JaegerSpanFormat: newCountsByTransport(serviceMetrics, JaegerSpanFormat),
ProtoSpanFormat: newCountsByTransport(serviceMetrics, ProtoSpanFormat),
UnknownSpanFormat: newCountsByTransport(serviceMetrics, UnknownSpanFormat),
}
for _, otherFormatType := range otherFormatTypes {
spanCounts[otherFormatType] = newCountsBySpanType(serviceMetrics.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"format": otherFormatType}}))
spanCounts[otherFormatType] = newCountsByTransport(serviceMetrics, otherFormatType)
}
m := &SpanProcessorMetrics{
SaveLatency: hostMetrics.Timer(metrics.TimerOptions{Name: "save-latency", Tags: nil}),
Expand Down Expand Up @@ -118,20 +156,35 @@ func newCountsBySvc(factory metrics.Factory, category string, maxServiceNames in
}
}

func newCountsBySpanType(factory metrics.Factory) CountsBySpanType {
return CountsBySpanType{
// newCountsByTransport builds the per-transport span counters for one span
// format. The factory is first scoped with a "format" tag carrying the given
// format, and one SpanCounts is then created for each known transport
// (http, tchannel, grpc, unknown), in that order.
func newCountsByTransport(factory metrics.Factory, format SpanFormat) SpanCountsByTransport {
	formatTags := map[string]string{"format": string(format)}
	scoped := factory.Namespace(metrics.NSOptions{Tags: formatTags})

	// Keep the creation order identical to the declaration order of the
	// transports so counters are constructed in a stable sequence.
	transports := []InboundTransport{
		HTTPTransport,
		TChannelTransport,
		GRPCTransport,
		UnknownTransport,
	}
	byTransport := make(SpanCountsByTransport, len(transports))
	for _, transport := range transports {
		byTransport[transport] = newCounts(scoped, transport)
	}
	return byTransport
}

// newCounts builds the received/rejected by-service metrics for a single
// inbound transport. The factory is scoped with a "transport" tag carrying
// the given transport before the metrics are created.
func newCounts(factory metrics.Factory, transport InboundTransport) SpanCounts {
	transportTags := map[string]string{"transport": string(transport)}
	scoped := factory.Namespace(metrics.NSOptions{Tags: transportTags})

	var counts SpanCounts
	// "rejected" is created before "received", matching the original order.
	counts.RejectedBySvc = newMetricsBySvc(scoped, "rejected")
	counts.ReceivedBySvc = newMetricsBySvc(scoped, "received")
	return counts
}

// GetCountsForFormat gets the countsBySpanType for a given format. If none exists, we use the Unknown format.
func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat string) CountsBySpanType {
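// GetCountsForFormat gets the SpanCounts for a given format and transport. If the format is not registered, the Unknown format is used; if the transport is not registered for that format, the Unknown transport is used.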
func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat SpanFormat, transport InboundTransport) SpanCounts {
c, ok := m.spanCounts[spanFormat]
if !ok {
return m.spanCounts[UnknownFormatType]
c = m.spanCounts[UnknownSpanFormat]
}
t, ok := c[transport]
if !ok {
t = c[UnknownTransport]
}
return c
return t
}

// reportServiceNameForSpan determines the name of the service that emitted
Expand Down Expand Up @@ -183,7 +236,8 @@ func (m *countsBySvc) countByServiceName(serviceName string, isDebug bool) {
if isDebug {
debugStr = "true"
}
c := m.factory.Counter(metrics.Options{Name: m.category, Tags: map[string]string{"svc": serviceName, "debug": debugStr}})
tags := map[string]string{"svc": serviceName, "debug": debugStr}
c := m.factory.Counter(metrics.Options{Name: m.category, Tags: tags})
counts[serviceName] = c
counter = c
} else {
Expand Down
29 changes: 16 additions & 13 deletions cmd/collector/app/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,33 @@ func TestProcessorMetrics(t *testing.T) {
baseMetrics := metricstest.NewFactory(time.Hour)
serviceMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "service", Tags: nil})
hostMetrics := baseMetrics.Namespace(jaegerM.NSOptions{Name: "host", Tags: nil})
spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []string{"scruffy"})
benderFormatMetrics := spm.GetCountsForFormat("bender")
assert.NotNil(t, benderFormatMetrics)
jFormat := spm.GetCountsForFormat(JaegerFormatType)
assert.NotNil(t, jFormat)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{
spm := NewSpanProcessorMetrics(serviceMetrics, hostMetrics, []SpanFormat{SpanFormat("scruffy")})
benderFormatHTTPMetrics := spm.GetCountsForFormat("bender", HTTPTransport)
assert.NotNil(t, benderFormatHTTPMetrics)
benderFormatGRPCMetrics := spm.GetCountsForFormat("bender", GRPCTransport)
assert.NotNil(t, benderFormatGRPCMetrics)

jTChannelFormat := spm.GetCountsForFormat(JaegerSpanFormat, TChannelTransport)
assert.NotNil(t, jTChannelFormat)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{
Process: &model.Process{},
})
mSpan := model.Span{
Process: &model.Process{
ServiceName: "fry",
},
}
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
mSpan.Flags.SetDebug()
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
mSpan.ReplaceParentID(1234)
jFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
jTChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
counters, gauges := baseMetrics.Backend.Snapshot()

assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry"])
assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry"])
assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|svc=fry|transport=tchannel"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|svc=fry|transport=tchannel"])
assert.Empty(t, gauges)
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type options struct {
blockingSubmit bool
queueSize int
reportBusy bool
extraFormatTypes []string
extraFormatTypes []SpanFormat
}

// Option is a function that sets some option on StorageBuilder.
Expand Down Expand Up @@ -128,7 +128,7 @@ func (options) ReportBusy(reportBusy bool) Option {
}

// ExtraFormatTypes creates an Option that initializes the extra list of format types
func (options) ExtraFormatTypes(extraFormatTypes []string) Option {
func (options) ExtraFormatTypes(extraFormatTypes []SpanFormat) Option {
return func(b *options) {
b.extraFormatTypes = extraFormatTypes
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

func TestAllOptionSet(t *testing.T) {
types := []string{"sneh"}
types := []SpanFormat{SpanFormat("sneh")}
opts := Options.apply(
Options.ReportBusy(true),
Options.BlockingSubmit(true),
Expand Down
12 changes: 6 additions & 6 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package app
import (
"time"

"github.com/uber/tchannel-go"
tchannel "github.com/uber/tchannel-go"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer"
Expand All @@ -28,8 +28,8 @@ import (

// ProcessSpansOptions additional options passed to processor along with the spans.
type ProcessSpansOptions struct {
SpanFormat string
InboundTransport string
SpanFormat SpanFormat
InboundTransport InboundTransport
}

// SpanProcessor handles model spans
Expand Down Expand Up @@ -126,7 +126,7 @@ func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpans
sp.metrics.BatchSize.Update(int64(len(mSpans)))
retMe := make([]bool, len(mSpans))
for i, mSpan := range mSpans {
ok := sp.enqueueSpan(mSpan, options.SpanFormat)
ok := sp.enqueueSpan(mSpan, options.SpanFormat, options.InboundTransport)
if !ok && sp.reportBusy {
return nil, tchannel.ErrServerBusy
}
Expand All @@ -140,8 +140,8 @@ func (sp *spanProcessor) processItemFromQueue(item *queueItem) {
sp.metrics.InQueueLatency.Record(time.Since(item.queuedTime))
}

func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat string) bool {
spanCounts := sp.metrics.GetCountsForFormat(originalFormat)
func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat SpanFormat, transport InboundTransport) bool {
spanCounts := sp.metrics.GetCountsForFormat(originalFormat, transport)
spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span)

if !sp.filterSpan(span) {
Expand Down
25 changes: 12 additions & 13 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func TestBySvcMetrics(t *testing.T) {
allowedService := "bender"

type TestCase struct {
format string
format SpanFormat
serviceName string
rootSpan bool
debug bool
}

spanFormat := [2]string{ZipkinFormatType, JaegerFormatType}
spanFormat := [2]SpanFormat{ZipkinSpanFormat, JaegerSpanFormat}
serviceNames := [2]string{allowedService, blackListedService}
rootSpanEnabled := [2]bool{true, false}
debugEnabled := [2]bool{true, false}
Expand Down Expand Up @@ -83,13 +83,13 @@ func TestBySvcMetrics(t *testing.T) {
)
var metricPrefix, format string
switch test.format {
case ZipkinFormatType:
case ZipkinSpanFormat:
span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug)
zHandler := NewZipkinSpanHandler(logger, processor, zipkinSanitizer.NewParentIDSanitizer())
zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, SubmitBatchOptions{})
metricPrefix = "service"
format = "zipkin"
case JaegerFormatType:
case JaegerSpanFormat:
span, process := makeJaegerSpan(test.serviceName, test.rootSpan, test.debug)
jHandler := NewJaegerSpanHandler(logger, processor)
jHandler.SubmitBatches([]*jaeger.Batch{
Expand All @@ -109,21 +109,21 @@ func TestBySvcMetrics(t *testing.T) {
expected := []metricstest.ExpectedMetric{}
if test.debug {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".spans.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".spans.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
}
if test.rootSpan {
if test.debug {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".traces.received|debug=true|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".traces.received|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
}
}
Expand All @@ -137,7 +137,7 @@ func TestBySvcMetrics(t *testing.T) {
})
} else {
expected = append(expected, metricstest.ExpectedMetric{
Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName, Value: 2,
Name: metricPrefix + ".spans.rejected|debug=false|format=" + format + "|svc=" + test.serviceName + "|transport=unknown", Value: 2,
})
}
mb.AssertCounterMetrics(t, expected...)
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestSpanProcessor(t *testing.T) {
ServiceName: "x",
},
},
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})
}, ProcessSpansOptions{SpanFormat: JaegerSpanFormat})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
}
Expand All @@ -236,8 +236,7 @@ func TestSpanProcessorErrors(t *testing.T) {
ServiceName: "x",
},
},
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})

}, ProcessSpansOptions{SpanFormat: JaegerSpanFormat})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)

Expand Down Expand Up @@ -295,7 +294,7 @@ func TestSpanProcessorBusy(t *testing.T) {
ServiceName: "x",
},
},
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})
}, ProcessSpansOptions{SpanFormat: JaegerSpanFormat})

assert.Error(t, err, "expcting busy error")
assert.Nil(t, res)
Expand Down
8 changes: 6 additions & 2 deletions cmd/collector/app/tchannel_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ func (h *TChannelHandler) SubmitZipkinBatch(
_ thrift.Context,
spans []*zipkincore.Span,
) ([]*zipkincore.Response, error) {
return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: "tchannel"})
return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{
InboundTransport: TChannelTransport,
})
}

// SubmitBatches implements jaeger.TChanCollector.
func (h *TChannelHandler) SubmitBatches(
_ thrift.Context,
batches []*jaeger.Batch,
) ([]*jaeger.BatchSubmitResponse, error) {
return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: "tchannel"})
return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{
InboundTransport: TChannelTransport,
})
}
Loading