From 468688c5b63d18ef5b9496d31f885849a22f5807 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 21 Oct 2020 02:22:56 +1100 Subject: [PATCH 01/18] add metrics processor --- sdk/metric/config.go | 12 ++++++++++ sdk/metric/controller/push/config.go | 13 +++++++++++ sdk/metric/controller/push/push.go | 1 + sdk/metric/sdk.go | 34 ++++++++++++++++++++++------ 4 files changed, 53 insertions(+), 7 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 02773ca75e4..52ec40b49fe 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -23,6 +23,8 @@ type Config struct { // Resource describes all the metric records processed by the // Accumulator. Resource *resource.Resource + + MetricsProcessors []MetricsProcessor } // Option is the interface that applies the value to a configuration option. @@ -43,3 +45,13 @@ type resourceOption struct { func (o resourceOption) Apply(config *Config) { config.Resource = o.Resource } + +func WithMetricsProcessors(processors []MetricsProcessor) Option { + return metricsProcessorsOption(processors) +} + +type metricsProcessorsOption []MetricsProcessor + +func (p metricsProcessorsOption) Apply(config *Config) { + config.MetricsProcessors = p +} diff --git a/sdk/metric/controller/push/config.go b/sdk/metric/controller/push/config.go index a22b300b5ea..83e94e13d88 100644 --- a/sdk/metric/controller/push/config.go +++ b/sdk/metric/controller/push/config.go @@ -17,6 +17,7 @@ package push import ( "time" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" ) @@ -33,6 +34,8 @@ type Config struct { // integrate, and export) can last before it is canceled. Defaults to // the controller push period. Timeout time.Duration + + MetricsProcessors []metric.MetricsProcessor } // Option is the interface that applies the value to a configuration option. @@ -73,3 +76,13 @@ type timeoutOption time.Duration func (o timeoutOption) Apply(config *Config) { config.Timeout = time.Duration(o) } + +func WithMetricsProcessor(p metric.MetricsProcessor) Option { + return metricsProcessorOption{p} +} + +type metricsProcessorOption struct{ metric.MetricsProcessor } + +func (m metricsProcessorOption) Apply(config *Config) { + config.MetricsProcessors = append(config.MetricsProcessors, m.MetricsProcessor) +} diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index 1261f0a0e33..55913715cf7 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -62,6 +62,7 @@ func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Opt impl := sdk.NewAccumulator( checkpointer, sdk.WithResource(c.Resource), + sdk.WithMetricsProcessors(c.MetricsProcessors), ) return &Controller{ provider: registry.NewMeterProvider(impl), diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index dc7e1a8f23f..5bab46797ec 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -67,6 +67,8 @@ type ( // resource is applied to all records in this Accumulator. resource *resource.Resource + + metricsProcessors []MetricsProcessor } syncInstrument struct { @@ -123,8 +125,9 @@ type ( } instrument struct { - meter *Accumulator - descriptor otel.Descriptor + meter *Accumulator + descriptor otel.Descriptor + metricsProcessors []MetricsProcessor } asyncInstrument struct { @@ -154,6 +157,14 @@ func (inst *instrument) Descriptor() api.Descriptor { return inst.descriptor } +func (inst *instrument) AddMetricsProcessor(p MetricsProcessor) { + inst.metricsProcessors = append(inst.metricsProcessors, p) +} + +func (inst *instrument) getMetricsProcessors() []MetricsProcessor { + return inst.metricsProcessors +} + func (a *asyncInstrument) Implementation() interface{} { return a } @@ -290,7 +301,14 @@ func (s *syncInstrument) Bind(kvs []label.KeyValue) api.BoundSyncImpl { return s.acquireHandle(kvs, nil) } +type MetricsProcessor interface { + OnMetricRecorded(context.Context, *[]label.KeyValue) +} + func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs []label.KeyValue) { + for _, processor := range s.getMetricsProcessors() { + processor.OnMetricRecorded(ctx, &kvs) + } h := s.acquireHandle(kvs, nil) defer h.Unbind() h.RecordOne(ctx, number) @@ -312,9 +330,10 @@ func NewAccumulator(processor export.Processor, opts ...Option) *Accumulator { } return &Accumulator{ - processor: processor, - asyncInstruments: internal.NewAsyncInstrumentState(), - resource: c.Resource, + asyncInstruments: internal.NewAsyncInstrumentState(), + processor: processor, + resource: c.Resource, + metricsProcessors: c.MetricsProcessors, } } @@ -322,8 +341,9 @@ func NewAccumulator(processor export.Processor, opts ...Option) *Accumulator { func (m *Accumulator) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) { return &syncInstrument{ instrument: instrument{ - descriptor: descriptor, - meter: m, + descriptor: descriptor, + meter: m, + metricsProcessors: m.metricsProcessors, }, }, nil } From fc0b5755a3430962404a4b5e6ff45e8b5bfc5a9c Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 21 Oct 2020 02:24:42 +1100 Subject: [PATCH 02/18] remove unused method --- sdk/metric/sdk.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 5bab46797ec..e6d3840f7c1 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -157,10 +157,6 @@ func (inst *instrument) Descriptor() api.Descriptor { return inst.descriptor } -func (inst *instrument) AddMetricsProcessor(p MetricsProcessor) { - inst.metricsProcessors = append(inst.metricsProcessors, p) -} - func (inst *instrument) getMetricsProcessors() []MetricsProcessor { return inst.metricsProcessors } From 4e641ac33e742324d88157fb2c513ac068cc6045 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 21 Oct 2020 20:32:54 +1100 Subject: [PATCH 03/18] move interface and add comments --- sdk/metric/config.go | 2 ++ sdk/metric/sdk.go | 9 +++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 52ec40b49fe..478a01438e4 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -24,6 +24,8 @@ type Config struct { // Accumulator. Resource *resource.Resource + // MetricProcessors are executed each time a metric is recorded + // by the Accumulator's sync instrument implementation MetricsProcessors []MetricsProcessor } diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index e6d3840f7c1..1167d5e508a 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -68,6 +68,7 @@ type ( // resource is applied to all records in this Accumulator. resource *resource.Resource + // metricsProcessors are applied to all records in this Accumulator metricsProcessors []MetricsProcessor } @@ -142,6 +143,10 @@ type ( labels *label.Set observed export.Aggregator } + + MetricsProcessor interface { + OnMetricRecorded(context.Context, *[]label.KeyValue) + } ) var ( @@ -297,10 +302,6 @@ func (s *syncInstrument) Bind(kvs []label.KeyValue) api.BoundSyncImpl { return s.acquireHandle(kvs, nil) } -type MetricsProcessor interface { - OnMetricRecorded(context.Context, *[]label.KeyValue) -} - func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs []label.KeyValue) { for _, processor := range s.getMetricsProcessors() { processor.OnMetricRecorded(ctx, &kvs) From fe2c130e2079f525f6f2db4290260ebabc9a3e28 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 21 Oct 2020 20:51:29 +1100 Subject: [PATCH 04/18] add comments for MetricsProcessor interface --- sdk/metric/sdk.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 1167d5e508a..6b54232dbef 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -144,7 +144,12 @@ type ( observed export.Aggregator } + // MetricsProcessor can be provided as an config option when creating + // a new push controller MetricsProcessor interface { + // OnMetricRecorded is execute everytime a metric is recorded by + // the sync instrument implementation of an Accumulator, it generally + // provides ability to correlate the context with the metrics OnMetricRecorded(context.Context, *[]label.KeyValue) } ) From b297e948ceb870678add84530ed85adb2e8ec68e Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 28 Oct 2020 13:47:57 +1100 Subject: [PATCH 05/18] add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e8c1b4480a5..adc069f439c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `EventOption` and the related `NewEventConfig` function are added to the `go.opentelemetry.io/otel` package to configure Span events. (#1254) - A `TextMapPropagator` and associated `TextMapCarrier` are added to the `go.opentelemetry.io/otel/oteltest` package to test TextMap type propagators and their use. (#1259) +- `WithMetricsProcessor` config option is added to the `go.opentelemetry.io/otel/sdk/push` package to allow providing processors that implement `MetricsProcessor` interface, which is + added to `go.opentelemetry.io/otel/sdk/metric` package ### Changed From 21c833fc16335299c5738075c5e7c3a6376718ee Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Thu, 29 Oct 2020 23:19:49 +1100 Subject: [PATCH 06/18] make the option api accept variadic signature for metrics processors --- sdk/metric/config.go | 4 ++-- sdk/metric/controller/push/config.go | 8 ++++---- sdk/metric/controller/push/push.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 478a01438e4..6795f90ef1f 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -48,12 +48,12 @@ func (o resourceOption) Apply(config *Config) { config.Resource = o.Resource } -func WithMetricsProcessors(processors []MetricsProcessor) Option { +func WithMetricsProcessors(processors ...MetricsProcessor) Option { return metricsProcessorsOption(processors) } type metricsProcessorsOption []MetricsProcessor func (p metricsProcessorsOption) Apply(config *Config) { - config.MetricsProcessors = p + config.MetricsProcessors = append(config.MetricsProcessors, p...) } diff --git a/sdk/metric/controller/push/config.go b/sdk/metric/controller/push/config.go index 83e94e13d88..ccc7d5d04a3 100644 --- a/sdk/metric/controller/push/config.go +++ b/sdk/metric/controller/push/config.go @@ -77,12 +77,12 @@ func (o timeoutOption) Apply(config *Config) { config.Timeout = time.Duration(o) } -func WithMetricsProcessor(p metric.MetricsProcessor) Option { - return metricsProcessorOption{p} +func WithMetricsProcessors(processors ...metric.MetricsProcessor) Option { + return metricsProcessorOption(processors) } -type metricsProcessorOption struct{ metric.MetricsProcessor } +type metricsProcessorOption []metric.MetricsProcessor func (m metricsProcessorOption) Apply(config *Config) { - config.MetricsProcessors = append(config.MetricsProcessors, m.MetricsProcessor) + config.MetricsProcessors = append(config.MetricsProcessors, m...) } diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index 55913715cf7..cb25b0d6969 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -62,7 +62,7 @@ func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Opt impl := sdk.NewAccumulator( checkpointer, sdk.WithResource(c.Resource), - sdk.WithMetricsProcessors(c.MetricsProcessors), + sdk.WithMetricsProcessors(c.MetricsProcessors...), ) return &Controller{ provider: registry.NewMeterProvider(impl), From 7b115744d1ab50f777a3bb9c56cf7e87a5f9b6ff Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Thu, 29 Oct 2020 23:33:40 +1100 Subject: [PATCH 07/18] split changelog into two separate lines --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 201c0265397..924137dba61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `EventOption` and the related `NewEventConfig` function are added to the `go.opentelemetry.io/otel` package to configure Span events. (#1254) - A `TextMapPropagator` and associated `TextMapCarrier` are added to the `go.opentelemetry.io/otel/oteltest` package to test TextMap type propagators and their use. (#1259) - `SpanContextFromContext` returns `SpanContext` from context. (#1255) -- `WithMetricsProcessor` config option is added to the `go.opentelemetry.io/otel/sdk/push` package to allow providing processors that implement `MetricsProcessor` interface, which is - added to `go.opentelemetry.io/otel/sdk/metric` package. (#1271) +- `MetricsProcessor` interface is added to `go.opentelemetry.io/otel/sdk/metric` package. (#1271) +- `WithMetricsProcessor` config option is added to `go.opentelemetry.io/otel/sdk/push` package to allow providing custom metrics processors. (#1271) ### Changed From 015599a155fd5a5116b88370015b640625c26fe6 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Fri, 30 Oct 2020 12:30:50 +1100 Subject: [PATCH 08/18] update `MetricsProcessor` documentation --- sdk/metric/sdk.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 6b54232dbef..962116eb4ae 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -144,8 +144,8 @@ type ( observed export.Aggregator } - // MetricsProcessor can be provided as an config option when creating - // a new push controller + // Implementations of MetricsProcessor can be provided as an config option to provide an opportunity + // to re-compose metrics labels based on the context when the metrics are recorded. MetricsProcessor interface { // OnMetricRecorded is execute everytime a metric is recorded by // the sync instrument implementation of an Accumulator, it generally From e6b051e2ac70ebb5bfb8277fa107ffc077f0fd22 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 11 Nov 2020 21:34:39 +1100 Subject: [PATCH 09/18] use a function type instead of interface and rename it to a more meaningful name --- sdk/metric/config.go | 14 +++++----- sdk/metric/controller/push/config.go | 12 ++++---- sdk/metric/controller/push/push.go | 2 +- sdk/metric/sdk.go | 41 ++++++++++------------------ 4 files changed, 29 insertions(+), 40 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 6f47469b2b5..e3526cbe5cb 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -16,9 +16,9 @@ package metric // Config contains configuration for an SDK. type Config struct { - // MetricProcessors are executed each time a metric is recorded + // If provided, MetricsLabelsEnricher is executed each time a metric is recorded // by the Accumulator's sync instrument implementation - MetricsProcessors []MetricsProcessor + MetricsLabelsEnricher MetricsLabelsEnricher } // Option is the interface that applies the value to a configuration option. @@ -27,12 +27,12 @@ type Option interface { Apply(*Config) } -func WithMetricsProcessors(processors ...MetricsProcessor) Option { - return metricsProcessorsOption(processors) +func WithMetricsLabelsEnricher(e MetricsLabelsEnricher) Option { + return metricsLabelsEnricherOption(e) } -type metricsProcessorsOption []MetricsProcessor +type metricsLabelsEnricherOption MetricsLabelsEnricher -func (p metricsProcessorsOption) Apply(config *Config) { - config.MetricsProcessors = append(config.MetricsProcessors, p...) +func (e metricsLabelsEnricherOption) Apply(config *Config) { + config.MetricsLabelsEnricher = MetricsLabelsEnricher(e) } diff --git a/sdk/metric/controller/push/config.go b/sdk/metric/controller/push/config.go index 3002750c152..622b129d7d7 100644 --- a/sdk/metric/controller/push/config.go +++ b/sdk/metric/controller/push/config.go @@ -35,7 +35,7 @@ type Config struct { // the controller push period. Timeout time.Duration - MetricsProcessors []metric.MetricsProcessor + MetricsLabelsEnricher metric.MetricsLabelsEnricher } // Option is the interface that applies the value to a configuration option. @@ -77,12 +77,12 @@ func (o timeoutOption) Apply(config *Config) { config.Timeout = time.Duration(o) } -func WithMetricsProcessors(processors ...metric.MetricsProcessor) Option { - return metricsProcessorOption(processors) +func WithMetricsLabelsEnricher(e metric.MetricsLabelsEnricher) Option { + return metricsLabelsEnricherOption(e) } -type metricsProcessorOption []metric.MetricsProcessor +type metricsLabelsEnricherOption metric.MetricsLabelsEnricher -func (m metricsProcessorOption) Apply(config *Config) { - config.MetricsProcessors = append(config.MetricsProcessors, m...) +func (e metricsLabelsEnricherOption) Apply(config *Config) { + config.MetricsLabelsEnricher = metric.MetricsLabelsEnricher(e) } diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index b875b909b4d..2202e927376 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -62,7 +62,7 @@ func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Opt impl := sdk.NewAccumulator( checkpointer, c.Resource, - sdk.WithMetricsProcessors(c.MetricsProcessors...), + sdk.WithMetricsLabelsEnricher(c.MetricsLabelsEnricher), ) return &Controller{ provider: registry.NewMeterProvider(impl), diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index f3da725c701..9d9ae9eb358 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -68,8 +68,8 @@ type ( // resource is applied to all records in this Accumulator. resource *resource.Resource - // metricsProcessors are applied to all records in this Accumulator - metricsProcessors []MetricsProcessor + // metricsLabelsEnricher is applied to all records in this Accumulator + metricsLabelsEnricher MetricsLabelsEnricher } syncInstrument struct { @@ -126,9 +126,8 @@ type ( } instrument struct { - meter *Accumulator - descriptor otel.Descriptor - metricsProcessors []MetricsProcessor + meter *Accumulator + descriptor otel.Descriptor } asyncInstrument struct { @@ -144,14 +143,9 @@ type ( observed export.Aggregator } - // Implementations of MetricsProcessor can be provided as an config option to provide an opportunity - // to re-compose metrics labels based on the context when the metrics are recorded. - MetricsProcessor interface { - // OnMetricRecorded is execute everytime a metric is recorded by - // the sync instrument implementation of an Accumulator, it generally - // provides ability to correlate the context with the metrics - OnMetricRecorded(context.Context, *[]label.KeyValue) - } + // MetricsLabelsEnricher can be provided as a config option to enrich metrics labels based on + // the context when the metrics are recorded + MetricsLabelsEnricher func(context.Context, []label.KeyValue) ([]label.KeyValue, error) ) var ( @@ -167,10 +161,6 @@ func (inst *instrument) Descriptor() api.Descriptor { return inst.descriptor } -func (inst *instrument) getMetricsProcessors() []MetricsProcessor { - return inst.metricsProcessors -} - func (a *asyncInstrument) Implementation() interface{} { return a } @@ -308,8 +298,8 @@ func (s *syncInstrument) Bind(kvs []label.KeyValue) api.BoundSyncImpl { } func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs []label.KeyValue) { - for _, processor := range s.getMetricsProcessors() { - processor.OnMetricRecorded(ctx, &kvs) + if s.meter.metricsLabelsEnricher != nil { + kvs, _ = s.meter.metricsLabelsEnricher(ctx, kvs) } h := s.acquireHandle(kvs, nil) defer h.Unbind() @@ -332,10 +322,10 @@ func NewAccumulator(processor export.Processor, resource *resource.Resource, opt } return &Accumulator{ - processor: processor, - asyncInstruments: internal.NewAsyncInstrumentState(), - resource: resource, - metricsProcessors: c.MetricsProcessors, + processor: processor, + asyncInstruments: internal.NewAsyncInstrumentState(), + resource: resource, + metricsLabelsEnricher: c.MetricsLabelsEnricher, } } @@ -343,9 +333,8 @@ func NewAccumulator(processor export.Processor, resource *resource.Resource, opt func (m *Accumulator) NewSyncInstrument(descriptor api.Descriptor) (api.SyncImpl, error) { return &syncInstrument{ instrument: instrument{ - descriptor: descriptor, - meter: m, - metricsProcessors: m.metricsProcessors, + descriptor: descriptor, + meter: m, }, }, nil } From afa378d808f608fb140c0da5f5e5ef308e99c0e8 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 11 Nov 2020 21:48:14 +1100 Subject: [PATCH 10/18] update changelog --- CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b57917fcb8d..f6fd49870b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,8 +13,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `EventOption` and the related `NewEventConfig` function are added to the `go.opentelemetry.io/otel` package to configure Span events. (#1254) - A `TextMapPropagator` and associated `TextMapCarrier` are added to the `go.opentelemetry.io/otel/oteltest` package to test TextMap type propagators and their use. (#1259) - `SpanContextFromContext` returns `SpanContext` from context. (#1255) -- `MetricsProcessor` interface is added to `go.opentelemetry.io/otel/sdk/metric` package. (#1271) -- `WithMetricsProcessor` config option is added to `go.opentelemetry.io/otel/sdk/push` package to allow providing custom metrics processors. (#1271) +- `MetricsLabelsEnricher` type is added to `go.opentelemetry.io/otel/sdk/metric` package. (#1271) +- `WithMetricsLabelsEnricher` config option is added to `go.opentelemetry.io/otel/sdk/push` package to allow providing a function to enrich metrics labels based on context. (#1271) ### Changed From 8ea16168d9557d7bd14e332dbfcd7b54c6d6d6a7 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 11 Nov 2020 22:31:40 +1100 Subject: [PATCH 11/18] add metrics labels enricher to pull controller --- sdk/metric/controller/pull/config.go | 15 +++++++++++++++ sdk/metric/controller/pull/pull.go | 1 + sdk/metric/controller/push/config.go | 2 ++ 3 files changed, 18 insertions(+) diff --git a/sdk/metric/controller/pull/config.go b/sdk/metric/controller/pull/config.go index 363285bf8c7..bddddc90f57 100644 --- a/sdk/metric/controller/pull/config.go +++ b/sdk/metric/controller/pull/config.go @@ -15,6 +15,7 @@ package pull // import "go.opentelemetry.io/otel/sdk/metric/controller/pull" import ( + "go.opentelemetry.io/otel/sdk/metric" "time" "go.opentelemetry.io/otel/sdk/resource" @@ -33,6 +34,10 @@ type Config struct { // If the period is zero, caching of the result is disabled. // The default value is 10 seconds. CachePeriod time.Duration + + // MetricsLabelsEnricher is a function that enriches metrics labels based + // on kvs stored in context when metrics are recorded. + MetricsLabelsEnricher metric.MetricsLabelsEnricher } // Option is the interface that applies the value to a configuration option. @@ -62,3 +67,13 @@ type cachePeriodOption time.Duration func (o cachePeriodOption) Apply(config *Config) { config.CachePeriod = time.Duration(o) } + +func WithMetricsLabelsEnricher(e metric.MetricsLabelsEnricher) Option { + return metricsLabelsEnricherOption(e) +} + +type metricsLabelsEnricherOption metric.MetricsLabelsEnricher + +func (e metricsLabelsEnricherOption) Apply(config *Config) { + config.MetricsLabelsEnricher = metric.MetricsLabelsEnricher(e) +} diff --git a/sdk/metric/controller/pull/pull.go b/sdk/metric/controller/pull/pull.go index 8412075ff0e..5d48062e806 100644 --- a/sdk/metric/controller/pull/pull.go +++ b/sdk/metric/controller/pull/pull.go @@ -61,6 +61,7 @@ func New(checkpointer export.Checkpointer, options ...Option) *Controller { accum := sdk.NewAccumulator( checkpointer, config.Resource, + sdk.WithMetricsLabelsEnricher(config.MetricsLabelsEnricher), ) return &Controller{ accumulator: accum, diff --git a/sdk/metric/controller/push/config.go b/sdk/metric/controller/push/config.go index 622b129d7d7..5abb694b366 100644 --- a/sdk/metric/controller/push/config.go +++ b/sdk/metric/controller/push/config.go @@ -35,6 +35,8 @@ type Config struct { // the controller push period. Timeout time.Duration + // MetricsLabelsEnricher is a function that enriches metrics labels based + // on kvs stored in context when metrics are recorded. MetricsLabelsEnricher metric.MetricsLabelsEnricher } From f73d433769365df112e57d30ad6ed18eb9709c12 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 11 Nov 2020 22:32:01 +1100 Subject: [PATCH 12/18] add test for pull with metrics labels enricher --- sdk/metric/controller/pull/pull_test.go | 32 +++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/sdk/metric/controller/pull/pull_test.go b/sdk/metric/controller/pull/pull_test.go index a9502d673a6..1166d952faa 100644 --- a/sdk/metric/controller/pull/pull_test.go +++ b/sdk/metric/controller/pull/pull_test.go @@ -117,3 +117,35 @@ func TestPullWithCache(t *testing.T) { }, records.Map()) } + +func TestPullWithMetricsLabelEnricher(t *testing.T) { + metricsLabelsEnricher := func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) { + baggage := otel.Baggage(ctx) + kvs = append(baggage.ToSlice(), kvs...) + return kvs, nil + } + + puller := pull.New( + basic.New( + selector.NewWithExactDistribution(), + export.CumulativeExporter, + basic.WithMemory(true), + ), + pull.WithCachePeriod(0), + pull.WithMetricsLabelsEnricher(metricsLabelsEnricher), + ) + + ctx := otel.ContextWithBaggageValues(context.Background(), label.String("A", "B")) + meter := puller.MeterProvider().Meter("withLabelEnricher") + counter := otel.Must(meter).NewInt64Counter("counter.sum") + + counter.Add(ctx, 10) + + require.NoError(t, puller.Collect(context.Background())) + records := processortest.NewOutput(label.DefaultEncoder()) + require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) + + require.EqualValues(t, map[string]float64{ + "counter.sum/A=B/": 10, + }, records.Map()) +} From 208a049e957792e3c9ce6830cf00ca9754d856b4 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 11 Nov 2020 22:42:43 +1100 Subject: [PATCH 13/18] add test for push with metrics labels enricher --- sdk/metric/controller/push/push_test.go | 36 +++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index ddb325a7bf3..af1466e2c72 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -223,3 +223,39 @@ func TestPushExportError(t *testing.T) { }) } } + +func TestWithMetricsLabelsEnricher(t *testing.T) { + exporter := newExporter() + checkpointer := newCheckpointer() + metricsLabelsEnricher := func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) { + baggage := otel.Baggage(ctx) + kvs = append(baggage.ToSlice(), kvs...) + return kvs, nil + } + p := push.New( + checkpointer, + exporter, + push.WithPeriod(time.Second), + push.WithMetricsLabelsEnricher(metricsLabelsEnricher), + ) + meter := p.MeterProvider().Meter("name") + + mock := controllertest.NewMockClock() + p.SetClock(mock) + + counter := otel.Must(meter).NewInt64Counter("counter.sum") + + p.Start() + + ctx := otel.ContextWithBaggageValues(context.Background(), label.String("A", "B")) + counter.Add(ctx, 1) + + require.EqualValues(t, map[string]float64{}, exporter.Values()) + + mock.Add(time.Second) + runtime.Gosched() + + require.EqualValues(t, map[string]float64{ + "counter.sum/A=B/": 1, + }, exporter.Values()) +} From 4f91f17cbd4d68391c14aaf5875531b5d400ca1c Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 11 Nov 2020 22:52:20 +1100 Subject: [PATCH 14/18] sort import order --- sdk/metric/controller/pull/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/controller/pull/config.go b/sdk/metric/controller/pull/config.go index bddddc90f57..826c42b9858 100644 --- a/sdk/metric/controller/pull/config.go +++ b/sdk/metric/controller/pull/config.go @@ -15,9 +15,9 @@ package pull // import "go.opentelemetry.io/otel/sdk/metric/controller/pull" import ( - "go.opentelemetry.io/otel/sdk/metric" "time" + "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" ) From 4739df6a2ded2420f2a2735ad0cd74e9f2f7d3df Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 11 Nov 2020 22:57:41 +1100 Subject: [PATCH 15/18] update changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6fd49870b0..f609315f3ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - A `TextMapPropagator` and associated `TextMapCarrier` are added to the `go.opentelemetry.io/otel/oteltest` package to test TextMap type propagators and their use. (#1259) - `SpanContextFromContext` returns `SpanContext` from context. (#1255) - `MetricsLabelsEnricher` type is added to `go.opentelemetry.io/otel/sdk/metric` package. (#1271) -- `WithMetricsLabelsEnricher` config option is added to `go.opentelemetry.io/otel/sdk/push` package to allow providing a function to enrich metrics labels based on context. (#1271) +- `WithMetricsLabelsEnricher` config option is added to `go.opentelemetry.io/otel/sdk/push` and `go.opentelemetry.io/otel/sdk/pull` packages to allow providing a function to enrich metrics labels based on context. (#1271) ### Changed From 85faa2eeec8fc481dcbd324e0d0873e77291b44c Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Wed, 11 Nov 2020 23:21:30 +1100 Subject: [PATCH 16/18] handle error returned from the enricher function --- sdk/metric/sdk.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 9d9ae9eb358..57384cbf3be 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -299,8 +299,14 @@ func (s *syncInstrument) Bind(kvs []label.KeyValue) api.BoundSyncImpl { func (s *syncInstrument) RecordOne(ctx context.Context, number api.Number, kvs []label.KeyValue) { if s.meter.metricsLabelsEnricher != nil { - kvs, _ = s.meter.metricsLabelsEnricher(ctx, kvs) + var err error + kvs, err = s.meter.metricsLabelsEnricher(ctx, kvs) + if err != nil { + global.Handle(err) + return + } } + h := s.acquireHandle(kvs, nil) defer h.Unbind() h.RecordOne(ctx, number) From b3caafa55aff6b8b257833b57fd6c23547cd7b8b Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Mon, 14 Dec 2020 13:58:15 +1100 Subject: [PATCH 17/18] clean up --- sdk/metric/sdk.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 7a7b0dcc587..1d4466afccc 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -299,7 +299,7 @@ func (s *syncInstrument) Bind(kvs []label.KeyValue) metric.BoundSyncImpl { func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs []label.KeyValue) { - if s.meter.metricsLabelsEnricher != nil { + if s.meter.metricsLabelsEnricher != nil { var err error kvs, err = s.meter.metricsLabelsEnricher(ctx, kvs) if err != nil { From 6dab82e0bedd1b80a9ad17e2e33cbef4457df248 Mon Sep 17 00:00:00 2001 From: Hanshuo Tan Date: Mon, 14 Dec 2020 15:13:26 +1100 Subject: [PATCH 18/18] fix tests --- sdk/metric/controller/pull/pull_test.go | 12 +++++++----- sdk/metric/controller/push/push_test.go | 8 +++++--- sdk/metric/sdk.go | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/sdk/metric/controller/pull/pull_test.go b/sdk/metric/controller/pull/pull_test.go index 153e63e639d..2450dae5f1d 100644 --- a/sdk/metric/controller/pull/pull_test.go +++ b/sdk/metric/controller/pull/pull_test.go @@ -20,6 +20,8 @@ import ( "testing" "time" + "go.opentelemetry.io/otel/baggage" + "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/label" @@ -120,7 +122,7 @@ func TestPullWithCache(t *testing.T) { func TestPullWithMetricsLabelEnricher(t *testing.T) { metricsLabelsEnricher := func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) { - baggage := otel.Baggage(ctx) + baggage := baggage.Set(ctx) kvs = append(baggage.ToSlice(), kvs...) return kvs, nil } @@ -128,22 +130,22 @@ func TestPullWithMetricsLabelEnricher(t *testing.T) { puller := pull.New( basic.New( selector.NewWithExactDistribution(), - export.CumulativeExporter, + export.CumulativeExportKindSelector(), basic.WithMemory(true), ), pull.WithCachePeriod(0), pull.WithMetricsLabelsEnricher(metricsLabelsEnricher), ) - ctx := otel.ContextWithBaggageValues(context.Background(), label.String("A", "B")) + ctx := baggage.ContextWithValues(context.Background(), label.String("A", "B")) meter := puller.MeterProvider().Meter("withLabelEnricher") - counter := otel.Must(meter).NewInt64Counter("counter.sum") + counter := metric.Must(meter).NewInt64Counter("counter.sum") counter.Add(ctx, 10) require.NoError(t, puller.Collect(context.Background())) records := processortest.NewOutput(label.DefaultEncoder()) - require.NoError(t, puller.ForEach(export.CumulativeExporter, records.AddRecord)) + require.NoError(t, puller.ForEach(export.CumulativeExportKindSelector(), records.AddRecord)) require.EqualValues(t, map[string]float64{ "counter.sum/A=B/": 10, diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index 8b8ec7478a2..961d2376487 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "go.opentelemetry.io/otel/baggage" + "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" @@ -228,7 +230,7 @@ func TestWithMetricsLabelsEnricher(t *testing.T) { exporter := newExporter() checkpointer := newCheckpointer() metricsLabelsEnricher := func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) { - baggage := otel.Baggage(ctx) + baggage := baggage.Set(ctx) kvs = append(baggage.ToSlice(), kvs...) return kvs, nil } @@ -243,11 +245,11 @@ func TestWithMetricsLabelsEnricher(t *testing.T) { mock := controllertest.NewMockClock() p.SetClock(mock) - counter := otel.Must(meter).NewInt64Counter("counter.sum") + counter := metric.Must(meter).NewInt64Counter("counter.sum") p.Start() - ctx := otel.ContextWithBaggageValues(context.Background(), label.String("A", "B")) + ctx := baggage.ContextWithValues(context.Background(), label.String("A", "B")) counter.Add(ctx, 1) require.EqualValues(t, map[string]float64{}, exporter.Values()) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index dffc7b8ee25..efab61fcbe9 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -299,7 +299,7 @@ func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs [ var err error kvs, err = s.meter.metricsLabelsEnricher(ctx, kvs) if err != nil { - global.Handle(err) + otel.Handle(err) return } }