Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gRPC filter #1241

Merged
merged 48 commits into from
Sep 20, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
8bb3be5
fix typo in example codes
Aug 24, 2022
c960d22
add initial implementation of filters
Aug 24, 2022
b2e1546
Merge branch 'main' into grpc-filter
Aug 24, 2022
380d63e
add docstring to filter functions
Aug 24, 2022
059b076
update changelog to mention this pull request
Aug 24, 2022
ead1f61
revert unnecessary change and fix typos in changelog
Aug 25, 2022
9e20d21
Merge branch 'open-telemetry:main' into grpc-filter
Aug 25, 2022
d1aee36
ran tox -e generate
Aug 25, 2022
782213f
add filter option to auto instrumentors
Aug 25, 2022
a66b21f
add filter tests and fix filters
Aug 25, 2022
d7a1588
ran tox -e generate
Aug 25, 2022
48baf11
remove unused package
Aug 25, 2022
840f152
rename to meet lint conditions
Aug 25, 2022
9825288
fix variable names to meet lint criteria
Aug 25, 2022
45c94a2
Merge branch 'main' into grpc-filter
Aug 25, 2022
de75b24
fix filter propagation and add tests
Aug 25, 2022
e23d77d
fix lint
Aug 25, 2022
98a84f9
add composition filters and tests
Aug 25, 2022
7aa93c8
add comments about default filter value and its behavior
Aug 25, 2022
97faf49
fix function names
Aug 25, 2022
bb6fc2f
Merge branch 'main' into grpc-filter
Aug 25, 2022
49ab02e
add OTEL_PYTHON_GRPC_EXCLUDED_SERVICES support in global instrumentor
Aug 26, 2022
3e058cb
fix lint
Aug 26, 2022
3879fd2
Merge branch 'main' into grpc-filter
srikanthccv Aug 27, 2022
60a97a5
Merge branch 'main' into grpc-filter
Aug 30, 2022
5b9c5bf
Merge branch 'main' into grpc-filter
Sep 4, 2022
f7de43f
Update instrumentation/opentelemetry-instrumentation-grpc/src/opentel…
Sep 9, 2022
8f80844
Update instrumentation/opentelemetry-instrumentation-grpc/src/opentel…
Sep 9, 2022
b064a13
Update instrumentation/opentelemetry-instrumentation-grpc/src/opentel…
Sep 9, 2022
c2b62d1
Update instrumentation/opentelemetry-instrumentation-grpc/src/opentel…
Sep 9, 2022
3a5466e
Update instrumentation/opentelemetry-instrumentation-grpc/src/opentel…
Sep 9, 2022
5660969
Update instrumentation/opentelemetry-instrumentation-grpc/src/opentel…
Sep 9, 2022
d72156b
Update instrumentation/opentelemetry-instrumentation-grpc/src/opentel…
Sep 9, 2022
c625012
Merge branch 'main' into grpc-filter
Sep 9, 2022
39bc530
fix comments, function name and add type annotations
Sep 9, 2022
91649ba
tox -e lint and generate
Sep 9, 2022
fc9b0e6
fix filter condition to handle both environment variables and options
Sep 9, 2022
21c287c
tox -e generate
Sep 9, 2022
55a17f9
tox -e lint
Sep 9, 2022
c298fc1
Merge branch 'main' into grpc-filter
srikanthccv Sep 9, 2022
b4d6386
Merge branch 'main' into grpc-filter
Sep 12, 2022
fa42135
Merge branch 'main' into grpc-filter
Sep 13, 2022
4ad8547
change option name from filters to filter_
Sep 14, 2022
ff06ac3
Merge branch 'main' into grpc-filter
Sep 15, 2022
ebbf6ef
Merge branch 'main' into grpc-filter
srikanthccv Sep 15, 2022
9d89b75
Merge branch 'main' into grpc-filter
Sep 20, 2022
3dd81ce
fix filters' type annotations
Sep 20, 2022
ddbf0bd
lint: remove unused class
Sep 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `opentelemetry-instrumentation-grpc` add supports to filter requests to instrument. ([#1241](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1241))

### Fixed

- `opentelemetry-instrumentation-boto3sqs` Make propagation compatible with other SQS instrumentations, add 'messaging.url' span attribute, and fix missing package dependencies.
Expand Down Expand Up @@ -107,7 +111,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- `opentelemetry-instrumentation-starlette` Capture custom request/response headers in span attributes
([#1046])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1046)
([#1046](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1046))

### Fixed
- Prune autoinstrumentation sitecustomize module directory from PYTHONPATH immediately
Expand All @@ -130,35 +134,35 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `opentelemetry-instrumentation-fastapi` Capture custom request/response headers in span attributes
([#1032])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1032)
([#1032](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1032))
- `opentelemetry-instrumentation-django` Capture custom request/response headers in span attributes
([#1024])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1024)
([#1024](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1024))
- `opentelemetry-instrumentation-asgi` Capture custom request/response headers in span attributes
([#1004])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1004)
([#1004](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1004))
- `opentelemetry-instrumentation-psycopg2` extended the sql commenter support of dbapi into psycopg2
([#940](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/940))
- `opentelemetry-instrumentation-falcon` Add support for falcon==1.4.1
([#1000])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1000)
([#1000](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1000))
- `opentelemetry-instrumentation-falcon` Falcon: Capture custom request/response headers in span attributes
([#1003])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1003)
([#1003](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1003))
- `opentelemetry-instrumentation-elasticsearch` no longer creates unique span names by including search target, replaces them with `<target>` and puts the value in attribute `elasticsearch.target`
([#1018](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1018))
- `opentelemetry-instrumentation-pyramid` Handle non-HTTPException exceptions
([#1001](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1001))
- `opentelemetry-instrumentation-system-metrics` restore `SystemMetrics` instrumentation as `SystemMetricsInstrumentor`
([#1012](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1012))
- `opentelemetry-instrumentation-pyramid` Pyramid: Capture custom request/response headers in span attributes
([#1022])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1022)
([#1022](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1022))


## [1.10.0-0.29b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.10.0-0.29b0) - 2022-03-10

- `opentelemetry-instrumentation-wsgi` Capture custom request/response headers in span attributes
([#925])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/925)
([#925](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/925))
- `opentelemetry-instrumentation-flask` Flask: Capture custom request/response headers in span attributes
([#952])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/952)
([#952](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/952))
- `opentelemetry-instrumentation-tornado` Tornado: Capture custom request/response headers in span attributes
([#950])(https://github.com/open-telemetry/opentelemetry-python-contrib/pull/950)
([#950](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/950))

### Added

Expand Down Expand Up @@ -953,7 +957,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#572](https://github.com/open-telemetry/opentelemetry-python/pull/572))
- `opentelemetry-ext-sqlite3` Initial release
- `opentelemetry-ext-psycopg2` Implement instrumentor interface, enabling auto-instrumentation
([#694]https://github.com/open-telemetry/opentelemetry-python/pull/694)
([#694](https://github.com/open-telemetry/opentelemetry-python/pull/694))
- `opentelemetry-ext-asgi` Add ASGI middleware
([#716](https://github.com/open-telemetry/opentelemetry-python/pull/716))
- `opentelemetry-ext-django` Add exclude list for paths and hosts to prevent from tracing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,63 @@ def serve():
server = grpc.server(futures.ThreadPoolExecutor(),
interceptors = [server_interceptor()])

Filters
-------

If you prefer to filter specific requests to be instrumented, you can specify
the condition by assigning filters to instrumentors.

You can write a global server instrumentor as follows:

.. code-block::

from opentelemetry.instrumentation.grpc import filters, GrpcInstrumentorServer

grpc_server_instrumentor = GrpcInstrumentorServer(
filters = filters.any_of(
filters.method_name("SimpleMethod"),
filters.method_name("ComplexMethod"),
)
)
grpc_server_instrumentor.instrument()

You can also use the filter to manual instrumentors:
ymotongpoo marked this conversation as resolved.
Show resolved Hide resolved

.. code-block::

my_interceptor = server_interceptor(
filters = filters.reverse(filters.method_name("TestMethod"))
)
server = grpc.server(futures.ThreadPoolExecutor(),
interceptors = [my_interceptor])

``filters`` option also applies to both global and manual client intrumentors.


Environment variable
--------------------

If you'd like to exclude specific services for the instrumentations, you can use
``OTEL_PYTHON_GRPC_EXCLUDED_SERVICES`` environment variables.

For example, if you assign ``"GRPCTestServer,GRPCHealthServer"`` to the variable,
then the global interceptor automatically add the filter to exclude requests to
ymotongpoo marked this conversation as resolved.
Show resolved Hide resolved
services ``GRPCTestServer`` and ``GRPCHealthServer``.

"""
from typing import Collection
import os
from typing import Callable, Collection, List, Union

import grpc # pylint:disable=import-self
from wrapt import wrap_function_wrapper as _wrap

from opentelemetry import trace
from opentelemetry.instrumentation.grpc.filters import (
all_of,
any_of,
reverse,
service_name,
)
from opentelemetry.instrumentation.grpc.grpcext import intercept_channel
from opentelemetry.instrumentation.grpc.package import _instruments
from opentelemetry.instrumentation.grpc.version import __version__
Expand All @@ -145,10 +195,26 @@ class GrpcInstrumentorServer(BaseInstrumentor):
grpc_server_instrumentor = GrpcInstrumentorServer()
grpc_server_instrumentor.instrument()

If you want to add filters that only intercept requests
to match the condition, assign filters option to GrpcInstrumentorServer.
ymotongpoo marked this conversation as resolved.
Show resolved Hide resolved

grpc_server_instrumentor = GrpcInstrumentorServer(
filters=filters.method_prefix("SimpleMethod"))
grpc_server_instrumentor.instrument()

"""

# pylint:disable=attribute-defined-outside-init, redefined-outer-name

def __init__(self, filters=None):
ymotongpoo marked this conversation as resolved.
Show resolved Hide resolved
excluded_service_filter = _excluded_service_filter()
if excluded_service_filter is not None:
if filters is None:
filters = excluded_service_filter
else:
filters = all_of(filters, excluded_service_filter)
ymotongpoo marked this conversation as resolved.
Show resolved Hide resolved
self._filters = filters

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

Expand All @@ -160,11 +226,16 @@ def server(*args, **kwargs):
if "interceptors" in kwargs:
# add our interceptor as the first
kwargs["interceptors"].insert(
0, server_interceptor(tracer_provider=tracer_provider)
0,
server_interceptor(
tracer_provider=tracer_provider, filters=self._filters
),
)
else:
kwargs["interceptors"] = [
server_interceptor(tracer_provider=tracer_provider)
server_interceptor(
tracer_provider=tracer_provider, filters=self._filters
)
]
return self._original_func(*args, **kwargs)

Expand All @@ -183,8 +254,25 @@ class GrpcInstrumentorClient(BaseInstrumentor):
grpc_client_instrumentor = GrpcInstrumentorClient()
grpc_client_instrumentor.instrument()

If you want to add filters that only intercept requests
ymotongpoo marked this conversation as resolved.
Show resolved Hide resolved
to match the condition, assign filters option to GrpcInstrumentorClient.

grpc_client_instrumentor = GrpcInstrumentorClient(
filter=filters.reverse(filters.health_check())
)
grpc_client_instrumentor.instrument()

"""

def __init__(self, filters=None):
excluded_service_filter = _excluded_service_filter()
if excluded_service_filter is not None:
if filters is None:
filters = excluded_service_filter
else:
filters = all_of(filters, excluded_service_filter)
self._filters = filters

# Figures out which channel type we need to wrap
def _which_channel(self, kwargs):
# handle legacy argument
Expand Down Expand Up @@ -221,37 +309,70 @@ def wrapper_fn(self, original_func, instance, args, kwargs):
tracer_provider = kwargs.get("tracer_provider")
return intercept_channel(
channel,
client_interceptor(tracer_provider=tracer_provider),
client_interceptor(
tracer_provider=tracer_provider,
filters=self._filters,
),
)


def client_interceptor(tracer_provider=None):
def client_interceptor(tracer_provider=None, filters=None):
"""Create a gRPC client channel interceptor.

Args:
tracer: The tracer to use to create client-side spans.

filters: filter function that returns True if gRPC requests
matches the condition. Default is None and intercept
all requests.

Returns:
An invocation-side interceptor object.
"""
from . import _client

tracer = trace.get_tracer(__name__, __version__, tracer_provider)

return _client.OpenTelemetryClientInterceptor(tracer)
return _client.OpenTelemetryClientInterceptor(tracer, filters=filters)


def server_interceptor(tracer_provider=None):
def server_interceptor(tracer_provider=None, filters=None):
"""Create a gRPC server interceptor.

Args:
tracer: The tracer to use to create server-side spans.

filters: filter function that returns True if gRPC requests
matches the condition. Default is None and intercept
all requests.

Returns:
A service-side interceptor object.
"""
from . import _server

tracer = trace.get_tracer(__name__, __version__, tracer_provider)

return _server.OpenTelemetryServerInterceptor(tracer)
return _server.OpenTelemetryServerInterceptor(tracer, filters=filters)


def _excluded_service_filter() -> Union[Callable[[object], bool], None]:
services = _parse_services(
os.environ.get("OTEL_PYTHON_GRPC_EXCLUDED_SERVICES", "")
)
if len(services) == 0:
return None
filters = []
for srv in services:
filters.append(service_name(srv))
ymotongpoo marked this conversation as resolved.
Show resolved Hide resolved
return reverse(any_of(*filters))


def _parse_services(excluded_services: str) -> List[str]:
if excluded_services != "":
excluded_service_list = [
s.strip() for s in excluded_services.split(",")
]
else:
excluded_service_list = []
return excluded_service_list
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ def callback(response_future):
class OpenTelemetryClientInterceptor(
grpcext.UnaryClientInterceptor, grpcext.StreamClientInterceptor
):
def __init__(self, tracer):
def __init__(self, tracer, filters=None):
self._tracer = tracer
self._filter = filters

def _start_span(self, method, **kwargs):
service, meth = method.lstrip("/").split("/", 1)
Expand Down Expand Up @@ -148,6 +149,8 @@ def _intercept(self, request, metadata, client_info, invoker):
return self._trace_result(span, rpc_info, result)

def intercept_unary(self, request, metadata, client_info, invoker):
if self._filter is not None and not self._filter(client_info):
return invoker(request, metadata)
return self._intercept(request, metadata, client_info, invoker)

# For RPCs that stream responses, the result can be a generator. To record
Expand Down Expand Up @@ -188,6 +191,9 @@ def intercept_stream(
if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return invoker(request_or_iterator, metadata)

if self._filter is not None and not self._filter(client_info):
return invoker(request_or_iterator, metadata)

if client_info.is_server_stream:
return self._intercept_server_stream(
request_or_iterator, metadata, client_info, invoker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,10 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):
Usage::

tracer = some OpenTelemetry tracer
filter = filters.reverse(filters.method_name("service.Foo"))

interceptors = [
OpenTelemetryServerInterceptor(tracer),
OpenTelemetryServerInterceptor(tracer, filter),
]

server = grpc.server(
Expand All @@ -183,8 +184,9 @@ class OpenTelemetryServerInterceptor(grpc.ServerInterceptor):

"""

def __init__(self, tracer):
def __init__(self, tracer, filters=None):
self._tracer = tracer
self._filter = filters

@contextmanager
def _set_remote_context(self, servicer_context):
Expand Down Expand Up @@ -259,6 +261,9 @@ def _start_span(
)

def intercept_service(self, continuation, handler_call_details):
if self._filter is not None and not self._filter(handler_call_details):
return continuation(handler_call_details)

def telemetry_wrapper(behavior, request_streaming, response_streaming):
def telemetry_interceptor(request_or_iterator, context):

Expand Down
Loading