From 105ec0ddabec5597cfdbdc86f900dc89afad43ef Mon Sep 17 00:00:00 2001 From: Connor Adams Date: Mon, 20 Jul 2020 09:35:00 -0400 Subject: [PATCH] Fixes, changelog --- ext/opentelemetry-ext-grpc/CHANGELOG.md | 2 + .../src/opentelemetry/ext/grpc/_client.py | 38 +++++++++++-------- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/ext/opentelemetry-ext-grpc/CHANGELOG.md b/ext/opentelemetry-ext-grpc/CHANGELOG.md index 3be32e05e59..d938c2d20cf 100644 --- a/ext/opentelemetry-ext-grpc/CHANGELOG.md +++ b/ext/opentelemetry-ext-grpc/CHANGELOG.md @@ -5,6 +5,8 @@ - Add status code to gRPC client spans ([896](https://github.com/open-telemetry/opentelemetry-python/pull/896)) +- Add metric recording (bytes in/out, errors, latency) to gRPC client + ## 0.8b0 Released 2020-05-27 diff --git a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py index a9977f771e3..3e460810630 100644 --- a/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py +++ b/ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py @@ -63,7 +63,7 @@ def append_metadata( propagators.inject(append_metadata, metadata) -def _make_future_done_callback(span, rpc_info): +def _make_future_done_callback(span, rpc_info, client_info, metrics_recorder): def callback(response_future): with span: code = response_future.code() @@ -72,6 +72,10 @@ def callback(response_future): return response = response_future.result() rpc_info.response = response + if "ByteSize" in dir(response): + metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) return callback @@ -90,12 +94,17 @@ def _start_span(self, method): ) # pylint:disable=no-self-use - def _trace_result(self, guarded_span, rpc_info, result): + def _trace_result(self, guarded_span, rpc_info, result, client_info): # If the RPC is called asynchronously, release the guard and add a # callback so that the span can be finished once the future is done. if isinstance(result, grpc.Future): result.add_done_callback( - _make_future_done_callback(guarded_span.release(), rpc_info) + _make_future_done_callback( + guarded_span.release(), + rpc_info, + client_info, + self._metrics_recorder, + ) ) return result response = result @@ -106,6 +115,11 @@ def _trace_result(self, guarded_span, rpc_info, result): if isinstance(result, tuple): response = result[0] rpc_info.response = response + + if "ByteSize" in dir(response): + self._metrics_recorder.record_bytes_in( + response.ByteSize(), client_info.full_method + ) return result def _start_guarded_span(self, *args, **kwargs): @@ -154,13 +168,9 @@ def intercept_unary(self, request, metadata, client_info, invoker): ) raise - ret = self._trace_result(guarded_span, rpc_info, result) - - if "ByteSize" in dir(rpc_info.response): - self._metrics_recorder.record_bytes_in( - rpc_info.response.ByteSize(), client_info.full_method - ) - return ret + return self._trace_result( + guarded_span, rpc_info, result, client_info + ) # For RPCs that stream responses, the result can be a generator. To record # the span across the generated responses and detect any errors, we wrap @@ -253,10 +263,6 @@ def intercept_stream( ) raise - ret = self._trace_result(guarded_span, rpc_info, result) - - self._metrics_recorder.record_bytes_in( - rpc_info.response.ByteSize(), client_info.full_method + return self._trace_result( + guarded_span, rpc_info, result, client_info ) - - return ret