Skip to content

Commit

Permalink
[Python Otel] Allow start observability without context manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
XuanWang-Amos committed Feb 16, 2024
1 parent 34be0d8 commit a7373ae
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 11 deletions.
13 changes: 12 additions & 1 deletion src/python/grpcio_observability/grpc_observability/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@
from grpc_observability._open_telemetry_observability import (
OpenTelemetryObservability,
)
from grpc_observability._open_telemetry_observability import (
end_open_telemetry_observability,
)
from grpc_observability._open_telemetry_observability import (
start_open_telemetry_observability,
)
from grpc_observability._open_telemetry_plugin import OpenTelemetryPlugin

__all__ = ("OpenTelemetryObservability", "OpenTelemetryPlugin")
__all__ = (
"OpenTelemetryObservability",
"OpenTelemetryPlugin",
"start_open_telemetry_observability",
"end_open_telemetry_observability",
)
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import logging
import threading
import time
from typing import Any, Iterable, Optional

Expand Down Expand Up @@ -54,6 +55,38 @@
grpc.StatusCode.DATA_LOSS: "DATA_LOSS",
}

_observability_lock: threading.RLock = threading.RLock()
_OPEN_TELEMETRY_OBSERVABILITY: Optional["OpenTelemetryObservability"] = None


def start_open_telemetry_observability(
*,
plugins: Optional[Iterable[OpenTelemetryPlugin]] = None,
) -> None:
global _OPEN_TELEMETRY_OBSERVABILITY # pylint: disable=global-statement
with _observability_lock:
if _OPEN_TELEMETRY_OBSERVABILITY is None:
_OPEN_TELEMETRY_OBSERVABILITY = OpenTelemetryObservability(
plugins=plugins
)
_OPEN_TELEMETRY_OBSERVABILITY.observability_init()
else:
raise RuntimeError(
"gPRC Python observability was already initiated!"
)


def end_open_telemetry_observability() -> None:
global _OPEN_TELEMETRY_OBSERVABILITY # pylint: disable=global-statement
with _observability_lock:
if not _OPEN_TELEMETRY_OBSERVABILITY:
raise RuntimeError(
"end_open_telemetry_observability() was called without initiate observability!"
)
else:
_OPEN_TELEMETRY_OBSERVABILITY.observability_deinit()
_OPEN_TELEMETRY_OBSERVABILITY = None


