From ba1e545cca04d01cb23561e57b9fd2f290eaa6de Mon Sep 17 00:00:00 2001 From: Diego Hurtado Date: Mon, 2 May 2022 18:07:11 -0600 Subject: [PATCH] Refactor metric format Fixes #2646 --- .../proto/grpc/_metric_exporter/__init__.py | 196 +++++++----- .../metrics/test_otlp_metrics_exporter.py | 285 ++++++++++++++++-- .../exporter/prometheus/__init__.py | 24 +- .../sdk/_metrics/_internal/export/__init__.py | 29 +- .../sdk/_metrics/_internal/metric_reader.py | 68 ++++- .../sdk/_metrics/metric_reader.py | 1 + .../test_disable_default_views.py | 7 +- .../metrics/test_in_memory_metric_reader.py | 32 +- .../tests/metrics/test_metrics.py | 16 +- .../test_periodic_exporting_metric_reader.py | 3 +- .../src/opentelemetry/test/metrictestutil.py | 18 ++ 11 files changed, 549 insertions(+), 130 deletions(-) diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py index 1ad5cc4d808..8b17272f16e 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/src/opentelemetry/exporter/otlp/proto/grpc/_metric_exporter/__init__.py @@ -11,9 +11,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +from logging import getLogger from os import environ -from typing import Optional, Sequence +from typing import Optional, Sequence, Dict, Iterable from grpc import ChannelCredentials, Compression from opentelemetry.exporter.otlp.proto.grpc.exporter import ( OTLPExporterMixin, @@ -41,8 +41,13 @@ MetricExporter, MetricExportResult, ) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import ( + InstrumentationScope as SDKInstrumentationScope, +) +from opentelemetry.sdk._metrics.metric_reader import MetricReaderMetric -logger = logging.getLogger(__name__) +_logger = getLogger(__name__) class OTLPMetricExporter( @@ -79,87 +84,113 @@ def __init__( ) def _translate_data( - self, data: Sequence[Metric] + self, + data: Dict[ + Resource, + Dict[SDKInstrumentationScope, Iterable[MetricReaderMetric]], + ], ) -> ExportMetricsServiceRequest: - sdk_resource_scope_metrics = {} - - for metric in data: - resource = metric.resource - scope_map = sdk_resource_scope_metrics.get(resource, {}) - if not scope_map: - sdk_resource_scope_metrics[resource] = scope_map - - scope_metrics = scope_map.get(metric.instrumentation_scope) - - if not scope_metrics: - if metric.instrumentation_scope is not None: - scope_map[metric.instrumentation_scope] = pb2.ScopeMetrics( - scope=InstrumentationScope( - name=metric.instrumentation_scope.name, - version=metric.instrumentation_scope.version, + + sdk_resource_scope_metrics: Dict[ + Resource, Dict[SDKInstrumentationScope, pb2.ScopeMetrics] + ] = {} + + for resource, instrumentation_scope_metrics in data.items(): + + if resource not in sdk_resource_scope_metrics: + sdk_resource_scope_metrics[resource] = {} + + for ( + instrumentation_scope, + metrics, + ) in instrumentation_scope_metrics.items(): + if instrumentation_scope not in sdk_resource_scope_metrics: + if instrumentation_scope is None: + sdk_resource_scope_metrics[resource][ + instrumentation_scope + ] = pb2.ScopeMetrics() + else: + sdk_resource_scope_metrics[resource][ + instrumentation_scope + ] = pb2.ScopeMetrics( + 
scope=InstrumentationScope( + name=instrumentation_scope.name, + version=instrumentation_scope.version, + ) ) + scope_metrics = sdk_resource_scope_metrics[resource][ + instrumentation_scope + ] + + for metric in metrics: + + pbmetric = pb2.Metric( + name=metric.name, + description=metric.description, + unit=metric.unit, ) - else: - scope_map[ - metric.instrumentation_scope - ] = pb2.ScopeMetrics() + if isinstance(metric.point, Gauge): + pt = pb2.NumberDataPoint( + attributes=self._translate_attributes( + metric.attributes + ), + time_unix_nano=metric.point.time_unix_nano, + ) + if isinstance(metric.point.value, int): + pt.as_int = metric.point.value + else: + pt.as_double = metric.point.value + pbmetric.gauge.data_points.append(pt) + elif isinstance(metric.point, Histogram): + pt = pb2.HistogramDataPoint( + attributes=self._translate_attributes( + metric.attributes + ), + time_unix_nano=metric.point.time_unix_nano, + start_time_unix_nano=( + metric.point.start_time_unix_nano + ), + count=sum(metric.point.bucket_counts), + sum=metric.point.sum, + bucket_counts=metric.point.bucket_counts, + explicit_bounds=metric.point.explicit_bounds, + ) + pbmetric.histogram.aggregation_temporality = ( + metric.point.aggregation_temporality + ) + pbmetric.histogram.data_points.append(pt) + elif isinstance(metric.point, Sum): + pt = pb2.NumberDataPoint( + attributes=self._translate_attributes( + metric.attributes + ), + start_time_unix_nano=( + metric.point.start_time_unix_nano + ), + time_unix_nano=metric.point.time_unix_nano, + ) + if isinstance(metric.point.value, int): + pt.as_int = metric.point.value + else: + pt.as_double = metric.point.value + # note that because sum is a message type, the fields + # must be set individually rather than instantiating a + # pb2.Sum and setting it once + pbmetric.sum.aggregation_temporality = ( + metric.point.aggregation_temporality + ) + pbmetric.sum.is_monotonic = metric.point.is_monotonic + pbmetric.sum.data_points.append(pt) + else: + _logger.warn( + "unsupported datapoint type %s", metric.point + ) + continue - scope_metrics = scope_map.get(metric.instrumentation_scope) + scope_metrics.metrics.append( + pbmetric, + ) - pbmetric = pb2.Metric( - name=metric.name, - description=metric.description, - unit=metric.unit, - ) - if isinstance(metric.point, Gauge): - pt = pb2.NumberDataPoint( - attributes=self._translate_attributes(metric.attributes), - time_unix_nano=metric.point.time_unix_nano, - ) - if isinstance(metric.point.value, int): - pt.as_int = metric.point.value - else: - pt.as_double = metric.point.value - pbmetric.gauge.data_points.append(pt) - elif isinstance(metric.point, Histogram): - pt = pb2.HistogramDataPoint( - attributes=self._translate_attributes(metric.attributes), - time_unix_nano=metric.point.time_unix_nano, - start_time_unix_nano=metric.point.start_time_unix_nano, - count=sum(metric.point.bucket_counts), - sum=metric.point.sum, - bucket_counts=metric.point.bucket_counts, - explicit_bounds=metric.point.explicit_bounds, - ) - pbmetric.histogram.aggregation_temporality = ( - metric.point.aggregation_temporality - ) - pbmetric.histogram.data_points.append(pt) - elif isinstance(metric.point, Sum): - pt = pb2.NumberDataPoint( - attributes=self._translate_attributes(metric.attributes), - start_time_unix_nano=metric.point.start_time_unix_nano, - time_unix_nano=metric.point.time_unix_nano, - ) - if isinstance(metric.point.value, int): - pt.as_int = metric.point.value - else: - pt.as_double = metric.point.value - # note that because sum is a message type, 
the fields must be - # set individually rather than instantiating a pb2.Sum and setting - # it once - pbmetric.sum.aggregation_temporality = ( - metric.point.aggregation_temporality - ) - pbmetric.sum.is_monotonic = metric.point.is_monotonic - pbmetric.sum.data_points.append(pt) - else: - logger.warn("unsupported datapoint type %s", metric.point) - continue - - scope_metrics.metrics.append( - pbmetric, - ) return ExportMetricsServiceRequest( resource_metrics=get_resource_data( sdk_resource_scope_metrics, @@ -168,7 +199,12 @@ def _translate_data( ) ) - def export(self, metrics: Sequence[Metric]) -> MetricExportResult: + def export( + self, + metrics: Dict[ + Resource, Dict[InstrumentationScope, Iterable[MetricReaderMetric]] + ], + ) -> MetricExportResult: return self._export(metrics) def shutdown(self): diff --git a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py index af5d39ff597..b386d23b9e3 100644 --- a/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py +++ b/exporter/opentelemetry-exporter-otlp-proto-grpc/tests/metrics/test_otlp_metrics_exporter.py @@ -41,13 +41,24 @@ Resource as OTLPResource, ) from opentelemetry.sdk._metrics.export import MetricExportResult -from opentelemetry.sdk._metrics.point import AggregationTemporality, Histogram +from opentelemetry.sdk._metrics.metric_reader import MetricReaderMetric +from opentelemetry.sdk._metrics.point import ( + AggregationTemporality, + Gauge, + Histogram, + Sum, +) from opentelemetry.sdk.environment_variables import ( OTEL_EXPORTER_OTLP_METRICS_INSECURE, ) +from opentelemetry.sdk.resources import SERVICE_NAME, Resource +from opentelemetry.sdk.util.instrumentation import ( + InstrumentationScope as SDKInstrumentationScope, +) from opentelemetry.test.metrictestutil import ( _generate_gauge, _generate_metric, + _generate_metric_reader_metric, _generate_sum, ) @@ -110,21 +121,31 @@ def setUp(self): self.server.start() self.metrics = { - "sum_int": _generate_sum("sum_int", 33), - "sum_double": _generate_sum("sum_double", 2.98), - "gauge_int": _generate_gauge("gauge_int", 9000), - "gauge_double": _generate_gauge("gauge_double", 52.028), - "histogram": _generate_metric( - "histogram", - Histogram( - aggregation_temporality=AggregationTemporality.DELTA, - bucket_counts=[1, 4], - explicit_bounds=[10.0, 20.0], - max=18, - min=8, - start_time_unix_nano=1641946016139533244, - sum=67, - time_unix_nano=1641946016139533244, + "sum_int": _generate_metric_reader_metric( + _generate_sum("sum_int", 33) + ), + "sum_double": _generate_metric_reader_metric( + _generate_sum("sum_double", 2.98) + ), + "gauge_int": _generate_metric_reader_metric( + _generate_gauge("gauge_int", 9000) + ), + "gauge_double": _generate_metric_reader_metric( + _generate_gauge("gauge_double", 52.028) + ), + "histogram": _generate_metric_reader_metric( + _generate_metric( + "histogram", + Histogram( + aggregation_temporality=AggregationTemporality.DELTA, + bucket_counts=[1, 4], + explicit_bounds=[10.0, 20.0], + max=18, + min=8, + start_time_unix_nano=1641946016139533244, + sum=67, + time_unix_nano=1641946016139533244, + ), ), ), } @@ -243,7 +264,7 @@ def test_unavailable(self, mock_sleep, mock_expo): MetricsServiceServicerUNAVAILABLE(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) 
mock_sleep.assert_called_with(1) @@ -258,7 +279,7 @@ def test_unavailable_delay(self, mock_sleep, mock_expo): MetricsServiceServicerUNAVAILABLEDelay(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) mock_sleep.assert_called_with(4) @@ -268,7 +289,7 @@ def test_success(self): MetricsServiceServicerSUCCESS(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.SUCCESS, ) @@ -277,7 +298,7 @@ def test_failure(self): MetricsServiceServicerALREADY_EXISTS(), self.server ) self.assertEqual( - self.exporter.export([self.metrics["sum_int"]]), + self.exporter.export(self.metrics["sum_int"]), MetricExportResult.FAILURE, ) @@ -336,7 +357,7 @@ def test_translate_sum_int(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["sum_int"]]) + actual = self.exporter._translate_data(self.metrics["sum_int"]) self.assertEqual(expected, actual) def test_translate_sum_double(self): @@ -394,7 +415,7 @@ def test_translate_sum_double(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["sum_double"]]) + actual = self.exporter._translate_data(self.metrics["sum_double"]) self.assertEqual(expected, actual) def test_translate_gauge_int(self): @@ -449,7 +470,7 @@ def test_translate_gauge_int(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["gauge_int"]]) + actual = self.exporter._translate_data(self.metrics["gauge_int"]) self.assertEqual(expected, actual) def test_translate_gauge_double(self): @@ -504,7 +525,7 @@ def test_translate_gauge_double(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["gauge_double"]]) + actual = self.exporter._translate_data(self.metrics["gauge_double"]) self.assertEqual(expected, actual) def test_translate_histogram(self): @@ -566,5 +587,219 @@ def test_translate_histogram(self): ] ) # pylint: disable=protected-access - actual = self.exporter._translate_data([self.metrics["histogram"]]) + actual = self.exporter._translate_data(self.metrics["histogram"]) + self.assertEqual(expected, actual) + + def test_translate_several_metrics(self): + # pylint: disable=too-many-locals + + resource_0 = Resource({SERVICE_NAME: "resource_0"}, "") + resource_1 = Resource({SERVICE_NAME: "resource_1"}, "") + + instrumentation_scope_0 = SDKInstrumentationScope( + "instrumentation_scope_0", None, None + ) + instrumentation_scope_1 = SDKInstrumentationScope( + "instrumentation_scope_1", None, None + ) + + metric_reader_metric_0 = MetricReaderMetric( + attributes={"foo": "1"}, + description="description_0", + name="counter1", + unit="s", + point=Sum( + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + start_time_unix_nano=1651529022025608591, + time_unix_nano=1651529022025645974, + value=1, + ), + ) + + metric_reader_metric_1 = MetricReaderMetric( + attributes={"foo": "2"}, + description="description_1", + name="counter1", + unit="m", + point=Sum( + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + start_time_unix_nano=1651529022025618863, + time_unix_nano=1651529022025668946, + value=1, + ), + ) + + metric_reader_metric_2 = MetricReaderMetric( + attributes={"foo": "3"}, + description="description_2", + name="observable_gauge1", + unit="Hz", + 
point=Gauge(time_unix_nano=1651529022025687868, value=12), + ) + + metric_reader_metric_3 = MetricReaderMetric( + attributes={"foo": "4"}, + description="description_3", + name="observable_gauge1", + unit="Hz", + point=Gauge(time_unix_nano=1651529022025687868, value=12), + ) + + metrics = { + resource_0: { + instrumentation_scope_0: [ + metric_reader_metric_0, + metric_reader_metric_1, + ], + instrumentation_scope_1: [metric_reader_metric_3], + }, + resource_1: {instrumentation_scope_1: [metric_reader_metric_2]}, + } + + pb2_metric_0 = pb2.Metric( + name="counter1", + unit="s", + description="description_0", + sum=pb2.Sum( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="foo", + value=AnyValue(string_value="1"), + ), + ], + start_time_unix_nano=1651529022025608591, + time_unix_nano=1651529022025645974, + as_int=1, + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + + pb2_metric_1 = pb2.Metric( + name="counter1", + unit="m", + description="description_1", + sum=pb2.Sum( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="foo", + value=AnyValue(string_value="2"), + ), + ], + start_time_unix_nano=1651529022025618863, + time_unix_nano=1651529022025668946, + as_int=1, + ) + ], + aggregation_temporality=AggregationTemporality.CUMULATIVE, + is_monotonic=True, + ), + ) + + pb2_metric_2 = pb2.Metric( + name="observable_gauge1", + unit="Hz", + description="description_2", + gauge=pb2.Gauge( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="foo", value=AnyValue(string_value="3") + ) + ], + time_unix_nano=1651529022025687868, + as_int=12, + ) + ] + ), + ) + + pb2_metric_3 = pb2.Metric( + name="observable_gauge1", + unit="Hz", + description="description_3", + gauge=pb2.Gauge( + data_points=[ + pb2.NumberDataPoint( + attributes=[ + KeyValue( + key="foo", + value=AnyValue(string_value="4"), + ), + ], + time_unix_nano=1651529022025687868, + as_int=12, + ) + ] + ), + ) + + pb2_resource_0 = OTLPResource( + attributes=[ + KeyValue( + key="service.name", + value=AnyValue(string_value="resource_0"), + ) + ] + ) + + pb2_resource_1 = OTLPResource( + attributes=[ + KeyValue( + key="service.name", + value=AnyValue(string_value="resource_1"), + ) + ] + ) + + pb2_instrumentation_scope_0 = InstrumentationScope( + name="instrumentation_scope_0" + ) + pb2_instrumentation_scope_1 = InstrumentationScope( + name="instrumentation_scope_1" + ) + # pylint: disable=protected-access + actual = self.exporter._translate_data(metrics) + expected = ExportMetricsServiceRequest( + resource_metrics=[ + pb2.ResourceMetrics( + resource=pb2_resource_0, + scope_metrics=[ + pb2.ScopeMetrics( + scope=pb2_instrumentation_scope_0, + metrics=[ + pb2_metric_0, + pb2_metric_1, + ], + ), + pb2.ScopeMetrics( + scope=pb2_instrumentation_scope_1, + metrics=[ + pb2_metric_3, + ], + ), + ], + ), + pb2.ResourceMetrics( + resource=pb2_resource_1, + scope_metrics=[ + pb2.ScopeMetrics( + scope=pb2_instrumentation_scope_1, + metrics=[ + pb2_metric_2, + ], + ) + ], + ), + ] + ) self.assertEqual(expected, actual) diff --git a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py index 4d506626750..c9177a884a1 100644 --- a/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py +++ b/exporter/opentelemetry-exporter-prometheus/src/opentelemetry/exporter/prometheus/__init__.py 
@@ -77,8 +77,13 @@ ) from prometheus_client.core import Metric as PrometheusMetric -from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk._metrics.metric_reader import ( + MetricReader, + MetricReaderMetric, +) from opentelemetry.sdk._metrics.point import Gauge, Histogram, Metric, Sum +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope _logger = getLogger(__name__) @@ -110,10 +115,19 @@ def __init__(self, prefix: str = "") -> None: REGISTRY.register(self._collector) self._collector._callback = self.collect - def _receive_metrics(self, metrics: Iterable[Metric]) -> None: + def _receive_metrics( + self, + metrics: Dict[ + Resource, Dict[InstrumentationScope, Iterable[MetricReaderMetric]] + ], + ) -> None: if metrics is None: return - self._collector.add_metrics_data(metrics) + for instrumentation_scope_metric_reader_metrics in metrics.values(): + for metric_reader_metrics in ( + instrumentation_scope_metric_reader_metrics + ).values(): + self._collector.add_metrics_data(metric_reader_metrics) def shutdown(self) -> bool: REGISTRY.unregister(self._collector) @@ -135,7 +149,9 @@ def __init__(self, prefix: str = ""): r"[^\w]", UNICODE | IGNORECASE ) - def add_metrics_data(self, export_records: Sequence[Metric]) -> None: + def add_metrics_data( + self, export_records: Iterable[MetricReaderMetric] + ) -> None: """Add metrics to Prometheus data""" self._metrics_to_export.append(export_records) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py index b99d7f8f03b..154703729c1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/export/__init__.py @@ -28,8 +28,13 @@ set_value, ) from opentelemetry.sdk._metrics._internal.aggregation import Aggregation -from opentelemetry.sdk._metrics.metric_reader import MetricReader +from opentelemetry.sdk._metrics.metric_reader import ( + MetricReader, + MetricReaderMetric, +) from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope from opentelemetry.util._once import Once _logger = logging.getLogger(__name__) @@ -113,19 +118,26 @@ def __init__( preferred_aggregation=preferred_aggregation, ) self._lock = RLock() - self._metrics: List[Metric] = [] + self._metrics: Dict[ + Resource, Dict[InstrumentationScope, Iterable[MetricReaderMetric]] + ] = None def get_metrics(self) -> List[Metric]: """Reads and returns current metrics from the SDK""" with self._lock: self.collect() metrics = self._metrics - self._metrics = [] + self._metrics = None return metrics - def _receive_metrics(self, metrics: Iterable[Metric]): + def _receive_metrics( + self, + metrics: Dict[ + Resource, Dict[InstrumentationScope, Iterable[MetricReaderMetric]] + ], + ) -> None: with self._lock: - self._metrics = list(metrics) + self._metrics = metrics def shutdown(self): pass @@ -193,7 +205,12 @@ def _ticker(self) -> None: # one last collection below before shutting down completely self.collect() - def _receive_metrics(self, metrics: Iterable[Metric]) -> None: + def _receive_metrics( + self, + metrics: Dict[ + Resource, Dict[InstrumentationScope, Iterable[MetricReaderMetric]] + ], + ) -> None: if metrics is None: return token = 
attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py index b84b873854f..987bcdf15b4 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/_internal/metric_reader.py @@ -13,6 +13,7 @@ # limitations under the License. from abc import ABC, abstractmethod +from dataclasses import dataclass from logging import getLogger from os import environ from typing import Callable, Dict, Iterable @@ -31,14 +32,30 @@ ObservableUpDownCounter, UpDownCounter, ) -from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric +from opentelemetry.sdk._metrics.point import ( + AggregationTemporality, + Metric, + PointT, +) from opentelemetry.sdk.environment_variables import ( _OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE, ) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope +from opentelemetry.util.types import Attributes _logger = getLogger(__name__) +@dataclass(frozen=True) +class MetricReaderMetric: + attributes: Attributes + description: str + name: str + unit: str + point: PointT + + class MetricReader(ABC): """ Base class for all metric readers @@ -147,10 +164,48 @@ def collect(self) -> None: "Cannot call collect on a MetricReader until it is registered on a MeterProvider" ) return - self._receive_metrics( - self._collect(self, self._instrument_class_temporality) + + collected_metrics = self._collect( + self, self._instrument_class_temporality ) + resource_instrumentation_scope_metrics = {} + + for collected_metric in collected_metrics: + if collected_metric.resource not in ( + resource_instrumentation_scope_metrics + ): + resource_instrumentation_scope_metrics[ + collected_metric.resource + ] = {} + + instrumentation_scope_metrics = ( + resource_instrumentation_scope_metrics[ + collected_metric.resource + ] + ) + + if collected_metric.instrumentation_scope not in ( + instrumentation_scope_metrics + ): + instrumentation_scope_metrics[ + collected_metric.instrumentation_scope + ] = [] + + instrumentation_scope_metrics[ + collected_metric.instrumentation_scope + ].append( + MetricReaderMetric( + attributes=collected_metric.attributes, + description=collected_metric.description, + name=collected_metric.name, + unit=collected_metric.unit, + point=collected_metric.point, + ) + ) + + self._receive_metrics(resource_instrumentation_scope_metrics) + @final def _set_collect_callback( self, @@ -162,7 +217,12 @@ def _set_collect_callback( self._collect = func @abstractmethod - def _receive_metrics(self, metrics: Iterable[Metric]): + def _receive_metrics( + self, + metrics: Dict[ + Resource, Dict[InstrumentationScope, Iterable[MetricReaderMetric]] + ], + ) -> None: """Called by `MetricReader.collect` when it receives a batch of metrics""" @abstractmethod diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py index d184e9fbef0..e424590581a 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/_metrics/metric_reader.py @@ -16,6 +16,7 @@ from opentelemetry.sdk._metrics._internal.metric_reader import ( # noqa: F401 MetricReader, + MetricReaderMetric, ) __all__ = [] diff --git 
a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py index cb2ab8b54c6..79a01b2659e 100644 --- a/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py +++ b/opentelemetry-sdk/tests/metrics/integration_test/test_disable_default_views.py @@ -33,7 +33,7 @@ def test_disable_default_views(self): counter.add(10, {"label": "value2"}) counter.add(10, {"label": "value3"}) - self.assertEqual(reader.get_metrics(), []) + self.assertEqual(reader.get_metrics(), {}) def test_disable_default_views_add_custom(self): reader = InMemoryMetricReader() @@ -54,4 +54,9 @@ def test_disable_default_views_add_custom(self): metrics = reader.get_metrics() self.assertEqual(len(metrics), 1) + + resource = [*metrics][0] + instrumentation_scope = [*metrics[resource]][0] + metrics = metrics[resource][instrumentation_scope] + self.assertEqual(metrics[0].name, "testhist") diff --git a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py index 58b2aad3e82..49a6cefadc0 100644 --- a/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_in_memory_metric_reader.py @@ -32,16 +32,20 @@ def test_no_metrics(self): mock_collect_callback = Mock(return_value=[]) reader = InMemoryMetricReader() reader._set_collect_callback(mock_collect_callback) - self.assertEqual(reader.get_metrics(), []) + self.assertEqual(reader.get_metrics(), {}) mock_collect_callback.assert_called_once() - def test_converts_metrics_to_list(self): + def test_store_metrics(self): + + resource = Resource.create() + instrumentation_scope = InstrumentationScope("testmetrics") + metric = Metric( attributes={"myattr": "baz"}, description="", - instrumentation_scope=InstrumentationScope("testmetrics"), + instrumentation_scope=instrumentation_scope, name="foo", - resource=Resource.create(), + resource=resource, unit="", point=Sum( start_time_unix_nano=1647626444152947792, @@ -57,9 +61,17 @@ def test_converts_metrics_to_list(self): returned_metrics = reader.get_metrics() mock_collect_callback.assert_called_once() - self.assertIsInstance(returned_metrics, list) - self.assertEqual(len(returned_metrics), 1) - self.assertIs(returned_metrics[0], metric) + self.assertIsInstance(returned_metrics, dict) + + metric_reader_metric = returned_metrics[resource][ + instrumentation_scope + ][0] + + self.assertEqual(metric_reader_metric.attributes, metric.attributes) + self.assertEqual(metric_reader_metric.description, metric.description) + self.assertEqual(metric_reader_metric.name, metric.name) + self.assertEqual(metric_reader_metric.unit, metric.unit) + self.assertEqual(metric_reader_metric.point, metric.point) def test_shutdown(self): # shutdown should always be successful @@ -77,4 +89,8 @@ def test_integration(self): metrics = reader.get_metrics() # should be 3 metrics, one from the observable gauge and one for each labelset from the counter - self.assertEqual(len(metrics), 3) + + resource = [*metrics][0] + instrumentation_scope = [*metrics[resource]][0] + + self.assertEqual(len(metrics[resource][instrumentation_scope]), 3) diff --git a/opentelemetry-sdk/tests/metrics/test_metrics.py b/opentelemetry-sdk/tests/metrics/test_metrics.py index ca256319354..8d9d2544a2c 100644 --- a/opentelemetry-sdk/tests/metrics/test_metrics.py +++ b/opentelemetry-sdk/tests/metrics/test_metrics.py @@ -241,7 +241,13 @@ def 
test_measurement_collect_callback( DummyMetricReader(), ] sync_consumer_instance = mock_sync_measurement_consumer() - sync_consumer_instance.collect = MockFunc() + + class ListMockFunc(MockFunc): + def __init__(self): + super().__init__() + self.mock = [] + + sync_consumer_instance.collect = ListMockFunc() MeterProvider(metric_readers=metric_readers) for reader in metric_readers: @@ -496,6 +502,14 @@ def test_duplicate_instrument_aggregate_data(self): metrics = exporter.metrics[0] + resource = [*metrics][0] + instrumentation_scope_0 = [*metrics[resource]][0] + instrumentation_scope_1 = [*metrics[resource]][1] + metrics_0 = metrics[resource][instrumentation_scope_0] + metrics_1 = metrics[resource][instrumentation_scope_1] + + metrics = [metrics_0[0], metrics_1[0]] + self.assertEqual(len(metrics), 2) metric_0 = metrics[0] diff --git a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py index ff67e848afe..06d2782cd40 100644 --- a/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py +++ b/opentelemetry-sdk/tests/metrics/test_periodic_exporting_metric_reader.py @@ -91,12 +91,13 @@ def _create_periodic_reader( def _collect(reader, temp): time.sleep(collect_wait) pmr._receive_metrics(metrics) + return [] pmr._set_collect_callback(_collect) return pmr def test_ticker_called(self): - collect_mock = Mock() + collect_mock = Mock(**{"return_value": []}) pmr = PeriodicExportingMetricReader(Mock(), export_interval_millis=1) pmr._set_collect_callback(collect_mock) time.sleep(0.1) diff --git a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py index 97b411ac4b6..9f1cb835ebf 100644 --- a/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py +++ b/tests/opentelemetry-test-utils/src/opentelemetry/test/metrictestutil.py @@ -16,6 +16,7 @@ from collections import OrderedDict from opentelemetry.attributes import BoundedAttributes +from opentelemetry.sdk._metrics.metric_reader import MetricReaderMetric from opentelemetry.sdk._metrics.point import ( AggregationTemporality, Gauge, @@ -26,6 +27,23 @@ from opentelemetry.sdk.util.instrumentation import InstrumentationScope +def _generate_metric_reader_metric(metric: Metric): + + return { + metric.resource: { + metric.instrumentation_scope: [ + MetricReaderMetric( + attributes=metric.attributes, + description=metric.description, + name=metric.name, + unit=metric.unit, + point=metric.point, + ) + ] + } + } + + def _generate_metric( name, point, attributes=None, description=None, unit=None ) -> Metric:
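
For reference, the data shape this change introduces can be sketched outside the patch. The following is a minimal, self-contained sketch only: it mirrors the regrouping that MetricReader.collect now performs (flat collected metrics regrouped into Dict[Resource, Dict[InstrumentationScope, Iterable[MetricReaderMetric]]] before being handed to _receive_metrics), but it deliberately uses hypothetical stand-in classes (StubResource, StubScope, StubMetric, StubReaderMetric) and a helper name (group_by_resource_and_scope) that do not exist in the SDK, so it can run on its own without the real Resource, InstrumentationScope, or MetricReaderMetric types.

# Sketch of the regrouping added to MetricReader.collect in this patch.
# The real SDK types are replaced with stand-ins so the example is runnable
# in isolation; only the grouping logic reflects the change itself.
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class StubResource:          # stand-in for opentelemetry.sdk.resources.Resource
    service_name: str


@dataclass(frozen=True)
class StubScope:             # stand-in for InstrumentationScope
    name: str


@dataclass(frozen=True)
class StubMetric:            # stand-in for the flat Metric returned by _collect
    resource: StubResource
    instrumentation_scope: StubScope
    attributes: dict
    name: str
    description: str
    unit: str
    point: object


@dataclass(frozen=True)
class StubReaderMetric:      # stand-in for the new MetricReaderMetric dataclass
    attributes: dict
    description: str
    name: str
    unit: str
    point: object


def group_by_resource_and_scope(
    collected: List[StubMetric],
) -> Dict[StubResource, Dict[StubScope, List[StubReaderMetric]]]:
    # Build Dict[Resource, Dict[InstrumentationScope, List[MetricReaderMetric]]]
    grouped: Dict[StubResource, Dict[StubScope, List[StubReaderMetric]]] = {}
    for metric in collected:
        per_scope = grouped.setdefault(metric.resource, {})
        per_scope.setdefault(metric.instrumentation_scope, []).append(
            StubReaderMetric(
                attributes=metric.attributes,
                description=metric.description,
                name=metric.name,
                unit=metric.unit,
                point=metric.point,
            )
        )
    return grouped


if __name__ == "__main__":
    resource = StubResource("resource_0")
    scope = StubScope("instrumentation_scope_0")
    flat = [
        StubMetric(resource, scope, {"foo": "1"}, "counter1", "description_0", "s", 1),
        StubMetric(resource, scope, {"foo": "2"}, "counter1", "description_1", "m", 1),
    ]
    nested = group_by_resource_and_scope(flat)
    # Consumers walk the nested mapping, as the updated exporters and tests
    # in this patch do (resource -> instrumentation scope -> reader metrics):
    for res, scopes in nested.items():
        for sc, reader_metrics in scopes.items():
            for reader_metric in reader_metrics:
                print(res.service_name, sc.name, reader_metric.name, reader_metric.point)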