Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tenant ID processor #12

Merged
merged 15 commits into from
Jan 27, 2021
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ package:
.PHONY: lint
lint:
@echo "Running linters..."
@golangci-lint run ./... && echo "Done."
@golangci-lint run ./... && echo "Done."
16 changes: 15 additions & 1 deletion cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@ import (
"fmt"
"log"

"github.com/hypertrace/collector/processors/piifilterprocessor"
"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/service"
"go.opentelemetry.io/collector/service/defaultcomponents"

"github.com/hypertrace/collector/processors/piifilterprocessor"
"github.com/hypertrace/collector/processors/tenantidprocessor"
)

func main() {
if err := registerMetricViews(); err != nil {
log.Fatal(err)
}

factories, err := components()
if err != nil {
log.Fatalf("failed to build default components: %v", err)
Expand All @@ -35,6 +42,8 @@ func components() (component.Factories, error) {
if err != nil {
return component.Factories{}, err
}
tIDprocessor := tenantidprocessor.NewFactory()
factories.Processors[tIDprocessor.Type()] = tIDprocessor

processors := []component.ProcessorFactory{
piifilterprocessor.NewFactory(),
Expand Down Expand Up @@ -63,3 +72,8 @@ func run(params service.Parameters) error {

return nil
}

func registerMetricViews() error {
views := tenantidprocessor.MetricViews()
return view.Register(views...)
}
13 changes: 2 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,10 @@ module github.com/hypertrace/collector
go 1.15

require (
github.com/go-playground/validator/v10 v10.4.0 // indirect
github.com/go-sql-driver/mysql v1.5.0 // indirect
github.com/golangci/golangci-lint v1.31.0 // indirect
github.com/google/addlicense v0.0.0-20200622132530-df58acafd6d5 // indirect
github.com/joshdk/go-junit v0.0.0-20200702055522-6efcf4050909 // indirect
github.com/json-iterator/go v1.1.10
github.com/ory/go-acc v0.2.6 // indirect
github.com/pavius/impi v0.0.3 // indirect
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect
github.com/sectioneight/md-to-godoc v0.0.0-20161108233149-55e43be6c335 // indirect
github.com/stretchr/testify v1.6.1
github.com/tcnksm/ghr v0.13.0 // indirect
go.opencensus.io v0.22.5
go.opentelemetry.io/collector v0.18.0
go.uber.org/zap v1.16.0
sigs.k8s.io/structured-merge-diff/v3 v3.0.0 // indirect
google.golang.org/grpc v1.34.1
)
322 changes: 12 additions & 310 deletions go.sum

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions processors/tenantidprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package tenantidprocessor

import "go.opentelemetry.io/collector/config/configmodels"

// Config defines config for tenantid processor
type Config struct {
configmodels.ProcessorSettings `mapstructure:",squash"`

// TenantIDHeaderName defines tenant HTTP header name
TenantIDHeaderName string `mapstructure:"tenantid_header_name"`
// TenantIDAttributeKey defines span attribute key for tenant
TenantIDAttributeKey string `mapstructure:"tenantid_attribute_key"`
}
60 changes: 60 additions & 0 deletions processors/tenantidprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package tenantidprocessor

import (
"context"
"go.opencensus.io/stats/view"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor/processorhelper"
)

const (
typeStr = "hypertrace_tenantid"
defaultTenantIdHeaderName = "x-tenant-id"
defaultTenantIdAttributeKey = "tenant-id"
)

// NewFactory creates a factory for the tenantid processor.
// The processor adds tenant ID to every received span.
// The processor returns an error when the tenant ID is missing.
// The tenant ID header is obtained from the context object.
// The batch processor cleans context, therefore this processor
// has to be added before, ideally right after the receiver.
func NewFactory() component.ProcessorFactory {
return processorhelper.NewFactory(
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTraceProcessor),
)
}

func createDefaultConfig() configmodels.Processor {
return &Config{
ProcessorSettings: configmodels.ProcessorSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
TenantIDHeaderName: defaultTenantIdHeaderName,
TenantIDAttributeKey: defaultTenantIdAttributeKey,
}
}

func createTraceProcessor(
_ context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextConsumer consumer.TracesConsumer,
) (component.TracesProcessor, error) {
pCfg := cfg.(*Config)
return processorhelper.NewTraceProcessor(
cfg,
nextConsumer,
&processor{
tenantIDAttributeKey: pCfg.TenantIDAttributeKey,
tenantIDHeaderName: pCfg.TenantIDHeaderName,
logger: params.Logger,
tenantIDViews: make(map[string]*view.View),
})
}
30 changes: 30 additions & 0 deletions processors/tenantidprocessor/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package tenantidprocessor

import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
tagTenantID = tag.MustNewKey("tenant-id")

statSpanPerTenant = stats.Int64("tenant_id_span_count", "Number of spans received from a tenant", stats.UnitDimensionless)
)

// MetricViews returns the metrics views related to storage.
func MetricViews() []*view.View {
tags := []tag.Key{tagTenantID}

viewSpanCount := &view.View{
Name: statSpanPerTenant.Name(),
Description: statSpanPerTenant.Description(),
Measure: statSpanPerTenant,
Aggregation: view.Sum(),
TagKeys: tags,
}

return []*view.View{
viewSpanCount,
}
}
68 changes: 68 additions & 0 deletions processors/tenantidprocessor/tenantidprocessor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package tenantidprocessor

import (
"context"
"fmt"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"strings"

"go.opencensus.io/stats/view"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/zap"
"google.golang.org/grpc/metadata"
)

type processor struct {
tenantIDHeaderName string
tenantIDAttributeKey string
logger *zap.Logger
tenantIDViews map[string]*view.View
}

var _ processorhelper.TProcessor = (*processor)(nil)

// ProcessTraces implements processorhelper.TProcessor
func (p *processor) ProcessTraces(ctx context.Context, traces pdata.Traces) (pdata.Traces, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
p.logger.Error("Could not extract headers from context", zap.Int("num-spans", traces.SpanCount()))
return traces, fmt.Errorf("missing header %s", p.tenantIDHeaderName)
}

tenantIDHeaders := md.Get(p.tenantIDHeaderName)
if len(tenantIDHeaders) == 0 {
return traces, nil
} else if len(tenantIDHeaders) > 1 {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there ever a case where this would be valid? I'm inclined to err on the side of caution and
just drop spans which have multiple tenant header values.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TBH I don't know, it's probably safer to return an error in this case. Every user should have assigned a single tenant.

p.logger.Warn("Multiple tenant IDs provided, only the first one will be used",
zap.String("header-name", p.tenantIDHeaderName), zap.String("header-value", strings.Join(tenantIDHeaders, ",")))
}

tenantID := tenantIDHeaders[0]
p.addTenantIdToSpans(traces, tenantID)

ctx, _ = tag.New(ctx,
tag.Insert(tagTenantID, tenantID))
stats.Record(ctx, statSpanPerTenant.M(int64(traces.SpanCount())))

return traces, nil
}

func (p *processor) addTenantIdToSpans(traces pdata.Traces, tenantIDHeaderValue string) {
rss := traces.ResourceSpans()
for i := 0; i < rss.Len(); i++ {
rs := rss.At(i)

ilss := rs.InstrumentationLibrarySpans()
for j := 0; j < ilss.Len(); j++ {
ils := ilss.At(j)

spans := ils.Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
span.Attributes().Insert(p.tenantIDAttributeKey, pdata.NewAttributeValueString(tenantIDHeaderValue))
}
}
}
}
Loading