# pylint: disable=no-self-use
class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
Expand All @@ -66,7 +99,6 @@ class OpenTelemetryObservability(grpc._observability.ObservabilityPlugin):
"""

exporter: "grpc_observability.Exporter"
plugins: Iterable[OpenTelemetryPlugin]

def __init__(
self,
Expand All @@ -80,26 +112,39 @@ def __init__(

self.exporter = _OpenTelemetryExporterDelegator(_plugins)

def __enter__(self):
global _OPEN_TELEMETRY_OBSERVABILITY # pylint: disable=global-statement
with _observability_lock:
if _OPEN_TELEMETRY_OBSERVABILITY:
raise RuntimeError(
"gPRC Python observability was already initiated!"
)
self.observability_init()
_OPEN_TELEMETRY_OBSERVABILITY = self
return self

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
global _OPEN_TELEMETRY_OBSERVABILITY # pylint: disable=global-statement
with _observability_lock:
self.observability_deinit()
_OPEN_TELEMETRY_OBSERVABILITY = None

def observability_init(self):
try:
_cyobservability.activate_stats()
self.set_stats(True)
except Exception as e: # pylint: disable=broad-except
raise ValueError(f"Activate observability metrics failed with: {e}")

def __enter__(self):
try:
_cyobservability.cyobservability_init(self.exporter)
# TODO(xuanwn): Use specific exceptons
except Exception as e: # pylint: disable=broad-except
_LOGGER.exception("Initiate observability failed with: %s", e)

grpc._observability.observability_init(self)
return self

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.exit()

def exit(self) -> None:
def observability_deinit(self) -> None:
# Sleep so we don't loss any data. If we shutdown export thread
# immediately after exit, it's possible that core didn't call RecordEnd
# in callTracer, and all data recorded by calling RecordEnd will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ def testAll(self):
expected_observability_code_elements = (
"OpenTelemetryObservability",
"OpenTelemetryPlugin",
"start_open_telemetry_observability",
"end_open_telemetry_observability",
)

self.assertCountEqual(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import time
from typing import Any, Callable, Dict, List, Optional, Set
import unittest
from unittest.mock import patch

import grpc
import grpc_observability
Expand Down Expand Up @@ -134,7 +135,7 @@ def tearDown(self):
if self._server:
self._server.stop(0)

def testRecordUnaryUnary(self):
def testRecordUnaryUnaryUseContextManager(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
Expand All @@ -146,6 +147,56 @@ def testRecordUnaryUnary(self):
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)

def testRecordUnaryUnaryUseGlobalInit(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)

grpc_observability.start_open_telemetry_observability(
plugins=[otel_plugin]
)
server, port = _test_server.start_server()
self._server = server
_test_server.unary_unary_call(port=port)

self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)
grpc_observability.end_open_telemetry_observability()

def testCallGlobalInitThrowErrorWhenGlobalCalled(self):
try:
grpc_observability.start_open_telemetry_observability(plugins=[])
grpc_observability.start_open_telemetry_observability(plugins=[])
except RuntimeError as exp:
self.assertIn("observability was already initiated", str(exp))

grpc_observability.end_open_telemetry_observability()

def testCallGlobalInitThrowErrorWhenContextManagerCalled(self):
with grpc_observability.OpenTelemetryObservability(plugins=[]):
try:
grpc_observability.start_open_telemetry_observability(
plugins=[]
)
pass
except RuntimeError as exp:
self.assertIn("observability was already initiated", str(exp))

def testCallContextManagerThrowErrorWhenGlobalInitCalled(self):
grpc_observability.start_open_telemetry_observability(plugins=[])
try:
with grpc_observability.OpenTelemetryObservability(plugins=[]):
pass
except RuntimeError as exp:
self.assertIn("observability was already initiated", str(exp))
grpc_observability.end_open_telemetry_observability()

def testContextManagerThrowErrorWhenContextManagerCalled(self):
with grpc_observability.OpenTelemetryObservability(plugins=[]):
try:
with grpc_observability.OpenTelemetryObservability(plugins=[]):
pass
except RuntimeError as exp:
self.assertIn("observability was already initiated", str(exp))

def testRecordUnaryUnaryWithClientInterceptor(self):
interceptor = _ClientUnaryUnaryInterceptor()
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
Expand Down Expand Up @@ -212,7 +263,7 @@ def testNoRecordBeforeInit(self):
self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)

def testNoRecordAfterExit(self):
def testNoRecordAfterExitUseContextManager(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)
with grpc_observability.OpenTelemetryObservability(
plugins=[otel_plugin]
Expand All @@ -230,6 +281,26 @@ def testNoRecordAfterExit(self):
with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
self._validate_metrics_exist(self.all_metrics)

def testNoRecordAfterExitUseGlobal(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)

grpc_observability.start_open_telemetry_observability(
plugins=[otel_plugin]
)
server, port = _test_server.start_server()
self._server = server
self._port = port
_test_server.unary_unary_call(port=port)
grpc_observability.end_open_telemetry_observability()

self._validate_metrics_exist(self.all_metrics)
self._validate_all_metrics_names(self.all_metrics)

self.all_metrics = defaultdict(list)
_test_server.unary_unary_call(port=self._port)
with self.assertRaisesRegex(AssertionError, "No metrics was exported"):
self._validate_metrics_exist(self.all_metrics)

def testRecordUnaryStream(self):
otel_plugin = BaseTestOpenTelemetryPlugin(self._provider)

Expand Down Expand Up @@ -342,7 +413,7 @@ def assert_eventually(
message: Optional[Callable[[], str]] = None,
) -> None:
message = message or (lambda: "Proposition did not evaluate to true")
timeout = timeout or datetime.timedelta(seconds=10)
timeout = timeout or datetime.timedelta(seconds=5)
end = datetime.datetime.now() + timeout
while datetime.datetime.now() < end:
if predicate():
Expand Down

0 comments on commit a7373ae

Please sign in to comment.