diff --git a/CHANGELOG.md b/CHANGELOG.md index 74a3b9321f06..5a98d3f15020 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,12 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The metric portion of the OpenCensus bridge (`go.opentelemetry.io/otel/bridge/opencensus`) has been reintroduced. (#3192) - The OpenCensus bridge example (`go.opentelemetry.io/otel/example/opencensus`) has been reintroduced. (#3206) +- metric.NewMeterProvider (`go.opentelemetry.io/otel/sdk/metric`) now supports a WithProducer option for installing "bridges" to external metric libraries. (#3093) +- The OpenCensus bridge (`go.opentelemetry.io/otel/example/opencensus`) can now instantiate a metric.Producer with NewProducer(). (#3093) + +### Deprecated + +- NewMetricExporter in the OpenCensus bridge (`go.opentelemetry.io/otel/example/opencensus`) is depreacted in favor of NewProducer. (#3093) ### Fixed diff --git a/bridge/opencensus/go.mod b/bridge/opencensus/go.mod index b36e8a4a1ef3..e7e8c286d99d 100644 --- a/bridge/opencensus/go.mod +++ b/bridge/opencensus/go.mod @@ -3,6 +3,7 @@ module go.opentelemetry.io/otel/bridge/opencensus go 1.18 require ( + github.com/stretchr/testify v1.7.1 go.opencensus.io v0.23.0 go.opentelemetry.io/otel v1.10.0 go.opentelemetry.io/otel/metric v0.32.1 @@ -12,10 +13,13 @@ require ( ) require ( + github.com/davecgh/go-spew v1.1.0 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) replace go.opentelemetry.io/otel => ../.. diff --git a/bridge/opencensus/go.sum b/bridge/opencensus/go.sum index c7ddc1e13074..a78d785aa87a 100644 --- a/bridge/opencensus/go.sum +++ b/bridge/opencensus/go.sum @@ -41,6 +41,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -92,6 +93,7 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/bridge/opencensus/metric.go b/bridge/opencensus/metric.go index d9c44545c659..a8f8a570e6d3 100644 --- a/bridge/opencensus/metric.go +++ b/bridge/opencensus/metric.go @@ -22,6 +22,7 @@ import ( ocmetricdata "go.opencensus.io/metric/metricdata" "go.opencensus.io/metric/metricexport" + "go.opencensus.io/metric/metricproducer" internal "go.opentelemetry.io/otel/bridge/opencensus/internal/ocmetric" "go.opentelemetry.io/otel/sdk/instrumentation" @@ -30,6 +31,42 @@ import ( "go.opentelemetry.io/otel/sdk/resource" ) +const ( + // instrumentationName is the name of this instrumentation package. + instrumentationName = "go.opentelemetry.io/otel/bridge/opencensus" +) + +// producer is a producer which provides metrics collected using OpenCensus +// instrumentation. +type producer struct { + scope instrumentation.Scope + manager *metricproducer.Manager +} + +// NewProducer returns a producer which can be invoked to collect metrics. +func NewProducer() metric.Producer { + return &producer{ + scope: instrumentation.Scope{Name: instrumentationName, Version: SemVersion()}, + manager: metricproducer.GlobalManager(), + } +} + +// Produce gathers all metrics from the OpenCensus in-memory state. +func (p *producer) Produce(context.Context) ([]metricdata.Metrics, error) { + producers := p.manager.GetAll() + data := []*ocmetricdata.Metric{} + for _, ocProducer := range producers { + data = append(data, ocProducer.Read()...) + } + return internal.ConvertMetrics(data) +} + +// InstrumentationScope returns the instrumentation scope for the OpenCensus +// metrics bridge. +func (p *producer) InstrumentationScope() instrumentation.Scope { + return p.scope +} + // exporter implements the OpenCensus metric Exporter interface using an // OpenTelemetry base exporter. type exporter struct { @@ -39,6 +76,7 @@ type exporter struct { // NewMetricExporter returns an OpenCensus exporter that exports to an // OpenTelemetry exporter. +// Deprecated: Pass metric.WithProducer(opencensus.NewProducer()) to NewMeterProvider instead. func NewMetricExporter(base metric.Exporter, res *resource.Resource) metricexport.Exporter { return &exporter{base: base} } diff --git a/bridge/opencensus/metric_test.go b/bridge/opencensus/metric_test.go new file mode 100644 index 000000000000..8b00c86a05c6 --- /dev/null +++ b/bridge/opencensus/metric_test.go @@ -0,0 +1,110 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package opencensus // import "go.opentelemetry.io/otel/bridge/opencensus" + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + ocmetricdata "go.opencensus.io/metric/metricdata" + "go.opencensus.io/metric/metricproducer" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/unit" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +func TestProducePartialError(t *testing.T) { + badProducer := &fakeOCProducer{ + metrics: []*ocmetricdata.Metric{ + { + Descriptor: ocmetricdata.Descriptor{ + Name: "foo.com/bad-point", + Description: "a bad type", + Unit: ocmetricdata.UnitDimensionless, + Type: ocmetricdata.TypeGaugeDistribution, + }, + }, + }, + } + metricproducer.GlobalManager().AddProducer(badProducer) + defer metricproducer.GlobalManager().DeleteProducer(badProducer) + + end := time.Now() + goodProducer := &fakeOCProducer{ + metrics: []*ocmetricdata.Metric{ + { + Descriptor: ocmetricdata.Descriptor{ + Name: "foo.com/gauge-a", + Description: "an int testing gauge", + Unit: ocmetricdata.UnitBytes, + Type: ocmetricdata.TypeGaugeInt64, + }, + TimeSeries: []*ocmetricdata.TimeSeries{ + { + Points: []ocmetricdata.Point{ + ocmetricdata.NewInt64Point(end, 123), + }, + }, + }, + }, + }, + } + metricproducer.GlobalManager().AddProducer(goodProducer) + defer metricproducer.GlobalManager().DeleteProducer(goodProducer) + + otelProducer := NewProducer() + out, err := otelProducer.Produce(context.Background()) + assert.NotNil(t, err) + expected := []metricdata.Metrics{ + { + Name: "foo.com/gauge-a", + Description: "an int testing gauge", + Unit: unit.Bytes, + Data: metricdata.Gauge[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(), + Time: end, + Value: 123, + }, + }, + }, + }, + } + assert.Equal(t, len(out), len(expected)) + for i := range out { + metricdatatest.AssertEqual[metricdata.Metrics](t, out[i], expected[i]) + } +} + +func TestInstrumentationScope(t *testing.T) { + assert.Equal(t, NewProducer().InstrumentationScope(), instrumentation.Scope{Name: instrumentationName, Version: SemVersion()}) +} + +type fakeOCProducer struct { + metrics []*ocmetricdata.Metric +} + +func (f *fakeOCProducer) Read() []*ocmetricdata.Metric { + return f.metrics +} diff --git a/bridge/opencensus/version.go b/bridge/opencensus/version.go new file mode 100644 index 000000000000..8386c15f0111 --- /dev/null +++ b/bridge/opencensus/version.go @@ -0,0 +1,25 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package opencensus // import "go.opentelemetry.io/otel/bridge/opencensus" + +// Version is the current release version of the opencensus bridge. +func Version() string { + return "0.32.1" +} + +// SemVersion is the semantic version to be supplied to tracer/meter creation. +func SemVersion() string { + return "semver:" + Version() +} diff --git a/example/opencensus/go.mod b/example/opencensus/go.mod index 82bb202b4d6c..211d9865874b 100644 --- a/example/opencensus/go.mod +++ b/example/opencensus/go.mod @@ -14,6 +14,7 @@ require ( go.opentelemetry.io/otel/bridge/opencensus v0.32.1 go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.32.1 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0 + go.opentelemetry.io/otel/metric v0.32.1 go.opentelemetry.io/otel/sdk v1.10.0 go.opentelemetry.io/otel/sdk/metric v0.32.1 ) @@ -22,7 +23,6 @@ require ( github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect - go.opentelemetry.io/otel/metric v0.32.1 // indirect go.opentelemetry.io/otel/trace v1.10.0 // indirect golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7 // indirect ) diff --git a/example/opencensus/main.go b/example/opencensus/main.go index c35c47cac096..5c25a3c8a1e7 100644 --- a/example/opencensus/main.go +++ b/example/opencensus/main.go @@ -22,7 +22,6 @@ import ( ocmetric "go.opencensus.io/metric" "go.opencensus.io/metric/metricdata" - "go.opencensus.io/metric/metricexport" "go.opencensus.io/metric/metricproducer" "go.opencensus.io/stats" "go.opencensus.io/stats/view" @@ -30,9 +29,11 @@ import ( octrace "go.opencensus.io/trace" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/bridge/opencensus" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" + "go.opentelemetry.io/otel/metric/instrument" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -104,19 +105,30 @@ func tracing(otExporter sdktrace.SpanExporter) { // exporter to send metrics to the exporter by using either an OpenCensus // registry or an OpenCensus view. func monitoring(otExporter metric.Exporter) error { - log.Println("Using the OpenTelemetry stdoutmetric exporter to export OpenCensus metrics. This allows routing telemetry from both OpenTelemetry and OpenCensus to a single exporter.") - ocExporter := opencensus.NewMetricExporter(otExporter, resource.Default()) - intervalReader, err := metricexport.NewIntervalReader(&metricexport.Reader{}, ocExporter) - if err != nil { - return fmt.Errorf("failed to create interval reader: %w", err) - } - intervalReader.ReportingInterval = 10 * time.Second - log.Println("Emitting metrics using OpenCensus APIs. These should be printed out using the OpenTelemetry stdoutmetric exporter.") - err = intervalReader.Start() + ctx := context.Background() + log.Println("Using the OpenCensus bridge to export OpenCensus and OpenTelemetry metrics to a single OpenTelemetry exporter.") + + // Register the exporter with an SDK via a periodic reader. + provider := metric.NewMeterProvider( + metric.WithResource(resource.Default()), + metric.WithReader(metric.NewPeriodicReader(otExporter)), + // Add the OpenCensus producer to the SDK. This causes metrics from + // OpenCensus to be included in the batch of metrics sent to our exporter. + metric.WithProducer(opencensus.NewProducer()), + ) + + log.Println("Emitting a 'foo' metric using OpenTelemetry APIs, which is emitted with an OpenTelemetry stdout exporter") + meter := provider.Meter("github.com/open-telemetry/opentelemetry-go/example/opencensus") + counter, err := meter.SyncFloat64().Counter("foo", instrument.WithDescription("a simple counter")) if err != nil { - return fmt.Errorf("failed to start interval reader: %w", err) + log.Fatal(err) } - defer intervalReader.Stop() + counter.Add(ctx, 5, []attribute.KeyValue{ + attribute.Key("A").String("B"), + attribute.Key("C").String("D"), + }...) + + log.Println("Emitting 'test_gauge' and 'test_count' metrics using OpenCensus APIs. These are printed out using the same OpenTelemetry stdoutmetric exporter.") log.Println("Registering a gauge metric using an OpenCensus registry.") r := ocmetric.NewRegistry() diff --git a/sdk/metric/config.go b/sdk/metric/config.go index a12533345057..9c404931c6be 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -19,14 +19,16 @@ import ( "fmt" "sync" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/view" "go.opentelemetry.io/otel/sdk/resource" ) // config contains configuration options for a MeterProvider. type config struct { - res *resource.Resource - readers map[Reader][]view.View + res *resource.Resource + readers map[Reader][]view.View + producers map[instrumentation.Scope]Producer } // readerSignals returns a force-flush and shutdown function for a @@ -79,7 +81,7 @@ func unifyShutdown(funcs []func(context.Context) error) func(context.Context) er // newConfig returns a config configured with options. func newConfig(options []Option) config { - conf := config{res: resource.Default()} + conf := config{res: resource.Default(), producers: make(map[instrumentation.Scope]Producer)} for _, o := range options { conf = o.apply(conf) } @@ -134,3 +136,14 @@ func WithReader(r Reader, views ...view.View) Option { return cfg }) } + +// WithProducer adds a Producer as a source of aggregated metric data for the +// MeterProvider. When Collect is invoked from Readers, metric data from +// Producers will be integrated into the returned batch of metrics. +// This is commonly used as way to "bridge" external metric libraries. +func WithProducer(p Producer) Option { + return optionFunc(func(conf config) config { + conf.producers[p.InstrumentationScope()] = p + return conf + }) +} diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 6783aef5d0fa..014bf8831e83 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -64,6 +64,7 @@ type pipeline struct { sync.Mutex aggregations map[instrumentation.Scope]map[instrumentKey]instrumentValue callbacks []func(context.Context) + producers map[instrumentation.Scope]Producer } var errAlreadyRegistered = errors.New("instrument already registered") @@ -149,6 +150,21 @@ func (p *pipeline) produce(ctx context.Context) (metricdata.ResourceMetrics, err } } + for scope, prod := range p.producers { + metrics, err := prod.Produce(ctx) + if err != nil { + // TODO: should we use otel.Handle(err), and continue? + // This would prevent a bridge from blocking SDK metric collection. + return metricdata.ResourceMetrics{}, err + } + if len(metrics) > 0 { + sm = append(sm, metricdata.ScopeMetrics{ + Scope: scope, + Metrics: metrics, + }) + } + } + return metricdata.ResourceMetrics{ Resource: p.resource, ScopeMetrics: sm, @@ -162,10 +178,13 @@ type pipelineRegistry struct { pipelines map[Reader]*pipeline } -func newPipelineRegistries(res *resource.Resource, views map[Reader][]view.View) *pipelineRegistry { +func newPipelineRegistries(res *resource.Resource, views map[Reader][]view.View, producers map[instrumentation.Scope]Producer) *pipelineRegistry { pipelines := map[Reader]*pipeline{} for rdr := range views { - pipe := &pipeline{resource: res} + pipe := &pipeline{ + resource: res, + producers: producers, + } rdr.register(pipe) pipelines[rdr] = pipe } diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 52183360fd38..38b6bac841b4 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -61,7 +61,7 @@ func TestEmptyPipeline(t *testing.T) { } func TestNewPipeline(t *testing.T) { - pipe := newPipeline(nil) + pipe := newPipeline(nil, nil) output, err := pipe.produce(context.Background()) require.NoError(t, err) @@ -158,7 +158,7 @@ func TestPipelineDuplicateRegistration(t *testing.T) { } for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - pipe := newPipeline(nil) + pipe := newPipeline(nil, nil) err := pipe.addAggregator(instrumentation.Scope{}, "name", "desc", unit.Dimensionless, testSumAggregator{}) require.NoError(t, err) @@ -177,7 +177,7 @@ func TestPipelineDuplicateRegistration(t *testing.T) { func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) - pipe := newPipeline(res) + pipe := newPipeline(res, nil) output, err := pipe.produce(context.Background()) assert.NoError(t, err) @@ -185,7 +185,7 @@ func TestPipelineUsesResource(t *testing.T) { } func TestPipelineConcurrency(t *testing.T) { - pipe := newPipeline(nil) + pipe := newPipeline(nil, nil) ctx := context.Background() var wg sync.WaitGroup diff --git a/sdk/metric/producer.go b/sdk/metric/producer.go new file mode 100644 index 000000000000..3602c587e38c --- /dev/null +++ b/sdk/metric/producer.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build go1.18 +// +build go1.18 + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "context" + + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// Producer can produce metrics from non-otel instrumentation. +type Producer interface { + // Produce returns aggregated metrics from a single collection. + Produce(context.Context) ([]metricdata.Metrics, error) + // InstrumentationScope returns the instrumentation scope which shoud be + // associated with the produced metrics. + InstrumentationScope() instrumentation.Scope +} diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index d3a940bce584..30aca7d33f26 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -48,7 +48,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { flush, sdown := conf.readerSignals() - registry := newPipelineRegistries(conf.res, conf.readers) + registry := newPipelineRegistries(conf.res, conf.readers, conf.producers) return &MeterProvider{ res: conf.res,