Skip to content

Commit

Permalink
Add subscriber concurrency control span
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Jul 2, 2024
1 parent 9c0ac12 commit 0ee007f
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ class OpenTelemetryData:
"""

subscribe_span: Optional[Span] = None
concurrrency_control_span: Optional[Span] = None
concurrency_control_span: Optional[Span] = None
scheduler_span: Optional[Span] = None
process_span: Optional[Span] = None
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData
from opentelemetry.trace.propagation import set_span_in_context

from google.api_core import bidi
from google.api_core import exceptions
Expand Down Expand Up @@ -132,6 +133,11 @@ def _wrap_callback_errors(
message: The Pub/Sub message.
"""
try:
if (
message.open_telemetry_data()
and message.open_telemetry_data().concurrrency_control_span
):
message.open_telemetry_data().concurrrency_control_span.end()
callback(message)
except BaseException as exc:
# Note: the likelihood of this failing is extremely low. This just adds
Expand Down Expand Up @@ -633,6 +639,21 @@ def _schedule_message_on_hold(
)
assert self._scheduler is not None
assert self._callback is not None
if (
self.open_telemetry_enabled()
and msg.open_telemetry_data
and msg.open_telemetry_data.subscribe_span
):
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
with tracer.start_as_current_span(
name="subscriber concurrency control",
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(msg.open_telemetry_data.subscribe_span),
end_on_exit=False,
) as concurrency_control_span:
msg.open_telemetry_data.concurrency_control_span = (
concurrency_control_span
)
self._scheduler.schedule(self._callback, msg)

def send_unary_ack(
Expand Down
4 changes: 4 additions & 0 deletions google/cloud/pubsub_v1/subscriber/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,10 @@ def delivery_attempt(self) -> Optional[int]:
"""
return self._delivery_attempt

@property
def open_telemetry_data(self) -> Optional[OpenTelemetryData]:
return self._open_telemetry_data

def ack(self) -> None:
"""Acknowledge the given message.
Expand Down
61 changes: 13 additions & 48 deletions tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
from google.rpc import code_pb2
from google.rpc import error_details_pb2

from opentelemetry import trace


@pytest.mark.parametrize(
"exception,expected_cls",
Expand All @@ -67,6 +65,19 @@ def test__wrap_as_exception(exception, expected_cls):
)


def test__wrap_callback_errors_no_error_otel():
msg = mock.Mock()
msg.open_telemetry_data = mock.Mock(return_value=None)
callback = mock.Mock()
on_callback_error = mock.Mock()

streaming_pull_manager._wrap_callback_errors(callback, on_callback_error, msg)

callback.assert_called_once_with(msg)
msg.nack.assert_not_called()
on_callback_error.assert_not_called()


def test__wrap_callback_errors_no_error():
msg = mock.create_autospec(message.Message, instance=True)
callback = mock.Mock()
Expand Down Expand Up @@ -1526,52 +1537,6 @@ def test__on_response_mod_ack_otel(span_exporter):
# Subscribe span would still be active, hence would not be exported.
spans = span_exporter.get_finished_spans()
assert len(spans) == 0
mock_get_tracer.assert_called_once_with("google.cloud.pubsub_v1.subscriber")

mock_tracer.start_as_current_span.assert_has_calls(
[
mock.call(
name="projects/projectID/subscriptions/subscriptionID subscribe",
kind=trace.SpanKind.CONSUMER,
context=None,
attributes={
"messaging.system": "gcp_pubsub",
"messaging.destination.name": "projects/projectID/subscriptions/subscriptionID",
"gcp.project_id": "projectID",
"messaging.message.id": "1",
"messaging.message.body.size": 3,
"messaging.gcp_pubsub.message.ack_id": "ack_1",
"messaging.gcp_pubsub.message.ordering_key": "",
"messaging.gcp_pubsub.message.exactly_once_delivery": False,
"code.function": "_on_response",
"messaging.gcp_pubsub.message.delivery_attempt": 1,
},
end_on_exit=False,
),
mock.call().__enter__(),
mock.call().__exit__(None, None, None),
mock.call(
name="projects/projectID/subscriptions/subscriptionID subscribe",
kind=trace.SpanKind.CONSUMER,
context=None,
attributes={
"messaging.system": "gcp_pubsub",
"messaging.destination.name": "projects/projectID/subscriptions/subscriptionID",
"gcp.project_id": "projectID",
"messaging.message.id": "2",
"messaging.message.body.size": 3,
"messaging.gcp_pubsub.message.ack_id": "ack_2",
"messaging.gcp_pubsub.message.ordering_key": "",
"messaging.gcp_pubsub.message.exactly_once_delivery": False,
"code.function": "_on_response",
"messaging.gcp_pubsub.message.delivery_attempt": 1,
},
end_on_exit=False,
),
mock.call().__enter__(),
mock.call().__exit__(None, None, None),
],
)


def test__on_response_modifies_ack_deadline():
Expand Down

0 comments on commit 0ee007f

Please sign in to comment.