Skip to content

Commit

Permalink
grpc client metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
cnnradams committed Jul 16, 2020
1 parent ee9d28a commit dc79e3a
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from opentelemetry.ext.grpc.version import __version__


def client_interceptor(tracer_provider=None):
def client_interceptor(tracer_provider=None, meter=None):
"""Create a gRPC client channel interceptor.
Args:
Expand All @@ -34,7 +34,7 @@ def client_interceptor(tracer_provider=None):

tracer = trace.get_tracer(__name__, __version__, tracer_provider)

return _client.OpenTelemetryClientInterceptor(tracer)
return _client.OpenTelemetryClientInterceptor(tracer, meter)


def server_interceptor(tracer_provider=None):
Expand Down
164 changes: 112 additions & 52 deletions ext/opentelemetry-ext-grpc/src/opentelemetry/ext/grpc/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from opentelemetry.trace.status import Status, StatusCanonicalCode

from . import grpcext
from ._utilities import RpcInfo
from ._utilities import RpcInfo, TimedMetricRecorder


class _GuardedSpan:
Expand Down Expand Up @@ -79,8 +79,10 @@ def callback(response_future):
class OpenTelemetryClientInterceptor(
grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor
):
def __init__(self, tracer):
def __init__(self, tracer, meter):
self._tracer = tracer
self._meter = meter
self._metrics_recorder = TimedMetricRecorder(self._meter, "client")

def _start_span(self, method):
return self._tracer.start_as_current_span(
Expand Down Expand Up @@ -109,32 +111,56 @@ def _trace_result(self, guarded_span, rpc_info, result):
def _start_guarded_span(self, *args, **kwargs):
return _GuardedSpan(self._start_span(*args, **kwargs))

def _bytes_out_iterator_wrapper(self, iterator, client_info):
for request in iterator:
if "ByteSize" in dir(request):
self._metrics_recorder.record_bytes_out(
request.ByteSize(), client_info.full_method
)
yield request

def intercept_unary(self, request, metadata, client_info, invoker):
if not metadata:
mutable_metadata = OrderedDict()
else:
mutable_metadata = OrderedDict(metadata)

with self._start_guarded_span(client_info.full_method) as guarded_span:
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())

rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
request=request,
)

try:
result = invoker(request, metadata)
except grpc.RpcError as exc:
guarded_span.generated_span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
with self._metrics_recorder.record_latency(
client_info.full_method
):
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())

# If protobuf is used, we can record the bytes in/out. Otherwise, we have no way
# to get the size of the request/response properly, so don't record anything
if "ByteSize" in dir(request):
self._metrics_recorder.record_bytes_out(
request.ByteSize(), client_info.full_method
)

rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
request=request,
)
raise

return self._trace_result(guarded_span, rpc_info, result)
try:
result = invoker(request, metadata)
except grpc.RpcError as exc:
guarded_span.generated_span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

ret = self._trace_result(guarded_span, rpc_info, result)

if "ByteSize" in dir(rpc_info.response):
self._metrics_recorder.record_bytes_in(
rpc_info.response.ByteSize(), client_info.full_method
)
return ret

# For RPCs that stream responses, the result can be a generator. To record
# the span across the generated responses and detect any errors, we wrap
Expand All @@ -148,25 +174,44 @@ def _intercept_server_stream(
mutable_metadata = OrderedDict(metadata)

with self._start_span(client_info.full_method) as span:
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
)
if client_info.is_client_stream:
rpc_info.request = request_or_iterator

try:
result = invoker(request_or_iterator, metadata)
for response in result:
yield response
except grpc.RpcError as exc:
span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
with self._metrics_recorder.record_latency(
client_info.full_method
):
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
)
raise

if client_info.is_client_stream:
rpc_info.request = request_or_iterator
request_or_iterator = self._bytes_out_iterator_wrapper(
request_or_iterator, client_info
)
else:
if "ByteSize" in dir(request_or_iterator):
self._metrics_recorder.record_bytes_out(
request_or_iterator.ByteSize(),
client_info.full_method,
)

try:
result = invoker(request_or_iterator, metadata)

# Rewrap the result stream into a generator, and record the bytes received
for response in result:
if "ByteSize" in dir(response):
self._metrics_recorder.record_bytes_in(
response.ByteSize(), client_info.full_method
)
yield response
except grpc.RpcError as exc:
span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

def intercept_stream(
self, request_or_iterator, metadata, client_info, invoker
Expand All @@ -182,21 +227,36 @@ def intercept_stream(
mutable_metadata = OrderedDict(metadata)

with self._start_guarded_span(client_info.full_method) as guarded_span:
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
request=request_or_iterator,
)
with self._metrics_recorder.record_latency(
client_info.full_method
):
_inject_span_context(mutable_metadata)
metadata = tuple(mutable_metadata.items())
rpc_info = RpcInfo(
full_method=client_info.full_method,
metadata=metadata,
timeout=client_info.timeout,
request=request_or_iterator,
)

rpc_info.request = request_or_iterator

request_or_iterator = self._bytes_out_iterator_wrapper(
request_or_iterator, client_info
)

try:
result = invoker(request_or_iterator, metadata)
except grpc.RpcError as exc:
guarded_span.generated_span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
)
raise

ret = self._trace_result(guarded_span, rpc_info, result)

try:
result = invoker(request_or_iterator, metadata)
except grpc.RpcError as exc:
guarded_span.generated_span.set_status(
Status(StatusCanonicalCode(exc.code().value[0]))
self._metrics_recorder.record_bytes_in(
rpc_info.response.ByteSize(), client_info.full_method
)
raise

return self._trace_result(guarded_span, rpc_info, result)
return ret
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@

"""Internal utilities."""

from contextlib import contextmanager
from time import time

import grpc

from opentelemetry.sdk.metrics import Counter, ValueRecorder


class RpcInfo:
def __init__(
Expand All @@ -31,3 +38,75 @@ def __init__(
self.request = request
self.response = response
self.error = error


class TimedMetricRecorder:
def __init__(self, meter, span_kind):
self._meter = meter
service_name = "grpcio"
self._span_kind = span_kind
base_attributes = ["method"]

if self._meter:
self._duration = self._meter.create_metric(
name="{}/{}/duration".format(service_name, span_kind),
description="Duration of grpc requests to the server",
unit="ms",
value_type=float,
metric_type=ValueRecorder,
label_keys=base_attributes + ["error", "status_code"],
)
self._error_count = self._meter.create_metric(
name="{}/{}/errors".format(service_name, span_kind),
description="Number of errors that were returned from the server",
unit="1",
value_type=int,
metric_type=Counter,
label_keys=base_attributes + ["status_code"],
)
self._bytes_in = self._meter.create_metric(
name="{}/{}/bytes_in".format(service_name, span_kind),
description="Number of bytes received from the server",
unit="by",
value_type=int,
metric_type=Counter,
label_keys=base_attributes,
)
self._bytes_out = self._meter.create_metric(
name="{}/{}/bytes_out".format(service_name, span_kind),
description="Number of bytes sent out through gRPC",
unit="by",
value_type=int,
metric_type=Counter,
label_keys=base_attributes,
)

def record_bytes_in(self, bytes_in, method):
if self._meter:
labels = {"method": method}
self._bytes_in.add(bytes_in, labels)

def record_bytes_out(self, bytes_out, method):
if self._meter:
labels = {"method": method}
self._bytes_out.add(bytes_out, labels)

@contextmanager
def record_latency(self, method):
start_time = time()
labels = {"method": method, "status_code": grpc.StatusCode.OK}
try:
yield labels
except grpc.RpcError as exc:
if self._meter:
# pylint: disable=no-member
labels["status_code"] = exc.code()
self._error_count.add(1, labels)
labels["error"] = True
raise
finally:
if self._meter:
if "error" not in labels:
labels["error"] = False
elapsed_time = (time() - start_time) * 1000
self._duration.record(elapsed_time, labels)
Loading

0 comments on commit dc79e3a

Please sign in to comment.