From 07a00ff12434333f747bdca6fd7faf1b2d9f03ed Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Tue, 30 Jan 2024 17:11:04 +0000 Subject: [PATCH] [service] Move TracerProvider initialization to service/telemetry package (#9384) Second redo of https://github.com/open-telemetry/opentelemetry-collector/pull/8171 that does not depend on https://github.com/open-telemetry/opentelemetry-collector/pull/9131 Link to tracking Issue: Updates https://github.com/open-telemetry/opentelemetry-collector/issues/8170 --------- Co-authored-by: Alex Boten --- service/service.go | 4 +- service/telemetry.go | 57 --------------------------- service/telemetry/telemetry.go | 71 +++++++++++++++++++++++++++++++--- 3 files changed, 68 insertions(+), 64 deletions(-) diff --git a/service/service.go b/service/service.go index 4f7b04d42ed..ace8f4f3037 100644 --- a/service/service.go +++ b/service/service.go @@ -91,7 +91,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { collectorConf: set.CollectorConf, } var err error - srv.telemetry, err = telemetry.New(ctx, telemetry.Settings{ZapOptions: set.LoggingOptions}, cfg.Telemetry) + srv.telemetry, err = telemetry.New(ctx, telemetry.Settings{BuildInfo: set.BuildInfo, ZapOptions: set.LoggingOptions}, cfg.Telemetry) if err != nil { return nil, fmt.Errorf("failed to get logger: %w", err) } @@ -104,7 +104,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { } srv.telemetrySettings = servicetelemetry.TelemetrySettings{ Logger: logger, - TracerProvider: srv.telemetryInitializer.tp, + TracerProvider: srv.telemetry.TracerProvider(), MeterProvider: srv.telemetryInitializer.mp, MetricsLevel: cfg.Telemetry.Metrics.Level, // Construct telemetry attributes from build info and config's resource attributes. diff --git a/service/telemetry.go b/service/telemetry.go index bbf10f45f55..f644160fe8d 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -5,7 +5,6 @@ package service // import "go.opentelemetry.io/collector/service" import ( "context" - "errors" "net" "net/http" "strconv" @@ -13,16 +12,10 @@ import ( ocmetric "go.opencensus.io/metric" "go.opencensus.io/metric/metricproducer" "go.opentelemetry.io/contrib/config" - "go.opentelemetry.io/contrib/propagators/b3" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" noopmetric "go.opentelemetry.io/otel/metric/noop" - "go.opentelemetry.io/otel/propagation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - "go.opentelemetry.io/otel/trace" - nooptrace "go.opentelemetry.io/otel/trace/noop" "go.uber.org/multierr" "go.uber.org/zap" @@ -34,20 +27,11 @@ import ( const ( zapKeyTelemetryAddress = "address" zapKeyTelemetryLevel = "level" - - // supported trace propagators - traceContextPropagator = "tracecontext" - b3Propagator = "b3" -) - -var ( - errUnsupportedPropagator = errors.New("unsupported trace propagator") ) type telemetryInitializer struct { ocRegistry *ocmetric.Registry mp metric.MeterProvider - tp trace.TracerProvider servers []*http.Server disableHighCardinality bool @@ -57,7 +41,6 @@ type telemetryInitializer struct { func newColTelemetry(disableHighCardinality bool, extendedConfig bool) *telemetryInitializer { return &telemetryInitializer{ mp: noopmetric.NewMeterProvider(), - tp: nooptrace.NewTracerProvider(), disableHighCardinality: disableHighCardinality, extendedConfig: extendedConfig, } @@ -74,34 +57,9 @@ func (tel *telemetryInitializer) init(res *resource.Resource, logger *zap.Logger } logger.Info("Setting up own telemetry...") - - if tp, err := tel.initTraces(res, cfg); err == nil { - tel.tp = tp - } else { - return err - } - - if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil { - otel.SetTextMapPropagator(tp) - } else { - return err - } - return tel.initMetrics(res, logger, cfg, asyncErrorChannel) } -func (tel *telemetryInitializer) initTraces(res *resource.Resource, cfg telemetry.Config) (trace.TracerProvider, error) { - opts := []sdktrace.TracerProviderOption{} - for _, processor := range cfg.Traces.Processors { - sp, err := proctelemetry.InitSpanProcessor(context.Background(), processor) - if err != nil { - return nil, err - } - opts = append(opts, sdktrace.WithSpanProcessor(sp)) - } - return proctelemetry.InitTracerProvider(res, opts) -} - func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error { // Initialize the ocRegistry, still used by the process metrics. tel.ocRegistry = ocmetric.NewRegistry() @@ -171,18 +129,3 @@ func (tel *telemetryInitializer) shutdown() error { } return errs } - -func textMapPropagatorFromConfig(props []string) (propagation.TextMapPropagator, error) { - var textMapPropagators []propagation.TextMapPropagator - for _, prop := range props { - switch prop { - case traceContextPropagator: - textMapPropagators = append(textMapPropagators, propagation.TraceContext{}) - case b3Propagator: - textMapPropagators = append(textMapPropagators, b3.New()) - default: - return nil, errUnsupportedPropagator - } - } - return propagation.NewCompositeTextMapPropagator(textMapPropagators...), nil -} diff --git a/service/telemetry/telemetry.go b/service/telemetry/telemetry.go index 56dec2da0a6..d557d1e957e 100644 --- a/service/telemetry/telemetry.go +++ b/service/telemetry/telemetry.go @@ -5,12 +5,30 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry" import ( "context" + "errors" + "go.opentelemetry.io/contrib/propagators/b3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/propagation" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/trace" "go.uber.org/multierr" "go.uber.org/zap" "go.uber.org/zap/zapcore" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/internal/resource" +) + +const ( + // supported trace propagators + traceContextPropagator = "tracecontext" + b3Propagator = "b3" +) + +var ( + errUnsupportedPropagator = errors.New("unsupported trace propagator") ) type Telemetry struct { @@ -35,25 +53,68 @@ func (t *Telemetry) Shutdown(ctx context.Context) error { // Settings holds configuration for building Telemetry. type Settings struct { + BuildInfo component.BuildInfo ZapOptions []zap.Option } // New creates a new Telemetry from Config. -func New(_ context.Context, set Settings, cfg Config) (*Telemetry, error) { +func New(ctx context.Context, set Settings, cfg Config) (*Telemetry, error) { logger, err := newLogger(cfg.Logs, set.ZapOptions) if err != nil { return nil, err } - tp := sdktrace.NewTracerProvider( - // needed for supporting the zpages extension - sdktrace.WithSampler(alwaysRecord()), - ) + + tp, err := newTracerProvider(ctx, set, cfg) + if err != nil { + return nil, err + } + return &Telemetry{ logger: logger, tracerProvider: tp, }, nil } +func newTracerProvider(ctx context.Context, set Settings, cfg Config) (*sdktrace.TracerProvider, error) { + opts := []sdktrace.TracerProviderOption{sdktrace.WithSampler(alwaysRecord())} + for _, processor := range cfg.Traces.Processors { + sp, err := proctelemetry.InitSpanProcessor(ctx, processor) + if err != nil { + return nil, err + } + opts = append(opts, sdktrace.WithSpanProcessor(sp)) + } + + res := resource.New(set.BuildInfo, cfg.Resource) + tp, err := proctelemetry.InitTracerProvider(res, opts) + if err != nil { + return nil, err + } + + if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil { + otel.SetTextMapPropagator(tp) + } else { + return nil, err + } + + return tp, nil +} + +func textMapPropagatorFromConfig(props []string) (propagation.TextMapPropagator, error) { + var textMapPropagators []propagation.TextMapPropagator + for _, prop := range props { + switch prop { + case traceContextPropagator: + textMapPropagators = append(textMapPropagators, propagation.TraceContext{}) + case b3Propagator: + textMapPropagators = append(textMapPropagators, b3.New()) + default: + return nil, errUnsupportedPropagator + } + } + return propagation.NewCompositeTextMapPropagator(textMapPropagators...), nil +} + func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) { // Copied from NewProductionConfig. zapCfg := &zap.Config{