Skip to content

Commit

Permalink
Add scheduler span
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu committed Jul 2, 2024
1 parent 0ee007f commit 11dfc9c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
20 changes: 20 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@
import typing
from typing import Any, Callable, Iterable, Optional

from opentelemetry import trace
from opentelemetry.trace.propagation import set_span_in_context

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud.pubsub_v1 import subscriber


_LOGGER = logging.getLogger(__name__)

_OPEN_TELEMETRY_TRACER_NAME = "google.cloud.pubsub_v1.subscriber"
"""Open Telemetry Instrumenting module name."""


class MessagesOnHold(object):
"""Tracks messages on hold by ordering key. Not thread-safe."""
Expand All @@ -48,6 +54,8 @@ def __init__(self):
# flight, but there are no pending messages.
self._pending_ordered_messages = {}

self._tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)

@property
def size(self) -> int:
"""Return the number of messages on hold across ordered and unordered messages.
Expand Down Expand Up @@ -100,6 +108,18 @@ def put(self, message: "subscriber.message.Message") -> None:
Args:
message: The message to put on hold.
"""
if message.open_telemetry_data:
with self._tracer.start_as_current_span(
name="subscriber scheduler",
kind=trace.SpanKind.INTERNAL,
context=set_span_in_context(
message.open_telemetry_data.subscribe_span
if message.open_telemetry_data.subscribe_span
else None
),
end_on_exit=False,
) as scheduler_span:
message.open_telemetry_data.scheduler_span = scheduler_span
self._messages_on_hold.append(message)
self._size = self._size + 1

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ def _wrap_callback_errors(
message: The Pub/Sub message.
"""
try:
if (
message.open_telemetry_data()
and message.open_telemetry_data().concurrrency_control_span
):
message.open_telemetry_data().concurrrency_control_span.end()
if message.open_telemetry_data():
if message.open_telemetry_data().concurrrency_control_span:
message.open_telemetry_data().concurrrency_control_span.end()
if message.open_telemetry_data().scheduler_span:
message.open_telemetry_data().scheduler_span.end()
callback(message)
except BaseException as exc:
# Note: the likelihood of this failing is extremely low. This just adds
Expand Down
33 changes: 32 additions & 1 deletion tests/unit/pubsub_v1/subscriber/test_messages_on_hold.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,32 @@
# limitations under the License.

import queue
import sys

from google.cloud.pubsub_v1.subscriber import message
from google.cloud.pubsub_v1.subscriber._protocol import messages_on_hold
from google.pubsub_v1 import types as gapic_types
from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData

from opentelemetry.trace.span import SpanContext
from opentelemetry import trace

def make_message(ack_id, ordering_key):
# special case python < 3.8
if sys.version_info.major == 3 and sys.version_info.minor < 8:
import mock
else:
from unittest import mock


def make_message(ack_id, ordering_key, open_telemetry_data=None):
proto_msg = gapic_types.PubsubMessage(data=b"Q", ordering_key=ordering_key)
return message.Message(
proto_msg._pb,
ack_id,
0,
queue.Queue(),
exactly_once_delivery_enabled_func=lambda: False, # pragma: NO COVER
open_telemetry_data=open_telemetry_data,
)


Expand All @@ -37,6 +49,25 @@ def test_init():
assert moh.get() is None


def test_put_otel():
moh = messages_on_hold.MessagesOnHold()
subscribe_span = mock.Mock(spec=SpanContext)
msg = make_message(
ack_id="ack_id1",
ordering_key="key1",
open_telemetry_data=OpenTelemetryData(
subscribe_span=subscribe_span,
),
)
moh.put(msg)

scheduler_span = msg.open_telemetry_data.scheduler_span
assert scheduler_span is not None
assert scheduler_span.name == "subscriber scheduler"
assert scheduler_span.kind == trace.SpanKind.INTERNAL
assert scheduler_span.is_recording()


def test_put_and_get_unordered_messages():
moh = messages_on_hold.MessagesOnHold()

Expand Down

0 comments on commit 11dfc9c

Please sign in to comment.