Skip to content

Commit

Permalink
Implement MetricReader default aggregation controls (#2638)
Browse files Browse the repository at this point in the history
  • Loading branch information
ocelotl authored Apr 27, 2022
1 parent 7aac852 commit a4b4c45
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21


- Add parameter to MetricReader constructor to select aggregation per instrument kind
([#2638](https://github.com/open-telemetry/opentelemetry-python/pull/2638))
- Add parameter to MetricReader constructor to select temporality per instrument kind
([#2637](https://github.com/open-telemetry/opentelemetry-python/pull/2637))
- Fix unhandled callback exceptions on async instruments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from typing import TYPE_CHECKING, Dict, Iterable

from opentelemetry.sdk._metrics.aggregation import (
DefaultAggregation,
_Aggregation,
_AggregationFactory,
_convert_aggregation_temporality,
_PointVarT,
)
Expand All @@ -39,13 +41,15 @@ def __init__(
view: View,
instrument: "_Instrument",
sdk_config: SdkConfiguration,
instrument_class_aggregation: Dict[type, _AggregationFactory],
):
self._view = view
self._instrument = instrument
self._sdk_config = sdk_config
self._attributes_aggregation: Dict[frozenset, _Aggregation] = {}
self._attributes_previous_point: Dict[frozenset, _PointVarT] = {}
self._lock = Lock()
self._instrument_class_aggregation = instrument_class_aggregation

# pylint: disable=protected-access
def consume_measurement(self, measurement: Measurement) -> None:
Expand All @@ -67,11 +71,19 @@ def consume_measurement(self, measurement: Measurement) -> None:
if attributes not in self._attributes_aggregation:
with self._lock:
if attributes not in self._attributes_aggregation:
self._attributes_aggregation[
attributes
] = self._view._aggregation._create_aggregation(
self._instrument
)
if not isinstance(
self._view._aggregation, DefaultAggregation
):
aggregation = (
self._view._aggregation._create_aggregation(
self._instrument
)
)
else:
aggregation = self._instrument_class_aggregation[
self._instrument.__class__
]._create_aggregation(self._instrument)
self._attributes_aggregation[attributes] = aggregation

self._attributes_aggregation[attributes].aggregate(measurement)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
detach,
set_value,
)
from opentelemetry.sdk._metrics.aggregation import _AggregationFactory
from opentelemetry.sdk._metrics.metric_reader import MetricReader
from opentelemetry.sdk._metrics.point import AggregationTemporality, Metric
from opentelemetry.util._once import Once
Expand Down Expand Up @@ -103,9 +104,14 @@ class InMemoryMetricReader(MetricReader):
"""

def __init__(
self, preferred_temporality: Dict[type, AggregationTemporality] = None
self,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, _AggregationFactory] = None,
) -> None:
super().__init__(preferred_temporality=preferred_temporality)
super().__init__(
preferred_temporality=preferred_temporality,
preferred_aggregation=preferred_aggregation,
)
self._lock = RLock()
self._metrics: List[Metric] = []

Expand Down Expand Up @@ -135,10 +141,14 @@ def __init__(
self,
exporter: MetricExporter,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, _AggregationFactory] = None,
export_interval_millis: Optional[float] = None,
export_timeout_millis: Optional[float] = None,
) -> None:
super().__init__(preferred_temporality=preferred_temporality)
super().__init__(
preferred_temporality=preferred_temporality,
preferred_aggregation=preferred_aggregation,
)
self._exporter = exporter
if export_interval_millis is None:
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

# pylint: disable=too-many-ancestors

import logging
from logging import getLogger
from typing import TYPE_CHECKING, Dict, Generator, Iterable, Optional, Union

from opentelemetry._metrics.instrument import CallbackT
Expand All @@ -38,7 +38,8 @@
MeasurementConsumer,
)

_logger = logging.getLogger(__name__)

_logger = getLogger(__name__)


class _Synchronous:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ def __init__(self, sdk_config: SdkConfiguration) -> None:
self._sdk_config = sdk_config
# should never be mutated
self._reader_storages: Mapping[MetricReader, MetricReaderStorage] = {
reader: MetricReaderStorage(sdk_config)
reader: MetricReaderStorage(
sdk_config, reader._instrument_class_aggregation
)
for reader in sdk_config.metric_readers
}
self._async_instruments: List["_Asynchronous"] = []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

from typing_extensions import final

from opentelemetry.sdk._metrics.aggregation import (
DefaultAggregation,
_AggregationFactory,
)
from opentelemetry.sdk._metrics.instrument import (
Counter,
Histogram,
Expand Down Expand Up @@ -52,6 +56,19 @@ class MetricReader(ABC):
their association to their default aggregation temporalities.
The value passed here will override the corresponding values set
via the environment variable
preferred_aggregation: A mapping between instrument classes and
aggregation instances. By default maps all instrument classes to an
instance of `DefaultAggregation`. This mapping will be used to
define the default aggregation of every instrument class. If the
user wants to make a change in the default aggregation of an
instrument class, it is enough to pass here a dictionary whose keys
are the instrument classes and the values are the corresponding
desired aggregation for the instrument classes that the user wants
to change, not necessarily all of them. The classes not included in
the passed dictionary will retain their association to their
default aggregations. The aggregation defined here will be
overriden by an aggregation defined by a view that is not
`DefaultAggregation`.
.. document protected _receive_metrics which is a intended to be overriden by subclass
.. automethod:: _receive_metrics
Expand All @@ -61,7 +78,9 @@ class MetricReader(ABC):
# to the end of the documentation paragraph above.

def __init__(
self, preferred_temporality: Dict[type, AggregationTemporality] = None
self,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, _AggregationFactory] = None,
) -> None:
self._collect: Callable[
["MetricReader", AggregationTemporality], Iterable[Metric]
Expand Down Expand Up @@ -106,6 +125,17 @@ def __init__(
)

self._instrument_class_temporality.update(preferred_temporality or {})
self._preferred_temporality = preferred_temporality
self._instrument_class_aggregation = {
Counter: DefaultAggregation(),
UpDownCounter: DefaultAggregation(),
Histogram: DefaultAggregation(),
ObservableCounter: DefaultAggregation(),
ObservableUpDownCounter: DefaultAggregation(),
ObservableGauge: DefaultAggregation(),
}

self._instrument_class_aggregation.update(preferred_aggregation or {})

@final
def collect(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
from opentelemetry.sdk._metrics._view_instrument_match import (
_ViewInstrumentMatch,
)
from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
_AggregationFactory,
)
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.point import Metric
from opentelemetry.sdk._metrics.sdk_configuration import SdkConfiguration
Expand All @@ -31,12 +34,17 @@
class MetricReaderStorage:
"""The SDK's storage for a given reader"""

def __init__(self, sdk_config: SdkConfiguration) -> None:
def __init__(
self,
sdk_config: SdkConfiguration,
instrument_class_aggregation: Dict[type, _AggregationFactory],
) -> None:
self._lock = RLock()
self._sdk_config = sdk_config
self._view_instrument_match: Dict[
Instrument, List[_ViewInstrumentMatch]
] = {}
self._instrument_class_aggregation = instrument_class_aggregation

def _get_or_init_view_instrument_match(
self, instrument: Instrument
Expand All @@ -62,6 +70,9 @@ def _get_or_init_view_instrument_match(
view=view,
instrument=instrument,
sdk_config=self._sdk_config,
instrument_class_aggregation=(
self._instrument_class_aggregation
),
)
)

Expand All @@ -72,6 +83,9 @@ def _get_or_init_view_instrument_match(
view=_DEFAULT_VIEW,
instrument=instrument,
sdk_config=self._sdk_config,
instrument_class_aggregation=(
self._instrument_class_aggregation
),
)
)
self._view_instrument_match[instrument] = view_instrument_matches
Expand Down
53 changes: 51 additions & 2 deletions opentelemetry-sdk/tests/metrics/test_metric_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
from unittest import TestCase
from unittest.mock import patch

from opentelemetry.sdk._metrics.aggregation import AggregationTemporality
from opentelemetry.sdk._metrics.aggregation import (
AggregationTemporality,
DefaultAggregation,
LastValueAggregation,
_AggregationFactory,
)
from opentelemetry.sdk._metrics.instrument import (
Counter,
Histogram,
Expand All @@ -34,10 +39,13 @@

class DummyMetricReader(MetricReader):
def __init__(
self, preferred_temporality: Dict[type, AggregationTemporality] = None
self,
preferred_temporality: Dict[type, AggregationTemporality] = None,
preferred_aggregation: Dict[type, _AggregationFactory] = None,
) -> None:
super().__init__(
preferred_temporality=preferred_temporality,
preferred_aggregation=preferred_aggregation,
)

def _receive_metrics(self, metrics):
Expand Down Expand Up @@ -173,3 +181,44 @@ def test_configure_temporality_parameter(self):
dummy_metric_reader._instrument_class_temporality[ObservableGauge],
AggregationTemporality.DELTA,
)

def test_default_temporality(self):
dummy_metric_reader = DummyMetricReader()
self.assertEqual(
dummy_metric_reader._instrument_class_aggregation.keys(),
set(
[
Counter,
UpDownCounter,
Histogram,
ObservableCounter,
ObservableUpDownCounter,
ObservableGauge,
]
),
)
for (
value
) in dummy_metric_reader._instrument_class_aggregation.values():
self.assertIsInstance(value, DefaultAggregation)

dummy_metric_reader = DummyMetricReader(
preferred_aggregation={Counter: LastValueAggregation()}
)
self.assertEqual(
dummy_metric_reader._instrument_class_aggregation.keys(),
set(
[
Counter,
UpDownCounter,
Histogram,
ObservableCounter,
ObservableUpDownCounter,
ObservableGauge,
]
),
)
self.assertIsInstance(
dummy_metric_reader._instrument_class_aggregation[Counter],
LastValueAggregation,
)
22 changes: 15 additions & 7 deletions opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from unittest.mock import Mock, patch
from unittest.mock import MagicMock, Mock, patch

from opentelemetry.sdk._metrics.aggregation import DropAggregation
from opentelemetry.sdk._metrics.aggregation import (
DefaultAggregation,
DropAggregation,
)
from opentelemetry.sdk._metrics.instrument import Counter
from opentelemetry.sdk._metrics.measurement import Measurement
from opentelemetry.sdk._metrics.metric_reader_storage import (
Expand Down Expand Up @@ -56,7 +59,8 @@ def test_creates_view_instrument_matches(
resource=Mock(),
metric_readers=(),
views=(view1, view2),
)
),
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
)

# instrument1 matches view1 and view2, so should create two ViewInstrumentMatch objects
Expand Down Expand Up @@ -100,7 +104,8 @@ def test_forwards_calls_to_view_instrument_match(
resource=Mock(),
metric_readers=(),
views=(view1, view2),
)
),
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
)

# Measurements from an instrument should be passed on to each ViewInstrumentMatch objects
Expand Down Expand Up @@ -147,7 +152,8 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock):
resource=Mock(),
metric_readers=(),
views=(view1,),
)
),
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
)

def send_measurement():
Expand All @@ -172,7 +178,8 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock):
resource=Mock(),
metric_readers=(),
views=(),
)
),
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
)

storage.consume_measurement(Measurement(1, instrument1))
Expand Down Expand Up @@ -200,7 +207,8 @@ def test_drop_aggregation(self):
instrument_name="name", aggregation=DropAggregation()
),
),
)
),
MagicMock(**{"__getitem__.return_value": DefaultAggregation()}),
)
metric_reader_storage.consume_measurement(Measurement(1, counter))

Expand Down
Loading

0 comments on commit a4b4c45

Please sign in to comment.