diff --git a/examples/python/observability/README.md b/examples/python/observability/README.md new file mode 100644 index 0000000000000..f46f47f073a3b --- /dev/null +++ b/examples/python/observability/README.md @@ -0,0 +1,59 @@ +gRPC Observability Example +===================== + +The examples here demonstrate how to setup gRPC Python Observability with Opentelemetry. + +More details about how to use gRPC Python Observability APIs can be found in [OpenTelemetry Metrics gRFC](https://github.com/grpc/proposal/blob/master/A66-otel-stats.md#opentelemetry-metrics). + +### Requirements + +The examples here depends on grpcio and grpcio-observability version of 1.62.0 or newer. + +### Run the Server + +1. Navigate to this directory: + +```sh +cd grpc/examples/python/observability +``` + +2. Run the server: + +```sh +python -m observability_greeter_server +``` + +### Run the Client + +Note that client should start within 10 seconds of the server becoming active. + +```sh +python -m observability_greeter_client +``` + +### Verifying Metrics + +The example will print a list of metric names collected. + +Server Side: + +```sh +Server started, listening on 50051 +Metrics exported on Server side: +grpc.server.call.started +grpc.server.call.sent_total_compressed_message_size +grpc.server.call.rcvd_total_compressed_message_size +grpc.server.call.duration +``` + +Client Side: + +```sh +Greeter client received: Hello You +Metrics exported on client side: +grpc.client.call.duration +grpc.client.attempt.started +grpc.client.attempt.sent_total_compressed_message_size +grpc.client.attempt.rcvd_total_compressed_message_size +grpc.client.attempt.duration +``` diff --git a/examples/python/observability/helloworld_pb2.py b/examples/python/observability/helloworld_pb2.py new file mode 100644 index 0000000000000..f5b4f2d27dce7 --- /dev/null +++ b/examples/python/observability/helloworld_pb2.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: helloworld.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10helloworld.proto\x12\nhelloworld\"\x1c\n\x0cHelloRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\"\x1d\n\nHelloReply\x12\x0f\n\x07message\x18\x01 \x01(\t2I\n\x07Greeter\x12>\n\x08SayHello\x12\x18.helloworld.HelloRequest\x1a\x16.helloworld.HelloReply\"\x00\x42\x36\n\x1bio.grpc.examples.helloworldB\x0fHelloWorldProtoP\x01\xa2\x02\x03HLWb\x06proto3') + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'helloworld_pb2', globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b'\n\033io.grpc.examples.helloworldB\017HelloWorldProtoP\001\242\002\003HLW' + _HELLOREQUEST._serialized_start=32 + _HELLOREQUEST._serialized_end=60 + _HELLOREPLY._serialized_start=62 + _HELLOREPLY._serialized_end=91 + _GREETER._serialized_start=93 + _GREETER._serialized_end=166 +# @@protoc_insertion_point(module_scope) diff --git a/examples/python/observability/helloworld_pb2.pyi b/examples/python/observability/helloworld_pb2.pyi new file mode 100644 index 0000000000000..8c4b5b22805d9 --- /dev/null +++ b/examples/python/observability/helloworld_pb2.pyi @@ -0,0 +1,17 @@ +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from typing import ClassVar as _ClassVar, Optional as _Optional + +DESCRIPTOR: _descriptor.FileDescriptor + +class HelloReply(_message.Message): + __slots__ = ["message"] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + message: str + def __init__(self, message: _Optional[str] = ...) -> None: ... + +class HelloRequest(_message.Message): + __slots__ = ["name"] + NAME_FIELD_NUMBER: _ClassVar[int] + name: str + def __init__(self, name: _Optional[str] = ...) -> None: ... diff --git a/examples/python/observability/helloworld_pb2_grpc.py b/examples/python/observability/helloworld_pb2_grpc.py new file mode 100644 index 0000000000000..47c186976e1c6 --- /dev/null +++ b/examples/python/observability/helloworld_pb2_grpc.py @@ -0,0 +1,70 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +import helloworld_pb2 as helloworld__pb2 + + +class GreeterStub(object): + """The greeting service definition. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SayHello = channel.unary_unary( + '/helloworld.Greeter/SayHello', + request_serializer=helloworld__pb2.HelloRequest.SerializeToString, + response_deserializer=helloworld__pb2.HelloReply.FromString, + ) + + +class GreeterServicer(object): + """The greeting service definition. + """ + + def SayHello(self, request, context): + """Sends a greeting + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_GreeterServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SayHello': grpc.unary_unary_rpc_method_handler( + servicer.SayHello, + request_deserializer=helloworld__pb2.HelloRequest.FromString, + response_serializer=helloworld__pb2.HelloReply.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'helloworld.Greeter', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class Greeter(object): + """The greeting service definition. + """ + + @staticmethod + def SayHello(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/helloworld.Greeter/SayHello', + helloworld__pb2.HelloRequest.SerializeToString, + helloworld__pb2.HelloReply.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/examples/python/observability/observability_greeter_client.py b/examples/python/observability/observability_greeter_client.py new file mode 100644 index 0000000000000..25f7d68a4c8cf --- /dev/null +++ b/examples/python/observability/observability_greeter_client.py @@ -0,0 +1,71 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""gRPC Python helloworld.Greeter client with observability enabled.""" + +from collections import defaultdict +import logging +import time +from typing import Optional + +import grpc +import grpc_observability +import helloworld_pb2 +import helloworld_pb2_grpc +import open_telemetry_exporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + +OTEL_EXPORT_INTERVAL_S = 0.5 + + +class BaseOpenTelemetryPlugin(grpc_observability.OpenTelemetryPlugin): + def __init__(self, provider: MeterProvider): + self.provider = provider + + def get_meter_provider(self) -> Optional[MeterProvider]: + return self.provider + + +def run(): + all_metrics = defaultdict(list) + otel_exporter = open_telemetry_exporter.OTelMetricExporter(all_metrics) + reader = PeriodicExportingMetricReader( + exporter=otel_exporter, + export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000, + ) + provider = MeterProvider(metric_readers=[reader]) + otel_plugin = BaseOpenTelemetryPlugin(provider) + + with grpc_observability.OpenTelemetryObservability(plugins=[otel_plugin]): + with grpc.insecure_channel(target="localhost:50051") as channel: + stub = helloworld_pb2_grpc.GreeterStub(channel) + try: + response = stub.SayHello( + helloworld_pb2.HelloRequest(name="You") + ) + print(f"Greeter client received: {response.message}") + except grpc.RpcError as rpc_error: + print("Call failed with code: ", rpc_error.code()) + + # Sleep to make sure all metrics are exported. + time.sleep(5) + + print("Metrics exported on client side:") + for metric in all_metrics: + print(metric) + + +if __name__ == "__main__": + logging.basicConfig() + run() diff --git a/examples/python/observability/observability_greeter_server.py b/examples/python/observability/observability_greeter_server.py new file mode 100644 index 0000000000000..2582c78687a92 --- /dev/null +++ b/examples/python/observability/observability_greeter_server.py @@ -0,0 +1,80 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""The Python implementation of the GRPC helloworld.Greeter server with observability enabled.""" + +from collections import defaultdict +from concurrent import futures +import logging +import time +from typing import Optional + +import grpc +import grpc_observability +import helloworld_pb2 +import helloworld_pb2_grpc +import open_telemetry_exporter +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader + +OTEL_EXPORT_INTERVAL_S = 0.5 + + +class BaseOpenTelemetryPlugin(grpc_observability.OpenTelemetryPlugin): + def __init__(self, provider: MeterProvider): + self.provider = provider + + def get_meter_provider(self) -> Optional[MeterProvider]: + return self.provider + + +class Greeter(helloworld_pb2_grpc.GreeterServicer): + def SayHello(self, request, context): + message = request.name + return helloworld_pb2.HelloReply(message=f"Hello {message}") + + +def serve(): + all_metrics = defaultdict(list) + otel_exporter = open_telemetry_exporter.OTelMetricExporter( + all_metrics, print_live=False + ) + reader = PeriodicExportingMetricReader( + exporter=otel_exporter, + export_interval_millis=OTEL_EXPORT_INTERVAL_S * 1000, + ) + provider = MeterProvider(metric_readers=[reader]) + otel_plugin = BaseOpenTelemetryPlugin(provider) + port = "50051" + + with grpc_observability.OpenTelemetryObservability(plugins=[otel_plugin]): + server = grpc.server( + thread_pool=futures.ThreadPoolExecutor(max_workers=10), + ) + helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server) + server.add_insecure_port("[::]:" + port) + server.start() + print("Server started, listening on " + port) + + # Sleep to make sure client made RPC call and all metrics are exported. + time.sleep(10) + print("Metrics exported on Server side:") + for metric in all_metrics: + print(metric) + + server.stop(0) + + +if __name__ == "__main__": + logging.basicConfig() + serve() diff --git a/examples/python/observability/open_telemetry_exporter.py b/examples/python/observability/open_telemetry_exporter.py new file mode 100644 index 0000000000000..af70bdb05a9d3 --- /dev/null +++ b/examples/python/observability/open_telemetry_exporter.py @@ -0,0 +1,75 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Dict, List + +from opentelemetry.sdk.metrics.export import AggregationTemporality +from opentelemetry.sdk.metrics.export import MetricExportResult +from opentelemetry.sdk.metrics.export import MetricExporter +from opentelemetry.sdk.metrics.export import MetricsData + + +class OTelMetricExporter(MetricExporter): + """Implementation of :class:`MetricExporter` that export metrics to the + provided metric_list. + + all_metrics: A dict which key is grpc_observability._opentelemetry_measures.Metric.name, + value is a list of labels recorded for that metric. + An example item of this dict: + {"grpc.client.attempt.started": + [{'grpc.method': 'test/UnaryUnary', 'grpc.target': 'localhost:42517'}, + {'grpc.method': 'other', 'grpc.target': 'localhost:42517'}]} + """ + + def __init__( + self, + all_metrics: Dict[str, List], + preferred_temporality: Dict[type, AggregationTemporality] = None, + preferred_aggregation: Dict[ + type, "opentelemetry.sdk.metrics.view.Aggregation" + ] = None, + print_live: bool = False, + ): + super().__init__( + preferred_temporality=preferred_temporality, + preferred_aggregation=preferred_aggregation, + ) + self._all_metrics = all_metrics + self._print_live = print_live + + def export( + self, + metrics_data: MetricsData, + timeout_millis: float = 10_000, + **kwargs, + ) -> MetricExportResult: + self.record_metric(metrics_data) + return MetricExportResult.SUCCESS + + def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: + pass + + def force_flush(self, timeout_millis: float = 10_000) -> bool: + return True + + def record_metric(self, metrics_data: MetricsData) -> None: + for resource_metric in metrics_data.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + for data_point in metric.data.data_points: + self._all_metrics[metric.name].append( + data_point.attributes + ) + if self._print_live: + print(f"Metric exporter received: {metric.name}")