Skip to content

Commit

Permalink
Modify subscriber Message to contain OpenTelemetryData
Browse files Browse the repository at this point in the history
* Add the subscribe span to OpenTelemetryData
  • Loading branch information
mukund-ananthu committed Jul 1, 2024
1 parent 81a96b4 commit 9c0ac12
Show file tree
Hide file tree
Showing 10 changed files with 481 additions and 34 deletions.
22 changes: 22 additions & 0 deletions google/cloud/pubsub_v1/opentelemetry/subscribe_spans_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from dataclasses import dataclass
from typing import Optional

from opentelemetry.trace.span import Span


@dataclass
class OpenTelemetryData:
"""
This class is for internal use by the library only.
Contains Open Telmetry data associated with a
google.cloud.pubsub_v1.subscriber.message.Message. Specifically it contains
the subscriber side spans associated with the message.
This is so that the subsriber side spans can be ended by the library
after receiving the message back via an ack(), ack_with_response(), nack().
"""

subscribe_span: Optional[Span] = None
concurrrency_control_span: Optional[Span] = None
scheduler_span: Optional[Span] = None
process_span: Optional[Span] = None
72 changes: 68 additions & 4 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"):
self._thread: Optional[threading.Thread] = None
self._operational_lock = threading.Lock()

@property
def manager(self) -> "StreamingPullManager":
"""Returns the Streaming Pull Manager associated with this dispatcher
instance.
"""
return self._manager

def start(self) -> None:
"""Start a thread to dispatch requests queued up by callbacks.
Expand Down Expand Up @@ -179,7 +186,21 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
self.lease(lease_requests)

if modack_requests:
for req in modack_requests:
if (
req
and req.open_telemetry_data
and req.open_telemetry_data.subscribe_span
):
req.open_telemetry_data.subscribe_span.add_event("modack start")
self.modify_ack_deadline(modack_requests)
for req in modack_requests:
if (
req
and req.open_telemetry_data
and req.open_telemetry_data.subscribe_span
):
req.open_telemetry_data.subscribe_span.add_event("modack end")

# Note: Drop and ack *must* be after lease. It's possible to get both
# the lease and the ack/drop request in the same batch.
Expand Down Expand Up @@ -243,6 +264,20 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None:
ack_reqs_dict=ack_reqs_dict,
)

if self._manager.open_telemetry_enabled:
for completed_ack in requests_completed:
if completed_ack.open_telemetry_data:
subscribe_span = (
completed_ack.open_telemetry_data.subscribe_span
)
if subscribe_span:
subscribe_span.set_attribute(
key="messaging.gcp_pubsub.result",
value="ack",
)
subscribe_span.add_event("ack end")
subscribe_span.end()

# Remove the completed messages from lease management.
self.drop(requests_completed)

Expand Down Expand Up @@ -289,6 +324,20 @@ def _retry_acks(self, requests_to_retry):
assert (
len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE
), "Too many requests to be retried."

if self._manager.open_telemetry_enabled:
for completed_ack in requests_completed:
if completed_ack.open_telemetry_data:
subscribe_span = (
completed_ack.open_telemetry_data.subscribe_span
)
if subscribe_span:
subscribe_span.set_attribute(
key="messaging.gcp_pubsub.result",
value="ack",
)
subscribe_span.add_event("ack end")
subscribe_span.end()
# Remove the completed messages from lease management.
self.drop(requests_completed)

Expand Down Expand Up @@ -342,9 +391,10 @@ def modify_ack_deadline(
for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE)
}
requests_to_retry: List[requests.ModAckRequest]
requests_completed: List[requests.ModAckRequest]
if default_deadline is None:
# no further work needs to be done for `requests_to_retry`
_, requests_to_retry = self._manager.send_unary_modack(
requests_completed, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=list(
itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)
),
Expand All @@ -355,7 +405,7 @@ def modify_ack_deadline(
default_deadline=None,
)
else:
_, requests_to_retry = self._manager.send_unary_modack(
requests_completed, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=itertools.islice(
ack_ids_gen, _ACK_IDS_BATCH_SIZE
),
Expand All @@ -375,7 +425,7 @@ def modify_ack_deadline(
functools.partial(self._retry_modacks, requests_to_retry),
)

def _retry_modacks(self, requests_to_retry):
def _retry_modacks(self, requests_to_retry: List[requests.ModAckRequest]):
retry_delay_gen = exponential_sleep_generator(
initial=_MIN_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
maximum=_MAX_EXACTLY_ONCE_DELIVERY_ACK_MODACK_RETRY_DURATION_SECS,
Expand Down Expand Up @@ -405,11 +455,25 @@ def nack(self, items: Sequence[requests.NackRequest]) -> None:
self.modify_ack_deadline(
[
requests.ModAckRequest(
ack_id=item.ack_id, seconds=0, future=item.future
ack_id=item.ack_id,
seconds=0,
future=item.future,
open_telemetry_data=item.open_telemetry_data,
)
for item in items
]
)
if self._manager.open_telemetry_enabled:
for item in items:
if item.open_telemetry_data:
subscribe_span = item.open_telemetry_data.subscribe_span
if subscribe_span:
subscribe_span.set_attribute(
key="messaging.gcp_pubsub.result",
value="nack",
)
subscribe_span.add_event("nack end")

self.drop(
[
requests.DropRequest(
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import typing
from typing import NamedTuple, Optional

from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud.pubsub_v1.subscriber import futures

Expand All @@ -27,6 +29,7 @@ class AckRequest(NamedTuple):
time_to_ack: float
ordering_key: Optional[str]
future: Optional["futures.Future"]
open_telemetry_data: Optional[OpenTelemetryData] = None


class DropRequest(NamedTuple):
Expand All @@ -45,10 +48,12 @@ class ModAckRequest(NamedTuple):
ack_id: str
seconds: float
future: Optional["futures.Future"]
open_telemetry_data: Optional[OpenTelemetryData] = None


class NackRequest(NamedTuple):
ack_id: str
byte_size: int
ordering_key: Optional[str]
future: Optional["futures.Future"]
open_telemetry_data: Optional[OpenTelemetryData] = None
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import typing
from typing import Any, Dict, Callable, Iterable, List, Optional, Set, Tuple
import uuid
import sys

import grpc # type: ignore

from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from google.cloud.pubsub_v1.opentelemetry.subscribe_spans_data import OpenTelemetryData

from google.api_core import bidi
from google.api_core import exceptions
Expand Down Expand Up @@ -281,6 +281,7 @@ def __init__(
await_callbacks_on_shutdown: bool = False,
):
self._client = client
self._open_telemetry_enabled = client.open_telemetry_enabled
self._subscription = subscription
self._exactly_once_enabled = False
self._flow_control = flow_control
Expand Down Expand Up @@ -375,6 +376,11 @@ def __init__(
self._consumer: Optional[bidi.BackgroundConsumer] = None
self._heartbeater: Optional[heartbeater.Heartbeater] = None

@property
def open_telemetry_enabled(self) -> bool:
"""Whether open telemetry is enabled."""
return self._open_telemetry_enabled

@property
def is_active(self) -> bool:
"""``True`` if this manager is actively streaming.
Expand Down Expand Up @@ -1084,7 +1090,8 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
# protobuf message to significantly gain on attribute access performance.
received_messages = response._pb.received_messages

if self._client._open_telemetry_enabled:
subscribe_spans = []
if self.open_telemetry_enabled():
tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME)
for received_message in response.received_messages:
parent_span_context = TraceContextTextMapPropagator().extract(
Expand All @@ -1100,7 +1107,7 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
"messaging.destination.name": self._subscription,
"gcp.project_id": self._subscription.split("/")[1],
"messaging.message.id": received_message.message.message_id,
"messaging.message.body.size": sys.getsizeof(
"messaging.message.body.size": len(
received_message.message.data
),
"messaging.gcp_pubsub.message.ack_id": received_message.ack_id,
Expand All @@ -1111,7 +1118,7 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
},
end_on_exit=False,
) as subscribe_span:
pass
subscribe_spans.append(subscribe_span)

_LOGGER.debug(
"Processing %s received message(s), currently on hold %s (bytes %s).",
Expand All @@ -1136,19 +1143,22 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
# Immediately (i.e. without waiting for the auto lease management)
# modack the messages we received, as this tells the server that we've
# received them.
if self._client._open_telemetry_enabled:
subscribe_span.add_event(name="modack start")
if self.open_telemetry_enabled():
for subscribe_span in subscribe_spans:
subscribe_span.add_event("modack start")
ack_id_gen = (message.ack_id for message in received_messages)
expired_ack_ids = self._send_lease_modacks(
ack_id_gen, self.ack_deadline, warn_on_invalid=False
)
if self._client._open_telemetry_enabled:
subscribe_span.add_event(name="modack end")
if self.open_telemetry_enabled():
for subscribe_span in subscribe_spans:
subscribe_span.add_event("modack end")

with self._pause_resume_lock:
assert self._scheduler is not None
assert self._leaser is not None

i = 0
for received_message in received_messages:
if (
not self._exactly_once_delivery_enabled()
Expand All @@ -1161,6 +1171,10 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
self._scheduler.queue,
self._exactly_once_delivery_enabled,
)
if self.open_telemetry_enabled():
message._open_telemetry_data = OpenTelemetryData(
subscribe_span=subscribe_spans[i],
)
self._messages_on_hold.put(message)
self._on_hold_bytes += message.size
req = requests.LeaseRequest(
Expand All @@ -1169,6 +1183,7 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None:
ordering_key=message.ordering_key,
)
self._leaser.add([req])
i = i + 1

self._maybe_release_messages()

Expand Down
5 changes: 5 additions & 0 deletions google/cloud/pubsub_v1/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ def from_service_account_file( # type: ignore[override]

from_service_account_json = from_service_account_file # type: ignore[assignment]

@property
def open_telemetry_enabled(self) -> bool:
"""Returns whether Open Telemetry is enabled for the subscriber client."""
return self._open_telemetry_enabled

@property
def target(self) -> str:
"""Return the target (where the API is).
Expand Down
Loading

0 comments on commit 9c0ac12

Please sign in to comment.