diff --git a/google/cloud/pubsub_v1/open_telemetry/__init__.py b/google/cloud/pubsub_v1/open_telemetry/__init__.py new file mode 100644 index 000000000..e88bb5dbb --- /dev/null +++ b/google/cloud/pubsub_v1/open_telemetry/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2024, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/google/cloud/pubsub_v1/open_telemetry/context_propagation.py b/google/cloud/pubsub_v1/open_telemetry/context_propagation.py new file mode 100644 index 000000000..37fad3e20 --- /dev/null +++ b/google/cloud/pubsub_v1/open_telemetry/context_propagation.py @@ -0,0 +1,39 @@ +# Copyright 2024, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from opentelemetry.propagators.textmap import Setter + +from google.pubsub_v1 import PubsubMessage + + +class OpenTelemetryContextSetter(Setter): + """ + Used by Open Telemetry for context propagation. + """ + + def set(self, carrier: PubsubMessage, key: str, value: str) -> None: + """ + Injects trace context into Pub/Sub message attributes with + "googclient_" prefix. + + Args: + carrier(PubsubMessage): The Pub/Sub message which is the carrier of Open Telemetry + data. + key(str): The key for which the Open Telemetry context data needs to be set. + value(str): The Open Telemetry context value to be set. + + Returns: + None + """ + carrier.attributes["googclient_" + key] = value diff --git a/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py new file mode 100644 index 000000000..e03a8f800 --- /dev/null +++ b/google/cloud/pubsub_v1/open_telemetry/publish_message_wrapper.py @@ -0,0 +1,142 @@ +# Copyright 2017, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +from datetime import datetime +from typing import Optional + +from opentelemetry import trace +from opentelemetry.trace.propagation import set_span_in_context +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator + +from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( + OpenTelemetryContextSetter, +) + + +class PublishMessageWrapper: + _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" + _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" + _OPEN_TELEMETRY_PUBLISHER_BATCHING = "publisher batching" + + _PUBLISH_START_EVENT: str = "publish start" + _PUBLISH_FLOW_CONTROL: str = "publisher flow control" + + def __init__(self, message: gapic_types.PubsubMessage): + self._message: gapic_types.PubsubMessage = message + self._create_span: Optional[trace.Span] = None + self._flow_control_span: Optional[trace.Span] = None + self._batching_span: Optional[trace.Span] = None + + @property + def message(self): + return self._message + + @message.setter # type: ignore[no-redef] # resetting message value is intentional here + def message(self, message: gapic_types.PubsubMessage): + self._message = message + + @property + def create_span(self): + return self._create_span + + def __eq__(self, other): # pragma: NO COVER + """Used for pytest asserts to compare two PublishMessageWrapper objects with the same message""" + if isinstance(self, other.__class__): + return self.message == other.message + return False + + def start_create_span(self, topic: str, ordering_key: str) -> None: + tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + assert len(topic.split("/")) == 4 + topic_short_name = topic.split("/")[3] + with tracer.start_as_current_span( + name=f"{topic_short_name} create", + attributes={ + "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.destination.name": topic_short_name, + "code.function": "publish", + "messaging.gcp_pubsub.message.ordering_key": ordering_key, + "messaging.operation": "create", + "gcp.project_id": topic.split("/")[1], + "messaging.message.body.size": sys.getsizeof( + self._message.data + ), # sys.getsizeof() used since the attribute expects size of message body in bytes + }, + kind=trace.SpanKind.PRODUCER, + end_on_exit=False, + ) as create_span: + create_span.add_event( + name=self._PUBLISH_START_EVENT, + attributes={ + "timestamp": str(datetime.now()), + }, + ) + self._create_span = create_span + TraceContextTextMapPropagator().inject( + carrier=self._message, + setter=OpenTelemetryContextSetter(), + ) + + def end_create_span(self, exc: Optional[BaseException] = None) -> None: + assert self._create_span is not None + if exc: + self._create_span.record_exception(exception=exc) + self._create_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + self._create_span.end() + + def start_publisher_flow_control_span(self) -> None: + tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + assert self._create_span is not None + with tracer.start_as_current_span( + name=self._PUBLISH_FLOW_CONTROL, + kind=trace.SpanKind.INTERNAL, + context=set_span_in_context(self._create_span), + end_on_exit=False, + ) as flow_control_span: + self._flow_control_span = flow_control_span + + def end_publisher_flow_control_span( + self, exc: Optional[BaseException] = None + ) -> None: + assert self._flow_control_span is not None + if exc: + self._flow_control_span.record_exception(exception=exc) + self._flow_control_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + self._flow_control_span.end() + + def start_publisher_batching_span(self) -> None: + assert self._create_span is not None + tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + with tracer.start_as_current_span( + name=self._OPEN_TELEMETRY_PUBLISHER_BATCHING, + kind=trace.SpanKind.INTERNAL, + context=set_span_in_context(self._create_span), + end_on_exit=False, + ) as batching_span: + self._batching_span = batching_span + + def end_publisher_batching_span(self, exc: Optional[BaseException] = None) -> None: + assert self._batching_span is not None + if exc: + self._batching_span.record_exception(exception=exc) + self._batching_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + self._batching_span.end() diff --git a/google/cloud/pubsub_v1/publisher/_batch/base.py b/google/cloud/pubsub_v1/publisher/_batch/base.py index 52505996b..c91e0a444 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/base.py +++ b/google/cloud/pubsub_v1/publisher/_batch/base.py @@ -19,6 +19,10 @@ import typing from typing import Optional, Sequence +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) + if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud import pubsub_v1 @@ -54,7 +58,7 @@ class Batch(metaclass=abc.ABCMeta): def __len__(self): """Return the number of messages currently in the batch.""" - return len(self.messages) + return len(self.message_wrappers) @staticmethod @abc.abstractmethod @@ -68,7 +72,7 @@ def make_lock(): # pragma: NO COVER @property @abc.abstractmethod - def messages(self) -> Sequence["gapic_types.PubsubMessage"]: # pragma: NO COVER + def message_wrappers(self) -> Sequence[PublishMessageWrapper]: # pragma: NO COVER """Return the messages currently in the batch. Returns: diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 1617f8c90..c4bf67c35 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -19,13 +19,19 @@ import time import typing from typing import Any, Callable, List, Optional, Sequence +from datetime import datetime +from opentelemetry import trace import google.api_core.exceptions from google.api_core import gapic_v1 + from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher import futures from google.cloud.pubsub_v1.publisher._batch import base from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud import pubsub_v1 @@ -85,6 +91,9 @@ class Batch(base.Batch): timeout is used. """ + _OPEN_TELEMETRY_TRACER_NAME: str = "google.cloud.pubsub_v1" + _OPEN_TELEMETRY_MESSAGING_SYSTEM: str = "gcp_pubsub" + def __init__( self, client: "PublisherClient", @@ -108,7 +117,7 @@ def __init__( # status changed from ACCEPTING_MESSAGES to any other # in order to avoid race conditions self._futures: List[futures.Future] = [] - self._messages: List[gapic_types.PubsubMessage] = [] + self._message_wrappers: List[PublishMessageWrapper] = [] self._status = base.BatchStatus.ACCEPTING_MESSAGES # The initial size is not zero, we need to account for the size overhead @@ -119,6 +128,10 @@ def __init__( self._commit_retry = commit_retry self._commit_timeout = commit_timeout + # Publish RPC Span that will be set by method `_start_publish_rpc_span` + # if Open Telemetry is enabled. + self._rpc_span: Optional[trace.Span] = None + @staticmethod def make_lock() -> threading.Lock: """Return a threading lock. @@ -134,9 +147,9 @@ def client(self) -> "PublisherClient": return self._client @property - def messages(self) -> Sequence[gapic_types.PubsubMessage]: - """The messages currently in the batch.""" - return self._messages + def message_wrappers(self) -> Sequence[PublishMessageWrapper]: + """The message wrappers currently in the batch.""" + return self._message_wrappers @property def settings(self) -> "types.BatchSettings": @@ -226,6 +239,38 @@ def _start_commit_thread(self) -> None: ) commit_thread.start() + def _start_publish_rpc_span(self) -> None: + tracer = trace.get_tracer(self._OPEN_TELEMETRY_TRACER_NAME) + links = [] + + for wrapper in self._message_wrappers: + span = wrapper.create_span + # Add links only for sampled spans. + if span.get_span_context().trace_flags.sampled: + links.append(trace.Link(span.get_span_context())) + assert len(self._topic.split("/")) == 4 + topic_short_name = self._topic.split("/")[3] + with tracer.start_as_current_span( + name=f"{topic_short_name} publish", + attributes={ + "messaging.system": self._OPEN_TELEMETRY_MESSAGING_SYSTEM, + "messaging.destination.name": topic_short_name, + "gcp.project_id": self._topic.split("/")[1], + "messaging.batch.message_count": len(self._message_wrappers), + "messaging.operation": "publish", + "code.function": "_commit", + }, + links=links, + kind=trace.SpanKind.CLIENT, + end_on_exit=False, + ) as rpc_span: + ctx = rpc_span.get_span_context() + for wrapper in self._message_wrappers: + span = wrapper.create_span + if span.get_span_context().trace_flags.sampled: + span.add_link(ctx) + self._rpc_span = rpc_span + def _commit(self) -> None: """Actually publish all of the messages on the active batch. @@ -259,7 +304,7 @@ def _commit(self) -> None: # https://github.com/googleapis/google-cloud-python/issues/8036 # Sanity check: If there are no messages, no-op. - if not self._messages: + if not self._message_wrappers: _LOGGER.debug("No messages to publish, exiting commit") self._status = base.BatchStatus.SUCCESS return @@ -270,18 +315,51 @@ def _commit(self) -> None: batch_transport_succeeded = True try: + if self._client.open_telemetry_enabled: + self._start_publish_rpc_span() + # Performs retries for errors defined by the retry configuration. response = self._client._gapic_publish( topic=self._topic, - messages=self._messages, + messages=[wrapper.message for wrapper in self._message_wrappers], retry=self._commit_retry, timeout=self._commit_timeout, ) + + if self._client.open_telemetry_enabled: + assert self._rpc_span is not None + self._rpc_span.end() + end_time = str(datetime.now()) + for message_id, wrapper in zip( + response.message_ids, self._message_wrappers + ): + span = wrapper.create_span + span.add_event( + name="publish end", + attributes={ + "timestamp": end_time, + }, + ) + span.set_attribute(key="messaging.message.id", value=message_id) + wrapper.end_create_span() except google.api_core.exceptions.GoogleAPIError as exc: # We failed to publish, even after retries, so set the exception on # all futures and exit. self._status = base.BatchStatus.ERROR + if self._client.open_telemetry_enabled: + if self._rpc_span: + self._rpc_span.record_exception( + exception=exc, + ) + self._rpc_span.set_status( + trace.Status(status_code=trace.StatusCode.ERROR) + ) + self._rpc_span.end() + + for wrapper in self._message_wrappers: + wrapper.end_create_span(exc=exc) + batch_transport_succeeded = False if self._batch_done_callback is not None: # Failed to publish batch. @@ -326,7 +404,8 @@ def _commit(self) -> None: self._batch_done_callback(batch_transport_succeeded) def publish( - self, message: gapic_types.PubsubMessage + self, + wrapper: PublishMessageWrapper, ) -> Optional["pubsub_v1.publisher.futures.Future"]: """Publish a single message. @@ -338,7 +417,7 @@ def publish( This method is called by :meth:`~.PublisherClient.publish`. Args: - message: The Pub/Sub message. + wrapper: The Pub/Sub message wrapper. Returns: An object conforming to the :class:`~concurrent.futures.Future` interface @@ -351,12 +430,14 @@ def publish( """ # Coerce the type, just in case. - if not isinstance(message, gapic_types.PubsubMessage): + if not isinstance( + wrapper.message, gapic_types.PubsubMessage + ): # pragma: NO COVER # For performance reasons, the message should be constructed by directly # using the raw protobuf class, and only then wrapping it into the # higher-level PubsubMessage class. - vanilla_pb = _raw_proto_pubbsub_message(**message) - message = gapic_types.PubsubMessage.wrap(vanilla_pb) + vanilla_pb = _raw_proto_pubbsub_message(**wrapper.message) + wrapper.message = gapic_types.PubsubMessage.wrap(vanilla_pb) future = None @@ -369,7 +450,7 @@ def publish( return None size_increase = gapic_types.PublishRequest( - messages=[message] + messages=[wrapper.message] )._pb.ByteSize() if (self._base_request_size + size_increase) > _SERVER_PUBLISH_MAX_BYTES: @@ -381,14 +462,14 @@ def publish( raise exceptions.MessageTooLargeError(err_msg) new_size = self._size + size_increase - new_count = len(self._messages) + 1 + new_count = len(self._message_wrappers) + 1 size_limit = min(self.settings.max_bytes, _SERVER_PUBLISH_MAX_BYTES) overflow = new_size > size_limit or new_count >= self.settings.max_messages - if not self._messages or not overflow: + if not self._message_wrappers or not overflow: # Store the actual message in the batch's message queue. - self._messages.append(message) + self._message_wrappers.append(wrapper) self._size = new_size # Track the future on this batch (so that the result of the diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py index 30c76a44f..9644a1fa2 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py @@ -23,7 +23,9 @@ from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base from google.cloud.pubsub_v1.publisher._batch import base as batch_base -from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1 import types @@ -262,15 +264,15 @@ def _create_batch( def publish( self, - message: gapic_types.PubsubMessage, + wrapper: PublishMessageWrapper, retry: "OptionalRetry" = gapic_v1.method.DEFAULT, timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, ) -> futures.Future: """Publish message for this ordering key. Args: - message: - The Pub/Sub message. + wrapper: + The Pub/Sub message wrapper. retry: The retry settings to apply when publishing the message. timeout: @@ -317,11 +319,11 @@ def publish( self._ordered_batches.append(new_batch) batch = self._ordered_batches[-1] - future = batch.publish(message) + future = batch.publish(wrapper) while future is None: batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) self._ordered_batches.append(batch) - future = batch.publish(message) + future = batch.publish(wrapper) return future diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py index 7d57aa821..7dbd3f084 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py @@ -18,7 +18,9 @@ from google.api_core import gapic_v1 from google.cloud.pubsub_v1.publisher._sequencer import base -from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud.pubsub_v1.publisher import _batch @@ -115,15 +117,15 @@ def _create_batch( def publish( self, - message: gapic_types.PubsubMessage, + wrapper: PublishMessageWrapper, retry: "OptionalRetry" = gapic_v1.method.DEFAULT, timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, ) -> "futures.Future": """Batch message into existing or new batch. Args: - message: - The Pub/Sub message. + wrapper: + The Pub/Sub message wrapper. retry: The retry settings to apply when publishing the message. timeout: @@ -151,7 +153,7 @@ def publish( future = None while future is None: # Might throw MessageTooLargeError - future = batch.publish(message) + future = batch.publish(wrapper) # batch is full, triggering commit_when_full if future is None: batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 54b353276..481a8472d 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -22,6 +22,7 @@ import typing from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union import warnings +import sys from google.api_core import gapic_v1 from google.auth.credentials import AnonymousCredentials # type: ignore @@ -37,6 +38,9 @@ from google.pubsub_v1 import gapic_version as package_version from google.pubsub_v1 import types as gapic_types from google.pubsub_v1.services.publisher import client as publisher_client +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) __version__ = package_version.__version__ @@ -153,6 +157,22 @@ def __init__( # The object controlling the message publishing flow self._flow_controller = FlowController(self.publisher_options.flow_control) + self._open_telemetry_enabled = ( + self.publisher_options.enable_open_telemetry_tracing + ) + # OpenTelemetry features used by the library are not supported in Python versions <= 3.7. + # Refer https://github.com/open-telemetry/opentelemetry-python/issues/3993#issuecomment-2211976389 + if ( + self.publisher_options.enable_open_telemetry_tracing + and sys.version_info.major == 3 + and sys.version_info.minor < 8 + ): + warnings.warn( + message="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.", + category=RuntimeWarning, + ) + self._open_telemetry_enabled = False + @classmethod def from_service_account_file( # type: ignore[override] cls, @@ -209,6 +229,10 @@ def api(self): warnings.warn(msg, category=DeprecationWarning) return super() + @property + def open_telemetry_enabled(self) -> bool: + return self._open_telemetry_enabled + def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType: """Get an existing sequencer or create a new one given the (topic, ordering_key) pair. @@ -368,11 +392,41 @@ def publish( # type: ignore[override] ) message = gapic_types.PubsubMessage.wrap(vanilla_pb) + wrapper: PublishMessageWrapper = PublishMessageWrapper(message) + if self._open_telemetry_enabled: + wrapper.start_create_span(topic=topic, ordering_key=ordering_key) + # Messages should go through flow control to prevent excessive # queuing on the client side (depending on the settings). try: + if self._open_telemetry_enabled: + if wrapper: + wrapper.start_publisher_flow_control_span() + else: # pragma: NO COVER + warnings.warn( + message="PubSubMessageWrapper is None. Not starting publisher flow control span.", + category=RuntimeWarning, + ) self._flow_controller.add(message) + if self._open_telemetry_enabled: + if wrapper: + wrapper.end_publisher_flow_control_span() + else: # pragma: NO COVER + warnings.warn( + message="PubSubMessageWrapper is None. Not ending publisher flow control span.", + category=RuntimeWarning, + ) except exceptions.FlowControlLimitError as exc: + if self._open_telemetry_enabled: + if wrapper: + wrapper.end_publisher_flow_control_span(exc) + wrapper.end_create_span(exc) + else: # pragma: NO COVER + warnings.warn( + message="PubSubMessageWrapper is None. Not ending publisher create and flow control spans on FlowControlLimitError.", + category=RuntimeWarning, + ) + future = futures.Future() future.set_exception(exc) return future @@ -386,31 +440,68 @@ def on_publish_done(future): if timeout is gapic_v1.method.DEFAULT: # if custom timeout not passed in timeout = self.publisher_options.timeout + if self._open_telemetry_enabled: + if wrapper: + wrapper.start_publisher_batching_span() + else: # pragma: NO COVER + warnings.warn( + message="PublishMessageWrapper is None. Hence, not starting publisher batching span", + category=RuntimeWarning, + ) with self._batch_lock: - if self._is_stopped: - raise RuntimeError("Cannot publish on a stopped publisher.") - - # Set retry timeout to "infinite" when message ordering is enabled. - # Note that this then also impacts messages added with an empty - # ordering key. - if self._enable_message_ordering: - if retry is gapic_v1.method.DEFAULT: - # use the default retry for the publish GRPC method as a base - transport = self._transport - base_retry = transport._wrapped_methods[transport.publish]._retry - retry = base_retry.with_deadline(2.0**32) - # timeout needs to be overridden and set to infinite in - # addition to the retry deadline since both determine - # the duration for which retries are attempted. - timeout = 2.0**32 - elif retry is not None: - retry = retry.with_deadline(2.0**32) - timeout = 2.0**32 - - # Delegate the publishing to the sequencer. - sequencer = self._get_or_create_sequencer(topic, ordering_key) - future = sequencer.publish(message, retry=retry, timeout=timeout) - future.add_done_callback(on_publish_done) + try: + if self._is_stopped: + raise RuntimeError("Cannot publish on a stopped publisher.") + + # Set retry timeout to "infinite" when message ordering is enabled. + # Note that this then also impacts messages added with an empty + # ordering key. + if self._enable_message_ordering: + if retry is gapic_v1.method.DEFAULT: + # use the default retry for the publish GRPC method as a base + transport = self._transport + base_retry = transport._wrapped_methods[ + transport.publish + ]._retry + retry = base_retry.with_deadline(2.0**32) + # timeout needs to be overridden and set to infinite in + # addition to the retry deadline since both determine + # the duration for which retries are attempted. + timeout = 2.0**32 + elif retry is not None: + retry = retry.with_deadline(2.0**32) + timeout = 2.0**32 + + # Delegate the publishing to the sequencer. + sequencer = self._get_or_create_sequencer(topic, ordering_key) + future = sequencer.publish( + wrapper=wrapper, retry=retry, timeout=timeout + ) + future.add_done_callback(on_publish_done) + except BaseException as be: + # Exceptions can be thrown when attempting to add messages to + # the batch. If they're thrown, record them in publisher + # batching and create span, end the spans and bubble the + # exception up. + if self._open_telemetry_enabled: + if wrapper: + wrapper.end_publisher_batching_span(be) + wrapper.end_create_span(be) + else: # pragma: NO COVER + warnings.warn( + message="PublishMessageWrapper is None. Hence, not recording exception and ending publisher batching span and create span", + category=RuntimeWarning, + ) + raise be + + if self._open_telemetry_enabled: + if wrapper: + wrapper.end_publisher_batching_span() + else: # pragma: NO COVER + warnings.warn( + message="PublishMessageWrapper is None. Hence, not ending publisher batching span", + category=RuntimeWarning, + ) # Create a timer thread if necessary to enforce the batching # timeout. diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 3d071a189..c4282e685 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -174,6 +174,9 @@ class PublisherOptions(NamedTuple): "compatible with :class:`~.pubsub_v1.types.TimeoutType`." ) + enable_open_telemetry_tracing: bool = False # disabled by default + """Open Telemetry tracing is enabled if this is set to True.""" + # Define the type class and default values for flow control settings. # diff --git a/setup.py b/setup.py index dbb66cf7c..cc852f7d8 100644 --- a/setup.py +++ b/setup.py @@ -45,6 +45,8 @@ "protobuf>=3.20.2,<6.0.0dev,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5", "grpc-google-iam-v1 >= 0.12.4, < 1.0.0dev", "grpcio-status >= 1.33.2", + "opentelemetry-api", + "opentelemetry-sdk", ] extras = {"libcst": "libcst >= 0.3.10"} url = "https://github.com/googleapis/python-pubsub" diff --git a/tests/unit/pubsub_v1/conftest.py b/tests/unit/pubsub_v1/conftest.py index dc4192931..b44e2fd84 100644 --- a/tests/unit/pubsub_v1/conftest.py +++ b/tests/unit/pubsub_v1/conftest.py @@ -12,9 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import google.auth.credentials import pytest +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry import trace +import google.auth.credentials + @pytest.fixture def creds(): @@ -23,3 +28,18 @@ def creds(): GOOGLE_APPLICATION_CREDENTIALS set. """ yield google.auth.credentials.AnonymousCredentials() + + +@pytest.fixture(scope="session", autouse=True) +def set_trace_provider(): + provider = TracerProvider() + trace.set_tracer_provider(provider) + + +@pytest.fixture(scope="function") +def span_exporter(): + exporter = InMemorySpanExporter() + processor = SimpleSpanProcessor(exporter) + provider = trace.get_tracer_provider() + provider.add_span_processor(processor) + yield exporter diff --git a/tests/unit/pubsub_v1/publisher/batch/test_base.py b/tests/unit/pubsub_v1/publisher/batch/test_base.py index a95d72c12..ae5dbea04 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_base.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_base.py @@ -21,6 +21,9 @@ from google.cloud.pubsub_v1.publisher._batch.base import BatchStatus from google.cloud.pubsub_v1.publisher._batch.thread import Batch from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) def create_batch(status, settings=types.BatchSettings()): @@ -41,5 +44,5 @@ def create_batch(status, settings=types.BatchSettings()): def test_len(): batch = create_batch(status=BatchStatus.ACCEPTING_MESSAGES) assert len(batch) == 0 - batch.publish(gapic_types.PubsubMessage(data=b"foo")) + batch.publish(PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo"))) assert len(batch) == 1 diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 2752d62a2..32eaa3d98 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -25,6 +25,9 @@ import pytest +from opentelemetry import trace +from opentelemetry.trace import SpanContext + import google.api_core.exceptions from google.api_core import gapic_v1 from google.auth import credentials @@ -36,10 +39,18 @@ from google.cloud.pubsub_v1.publisher._batch import thread from google.cloud.pubsub_v1.publisher._batch.thread import Batch from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) -def create_client(): - return publisher.Client(credentials=credentials.AnonymousCredentials()) +def create_client(enable_open_telemetry: bool = False): + return publisher.Client( + credentials=credentials.AnonymousCredentials(), + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=enable_open_telemetry, + ), + ) def create_batch( @@ -48,7 +59,8 @@ def create_batch( commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, - **batch_settings + enable_open_telemetry: bool = False, + **batch_settings, ): """Return a batch object suitable for testing. @@ -62,13 +74,14 @@ def create_batch( for the batch commit call. commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply to the batch commit call. + enable_open_telemetry (bool): Whether to enable OpenTelemetry. batch_settings (Mapping[str, str]): Arguments passed on to the :class:``~.pubsub_v1.types.BatchSettings`` constructor. Returns: ~.pubsub_v1.publisher.batch.thread.Batch: A batch object. """ - client = create_client() + client = create_client(enable_open_telemetry=enable_open_telemetry) settings = types.BatchSettings(**batch_settings) return Batch( client, @@ -126,8 +139,16 @@ def test_commit_no_op(): def test_blocking__commit(): batch = create_batch() futures = ( - batch.publish({"data": b"This is my message."}), - batch.publish({"data": b"This is another message."}), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message.") + ) + ), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is another message.") + ) + ), ) # Set up the underlying API publish method to return a PublishResponse. @@ -160,7 +181,11 @@ def test_blocking__commit(): def test_blocking__commit_custom_retry(): batch = create_batch(commit_retry=mock.sentinel.custom_retry) - batch.publish({"data": b"This is my message."}) + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message.") + ) + ) # Set up the underlying API publish method to return a PublishResponse. publish_response = gapic_types.PublishResponse(message_ids=["a"]) @@ -182,7 +207,11 @@ def test_blocking__commit_custom_retry(): def test_blocking__commit_custom_timeout(): batch = create_batch(commit_timeout=mock.sentinel.custom_timeout) - batch.publish({"data": b"This is my message."}) + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message.") + ) + ) # Set up the underlying API publish method to return a PublishResponse. publish_response = gapic_types.PublishResponse(message_ids=["a"]) @@ -217,13 +246,21 @@ def api_publish_delay(topic="", messages=(), retry=None, timeout=None): ) with api_publish_patch: - batch.publish({"data": b"first message"}) + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"first message") + ) + ) start = datetime.datetime.now() event_set = api_publish_called.wait(timeout=1.0) if not event_set: # pragma: NO COVER pytest.fail("API publish was not called in time") - batch.publish({"data": b"second message"}) + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"second message") + ) + ) end = datetime.datetime.now() # While a batch commit in progress, waiting for the API publish call to @@ -266,8 +303,16 @@ def test_blocking__commit_no_messages(): def test_blocking__commit_wrong_messageid_length(): batch = create_batch() futures = ( - batch.publish({"data": b"blah blah blah"}), - batch.publish({"data": b"blah blah blah blah"}), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah") + ) + ), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah blah") + ) + ), ) # Set up a PublishResponse that only returns one message ID. @@ -287,8 +332,16 @@ def test_blocking__commit_wrong_messageid_length(): def test_block__commmit_api_error(): batch = create_batch() futures = ( - batch.publish({"data": b"blah blah blah"}), - batch.publish({"data": b"blah blah blah blah"}), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah") + ) + ), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah blah") + ) + ), ) # Make the API throw an error when publishing. @@ -306,8 +359,16 @@ def test_block__commmit_api_error(): def test_block__commmit_retry_error(): batch = create_batch() futures = ( - batch.publish({"data": b"blah blah blah"}), - batch.publish({"data": b"blah blah blah blah"}), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah") + ) + ), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"blah blah blah blah") + ) + ), ) # Make the API throw an error when publishing. @@ -324,24 +385,31 @@ def test_block__commmit_retry_error(): def test_publish_updating_batch_size(): batch = create_batch(topic="topic_foo") - messages = ( - gapic_types.PubsubMessage(data=b"foobarbaz"), - gapic_types.PubsubMessage(data=b"spameggs"), - gapic_types.PubsubMessage(data=b"1335020400"), + wrappers = ( + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spameggs"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"1335020400"), + ), ) # Publish each of the messages, which should save them to the batch. - futures = [batch.publish(message) for message in messages] + futures = [batch.publish(wrapper) for wrapper in wrappers] # There should be three messages on the batch, and three futures. - assert len(batch.messages) == 3 + assert len(batch.message_wrappers) == 3 assert batch._futures == futures # The size should have been incremented by the sum of the size # contributions of each message to the PublishRequest. base_request_size = gapic_types.PublishRequest(topic="topic_foo")._pb.ByteSize() expected_request_size = base_request_size + sum( - gapic_types.PublishRequest(messages=[msg])._pb.ByteSize() for msg in messages + gapic_types.PublishRequest(messages=[wrapper.message])._pb.ByteSize() + for wrapper in wrappers ) assert batch.size == expected_request_size @@ -350,68 +418,82 @@ def test_publish_updating_batch_size(): def test_publish(): batch = create_batch() - message = gapic_types.PubsubMessage() - future = batch.publish(message) + wrapper = PublishMessageWrapper(message=gapic_types.PubsubMessage()) + future = batch.publish(wrapper) - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert batch._futures == [future] def test_publish_max_messages_zero(): batch = create_batch(topic="topic_foo", max_messages=0) - - message = gapic_types.PubsubMessage(data=b"foobarbaz") + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), + ) with mock.patch.object(batch, "commit") as commit: - future = batch.publish(message) + future = batch.publish(wrapper) assert future is not None - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert batch._futures == [future] commit.assert_called_once() def test_publish_max_messages_enforced(): batch = create_batch(topic="topic_foo", max_messages=1) + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz") + ) + wrapper2 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz2") + ) - message = gapic_types.PubsubMessage(data=b"foobarbaz") - message2 = gapic_types.PubsubMessage(data=b"foobarbaz2") - - future = batch.publish(message) - future2 = batch.publish(message2) + future = batch.publish(wrapper) + future2 = batch.publish(wrapper2) assert future is not None assert future2 is None - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert len(batch._futures) == 1 def test_publish_max_bytes_enforced(): batch = create_batch(topic="topic_foo", max_bytes=15) - message = gapic_types.PubsubMessage(data=b"foobarbaz") - message2 = gapic_types.PubsubMessage(data=b"foobarbaz2") + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz") + ) + wrapper2 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz2") + ) - future = batch.publish(message) - future2 = batch.publish(message2) + future = batch.publish(wrapper) + future2 = batch.publish(wrapper2) assert future is not None assert future2 is None - assert len(batch.messages) == 1 + assert len(batch.message_wrappers) == 1 assert len(batch._futures) == 1 def test_publish_exceed_max_messages(): max_messages = 4 batch = create_batch(max_messages=max_messages) - messages = ( - gapic_types.PubsubMessage(data=b"foobarbaz"), - gapic_types.PubsubMessage(data=b"spameggs"), - gapic_types.PubsubMessage(data=b"1335020400"), + wrappers = ( + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spameggs"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"1335020400"), + ), ) # Publish each of the messages, which should save them to the batch. with mock.patch.object(batch, "commit") as commit: - futures = [batch.publish(message) for message in messages] + futures = [batch.publish(wrapper) for wrapper in wrappers] assert batch._futures == futures assert len(futures) == max_messages - 1 @@ -420,7 +502,11 @@ def test_publish_exceed_max_messages(): # When a fourth message is published, commit should be called. # No future will be returned in this case. - future = batch.publish(gapic_types.PubsubMessage(data=b"last one")) + future = batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"last one") + ) + ) commit.assert_called_once_with() assert future is None @@ -443,28 +529,32 @@ def test_publish_single_message_size_exceeds_server_size_limit(): assert request_size == 1001 # sanity check, just above the (mocked) server limit with pytest.raises(exceptions.MessageTooLargeError): - batch.publish(big_message) + batch.publish(wrapper=PublishMessageWrapper(message=big_message)) @mock.patch.object(thread, "_SERVER_PUBLISH_MAX_BYTES", 1000) def test_publish_total_messages_size_exceeds_server_size_limit(): batch = create_batch(topic="topic_foo", max_messages=10, max_bytes=1500) - messages = ( - gapic_types.PubsubMessage(data=b"x" * 500), - gapic_types.PubsubMessage(data=b"x" * 600), + wrappers = ( + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"x" * 500), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"x" * 600), + ), ) # Sanity check - request size is still below BatchSettings.max_bytes, # but it exceeds the server-side size limit. request_size = gapic_types.PublishRequest( - topic="topic_foo", messages=messages + topic="topic_foo", messages=[wrapper.message for wrapper in wrappers] )._pb.ByteSize() assert 1000 < request_size < 1500 with mock.patch.object(batch, "commit") as fake_commit: - batch.publish(messages[0]) - batch.publish(messages[1]) + batch.publish(wrappers[0]) + batch.publish(wrappers[1]) # The server side limit should kick in and cause a commit. fake_commit.assert_called_once() @@ -472,21 +562,40 @@ def test_publish_total_messages_size_exceeds_server_size_limit(): def test_publish_dict(): batch = create_batch() - future = batch.publish({"data": b"foobarbaz", "attributes": {"spam": "eggs"}}) + future = batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foobarbaz", + attributes={"spam": "eggs"}, + ), + ) + ) # There should be one message on the batch. - expected_message = gapic_types.PubsubMessage( - data=b"foobarbaz", attributes={"spam": "eggs"} + expected_message_wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foobarbaz", + attributes={"spam": "eggs"}, + ) ) - assert batch.messages == [expected_message] + + assert batch.message_wrappers == [expected_message_wrapper] assert batch._futures == [future] def test_cancel(): batch = create_batch() futures = ( - batch.publish({"data": b"This is my message."}), - batch.publish({"data": b"This is another message."}), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is my message."), + ), + ), + batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"This is another message."), + ), + ), ) batch.cancel(BatchCancellationReason.PRIOR_ORDERED_MESSAGE_FAILED) @@ -502,19 +611,29 @@ def test_do_not_commit_when_full_when_flag_is_off(): max_messages = 4 # Set commit_when_full flag to False batch = create_batch(max_messages=max_messages, commit_when_full=False) - messages = ( - gapic_types.PubsubMessage(data=b"foobarbaz"), - gapic_types.PubsubMessage(data=b"spameggs"), - gapic_types.PubsubMessage(data=b"1335020400"), + wrappers = ( + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spameggs"), + ), + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"1335020400"), + ), ) with mock.patch.object(batch, "commit") as commit: # Publish 3 messages. - futures = [batch.publish(message) for message in messages] + futures = [batch.publish(wrapper) for wrapper in wrappers] assert len(futures) == 3 # When a fourth message is published, commit should not be called. - future = batch.publish(gapic_types.PubsubMessage(data=b"last one")) + future = batch.publish( + wrapper=PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"last one"), + ) + ) assert commit.call_count == 0 assert future is None @@ -534,8 +653,10 @@ def test_batch_done_callback_called_on_success(): batch = create_batch(batch_done_callback=batch_done_callback_tracker) # Ensure messages exist. - message = gapic_types.PubsubMessage(data=b"foobarbaz") - batch.publish(message) + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz") + ) + batch.publish(wrapper) # One response for one published message. publish_response = gapic_types.PublishResponse(message_ids=["a"]) @@ -554,8 +675,10 @@ def test_batch_done_callback_called_on_publish_failure(): batch = create_batch(batch_done_callback=batch_done_callback_tracker) # Ensure messages exist. - message = gapic_types.PubsubMessage(data=b"foobarbaz") - batch.publish(message) + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz") + ) + batch.publish(wrapper) # One response for one published message. publish_response = gapic_types.PublishResponse(message_ids=["a"]) @@ -580,8 +703,10 @@ def test_batch_done_callback_called_on_publish_response_invalid(): batch = create_batch(batch_done_callback=batch_done_callback_tracker) # Ensure messages exist. - message = gapic_types.PubsubMessage(data=b"foobarbaz") - batch.publish(message) + wrapper = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foobarbaz"), + ) + batch.publish(wrapper) # No message ids returned in successful publish response -> invalid. publish_response = gapic_types.PublishResponse(message_ids=[]) @@ -593,3 +718,249 @@ def test_batch_done_callback_called_on_publish_response_invalid(): assert batch_done_callback_tracker.called assert not batch_done_callback_tracker.success + + +# Refer https://opentelemetry.io/docs/languages/python/#version-support +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_open_telemetry_commit_publish_rpc_span_none(span_exporter): + """ + Test scenario where OpenTelemetry is enabled, publish RPC + span creation fails(unexpected) and hence batch._rpc_span is None when + attempting to close it. Required for code coverage. + """ + TOPIC = "projects/projectID/topics/topicID" + batch = create_batch(topic=TOPIC, enable_open_telemetry=True) + + message = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), + ) + message.start_create_span(topic=TOPIC, ordering_key=None) + batch.publish(message) + + # Mock error when publish RPC span creation is attempted. + error = google.api_core.exceptions.InternalServerError("error") + + with mock.patch.object( + type(batch), + "_start_publish_rpc_span", + side_effect=error, + ): + batch._commit() + + assert batch._rpc_span is None + spans = span_exporter.get_finished_spans() + + # Only Create span should be exported, since publish RPC span creation + # should fail with a mock error. + assert len(spans) == 1 + + publish_create_span = spans[0] + assert publish_create_span.status.status_code == trace.status.StatusCode.ERROR + assert publish_create_span.end_time is not None + + assert publish_create_span.name == "topicID create" + # Publish start event and exception event should be present in publish + # create span. + assert len(publish_create_span.events) == 2 + assert publish_create_span.events[0].name == "publish start" + assert publish_create_span.events[1].name == "exception" + + +# Refer https://opentelemetry.io/docs/languages/python/#version-support +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_open_telemetry_commit_publish_rpc_exception(span_exporter): + TOPIC = "projects/projectID/topics/topicID" + batch = create_batch(topic=TOPIC, enable_open_telemetry=True) + + message = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), + ) + message.start_create_span(topic=TOPIC, ordering_key=None) + batch.publish(message) + + # Mock publish error. + error = google.api_core.exceptions.InternalServerError("error") + + with mock.patch.object( + type(batch.client), + "_gapic_publish", + side_effect=error, + ): + batch._commit() + + spans = span_exporter.get_finished_spans() + # Span 1: Publish RPC span + # Span 2: Create span. + assert len(spans) == 2 + + # Verify both spans recorded error and have ended. + for span in spans: + assert span.status.status_code == trace.status.StatusCode.ERROR + assert span.end_time is not None + + publish_rpc_span = spans[0] + assert publish_rpc_span.name == "topicID publish" + assert len(publish_rpc_span.events) == 1 + assert publish_rpc_span.events[0].name == "exception" + + publish_create_span = spans[1] + assert publish_create_span.name == "topicID create" + # Publish start event and exception event should be present in publish + # create span. + assert len(publish_create_span.events) == 2 + assert publish_create_span.events[0].name == "publish start" + assert publish_create_span.events[1].name == "exception" + + +# Refer https://opentelemetry.io/docs/languages/python/#version-support +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_opentelemetry_commit_sampling(span_exporter): + TOPIC = "projects/projectID/topics/topic" + batch = create_batch( + topic=TOPIC, + enable_open_telemetry=True, + ) + + message1 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), + ) + message1.start_create_span(topic=TOPIC, ordering_key=None) + + message2 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"bar"), + ) + message2.start_create_span(topic=TOPIC, ordering_key=None) + + # Mock the 'get_span_context' method to return a mock SpanContext + mock_span_context = mock.Mock(spec=SpanContext) + mock_span_context.trace_flags.sampled = False + + batch.publish(message1) + batch.publish(message2) + + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + + # Patch the 'create_span' method to return the mock SpanContext + with mock.patch.object( + message1.create_span, "get_span_context", return_value=mock_span_context + ): + with mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ): + batch._commit() + + spans = span_exporter.get_finished_spans() + + # Span 1: Publish RPC span of both messages + # Span 2: Create span of message 1 + # Span 3: Create span of message 2 + assert len(spans) == 3 + + publish_rpc_span, create_span1, create_span2 = spans + + # Verify publish RPC span has only one link corresponding to + # message 2 which is included in the sample. + assert len(publish_rpc_span.links) == 1 + assert len(create_span1.links) == 0 + assert len(create_span2.links) == 1 + assert publish_rpc_span.links[0].context == create_span2.context + assert create_span2.links[0].context == publish_rpc_span.context + + # Verify all spans have ended. + for span in spans: + assert span.end_time is not None + + # Verify both publish create spans have 2 events - publish start and publish + # end. + for span in spans[1:]: + assert len(span.events) == 2 + assert span.events[0].name == "publish start" + assert span.events[1].name == "publish end" + + +@pytest.mark.skipif( + sys.version_info < (3, 8), reason="Open Telemetry requires python3.8 or higher" +) +def test_opentelemetry_commit(span_exporter): + TOPIC = "projects/projectID/topics/topic" + batch = create_batch( + topic=TOPIC, + enable_open_telemetry=True, + ) + + msg1 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo"), + ) + msg2 = PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"bar"), + ) + msg1.start_create_span(topic=TOPIC, ordering_key=None) + msg2.start_create_span(topic=TOPIC, ordering_key=None) + + # Add both messages to the batch. + batch.publish(msg1) + batch.publish(msg2) + + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + with mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ): + batch._commit() + + spans = span_exporter.get_finished_spans() + + # Span 1: publish RPC span - closed after publish RPC success. + # Span 2: publisher create span of message 1 - closed after publish RPC success. + # Span 3: publisher create span of message 2 - closed after publish RPC success. + assert len(spans) == 3 + publish_rpc_span, create_span1, create_span2 = spans + + # Verify publish RPC span + assert publish_rpc_span.name == "topic publish" + assert publish_rpc_span.kind == trace.SpanKind.CLIENT + assert publish_rpc_span.end_time is not None + attributes = publish_rpc_span.attributes + assert attributes["messaging.system"] == "gcp_pubsub" + assert attributes["messaging.destination.name"] == "topic" + assert attributes["gcp.project_id"] == "projectID" + assert attributes["messaging.batch.message_count"] == 2 + assert attributes["messaging.operation"] == "publish" + assert attributes["code.function"] == "_commit" + assert publish_rpc_span.parent is None + # Verify the links correspond to the spans of the published messages. + assert len(publish_rpc_span.links) == 2 + assert publish_rpc_span.links[0].context == create_span1.context + assert publish_rpc_span.links[1].context == create_span2.context + assert len(create_span1.links) == 1 + assert create_span1.links[0].context == publish_rpc_span.get_span_context() + assert len(create_span2.links) == 1 + assert create_span2.links[0].context == publish_rpc_span.get_span_context() + + # Verify spans of the published messages. + assert create_span1.name == "topic create" + assert create_span2.name == "topic create" + + # Verify the publish create spans have been closed after publish success. + assert create_span1.end_time is not None + assert create_span2.end_time is not None + + # Verify message IDs returned from gapic publish are added as attributes + # to the publisher create spans of the messages. + assert "messaging.message.id" in create_span1.attributes + assert create_span1.attributes["messaging.message.id"] == "a" + assert "messaging.message.id" in create_span2.attributes + assert create_span2.attributes["messaging.message.id"] == "b" + + # Verify publish end event added to the span + assert len(create_span1.events) == 2 + assert len(create_span2.events) == 2 + assert create_span1.events[0].name == "publish start" + assert create_span1.events[1].name == "publish end" + assert create_span2.events[0].name == "publish start" + assert create_span2.events[1].name == "publish end" diff --git a/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py b/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py index 7570c2970..4377d1447 100644 --- a/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py +++ b/tests/unit/pubsub_v1/publisher/sequencer/test_ordered_sequencer.py @@ -27,12 +27,17 @@ from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) _ORDERING_KEY = "ordering_key_1" def create_message(): - return gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + return PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + ) def create_client(): diff --git a/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py b/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py index 01d9d6ca4..739bae3bd 100644 --- a/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py +++ b/tests/unit/pubsub_v1/publisher/sequencer/test_unordered_sequencer.py @@ -27,10 +27,15 @@ from google.cloud.pubsub_v1.publisher._batch import base from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) def create_message(): - return gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + return PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + ) def create_client(): @@ -140,7 +145,9 @@ def test_publish_after_batch_error(): batch = client._batch_class( client, "topic_name", types.BatchSettings(max_latency=float("inf")) ) - batch._messages.append(mock.Mock(name="message")) # Make batch truthy (non-empty). + batch._message_wrappers.append( + mock.Mock(name="message") + ) # Make batch truthy (non-empty). sequencer = unordered_sequencer.UnorderedSequencer(client, "topic_name") sequencer._set_batch(batch) diff --git a/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py b/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py new file mode 100644 index 000000000..e100950ad --- /dev/null +++ b/tests/unit/pubsub_v1/publisher/test_publish_message_wrapper.py @@ -0,0 +1,55 @@ +# Copyright 2019, Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from google.pubsub_v1 import types as gapic_types +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) + + +def test_message_setter(): + wrapper = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + another_message = gapic_types.PubsubMessage(data=b"bar") + wrapper.message = another_message + + assert wrapper.message == another_message + + +def test_eq(): + wrapper1 = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + wrapper2 = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"bar")) + wrapper3 = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + + assert wrapper1.__eq__(wrapper2) is False + assert wrapper1.__eq__(wrapper3) is True + + +def test_end_create_span(): + wrapper = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + with pytest.raises(AssertionError): + wrapper.end_create_span() + + +def test_end_publisher_flow_control_span(): + wrapper = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + with pytest.raises(AssertionError): + wrapper.end_publisher_flow_control_span() + + +def test_end_publisher_batching_span(): + wrapper = PublishMessageWrapper(message=gapic_types.PubsubMessage(data=b"foo")) + with pytest.raises(AssertionError): + wrapper.end_publisher_batching_span() diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 9db5e0ef8..23255db3b 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -29,18 +29,24 @@ import pytest import time +from opentelemetry import trace from google.api_core import gapic_v1 from google.api_core import retry as retries from google.api_core.gapic_v1.client_info import METRICS_METADATA_KEY + from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1 import types - from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer - from google.pubsub_v1 import types as gapic_types from google.pubsub_v1.services.publisher import client as publisher_client from google.pubsub_v1.services.publisher.transports.grpc import PublisherGrpcTransport +from google.cloud.pubsub_v1.open_telemetry.context_propagation import ( + OpenTelemetryContextSetter, +) +from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import ( + PublishMessageWrapper, +) def _assert_retries_equal(retry, retry2): @@ -129,6 +135,220 @@ def test_init_w_custom_transport(creds): assert client.batch_settings.max_messages == 100 +@pytest.mark.parametrize( + "enable_open_telemetry", + [ + True, + False, + ], +) +def test_open_telemetry_publisher_options(creds, enable_open_telemetry): + if sys.version_info >= (3, 8) or enable_open_telemetry is False: + client = publisher.Client( + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=enable_open_telemetry + ), + credentials=creds, + ) + assert client._open_telemetry_enabled == enable_open_telemetry + else: + # Open Telemetry is not supported and hence disabled for Python + # versions 3.7 or below + with pytest.warns( + RuntimeWarning, + match="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.", + ): + client = publisher.Client( + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=enable_open_telemetry + ), + credentials=creds, + ) + assert client._open_telemetry_enabled is False + + +def test_opentelemetry_context_setter(): + msg = gapic_types.PubsubMessage(data=b"foo") + OpenTelemetryContextSetter().set(carrier=msg, key="key", value="bar") + + assert "googclient_key" in msg.attributes.keys() + + +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) +def test_opentelemetry_context_propagation(creds, span_exporter): + TOPIC = "projects/projectID/topics/topicID" + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + + message_mock = mock.Mock(spec=publisher.flow_controller.FlowController.add) + client._flow_controller.add = message_mock + client.publish(TOPIC, b"data") + + message_mock.assert_called_once() + args = message_mock.call_args.args + assert len(args) == 1 + assert "googclient_traceparent" in args[0].attributes + + +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) +@pytest.mark.parametrize( + "enable_open_telemetry", + [ + True, + False, + ], +) +def test_opentelemetry_publisher_batching_exception( + creds, span_exporter, enable_open_telemetry +): + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=enable_open_telemetry, + ), + ) + + # Throw an exception when sequencer.publish() is called + sequencer = mock.Mock(spec=ordered_sequencer.OrderedSequencer) + sequencer.publish = mock.Mock(side_effect=RuntimeError("some error")) + client._get_or_create_sequencer = mock.Mock(return_value=sequencer) + + TOPIC = "projects/projectID/topics/topicID" + with pytest.raises(RuntimeError): + client.publish(TOPIC, b"message") + + spans = span_exporter.get_finished_spans() + + if enable_open_telemetry: + # Span 1: Publisher Flow Control span + # Span 2: Publisher Batching span + # Span 3: Create Publish span + assert len(spans) == 3 + + flow_control_span, batching_span, create_span = spans + + # Verify batching span contents. + assert batching_span.name == "publisher batching" + assert batching_span.kind == trace.SpanKind.INTERNAL + assert batching_span.parent.span_id == create_span.get_span_context().span_id + + # Verify exception recorded by the publisher batching span. + assert batching_span.status.status_code == trace.StatusCode.ERROR + assert len(batching_span.events) == 1 + assert batching_span.events[0].name == "exception" + + # Verify exception recorded by the publisher create span. + assert create_span.status.status_code == trace.StatusCode.ERROR + assert len(create_span.events) == 2 + assert create_span.events[0].name == "publish start" + assert create_span.events[1].name == "exception" + + # Verify the finished flow control span. + assert flow_control_span.name == "publisher flow control" + assert len(flow_control_span.events) == 0 + else: + assert len(spans) == 0 + + +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) +def test_opentelemetry_flow_control_exception(creds, span_exporter): + publisher_options = types.PublisherOptions( + flow_control=types.PublishFlowControl( + message_limit=10, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ), + enable_open_telemetry_tracing=True, + ) + client = publisher.Client(credentials=creds, publisher_options=publisher_options) + + mock_batch = mock.Mock(spec=client._batch_class) + topic = "projects/projectID/topics/topicID" + client._set_batch(topic, mock_batch) + + future1 = client.publish(topic, b"a" * 60) + future2 = client.publish(topic, b"b" * 100) + + future1.result() # no error, still within flow control limits + with pytest.raises(exceptions.FlowControlLimitError): + future2.result() + + spans = span_exporter.get_finished_spans() + # Span 1 = Publisher Flow Control Span of first publish + # Span 2 = Publisher Batching Span of first publish + # Span 3 = Publisher Flow Control Span of second publish(raises FlowControlLimitError) + # Span 4 = Publish Create Span of second publish(raises FlowControlLimitError) + assert len(spans) == 4 + + failed_flow_control_span = spans[2] + finished_publish_create_span = spans[3] + + # Verify failed flow control span values. + assert failed_flow_control_span.name == "publisher flow control" + assert failed_flow_control_span.kind == trace.SpanKind.INTERNAL + assert ( + failed_flow_control_span.parent.span_id + == finished_publish_create_span.get_span_context().span_id + ) + assert failed_flow_control_span.status.status_code == trace.StatusCode.ERROR + + assert len(failed_flow_control_span.events) == 1 + assert failed_flow_control_span.events[0].name == "exception" + + # Verify finished publish create span values + assert finished_publish_create_span.name == "topicID create" + assert finished_publish_create_span.status.status_code == trace.StatusCode.ERROR + assert len(finished_publish_create_span.events) == 2 + assert finished_publish_create_span.events[0].name == "publish start" + assert finished_publish_create_span.events[1].name == "exception" + + +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="Open Telemetry not supported below Python version 3.8", +) +def test_opentelemetry_publish(creds, span_exporter): + TOPIC = "projects/projectID/topics/topicID" + client = publisher.Client( + credentials=creds, + publisher_options=types.PublisherOptions( + enable_open_telemetry_tracing=True, + ), + ) + + client.publish(TOPIC, b"message") + spans = span_exporter.get_finished_spans() + + # Span 1: Publisher Flow control span + # Span 2: Publisher Batching span + # Publish Create Span would still be active, and hence not exported. + flow_control_span = spans[0] + assert flow_control_span.name == "publisher flow control" + assert flow_control_span.kind == trace.SpanKind.INTERNAL + # Assert the Publisher Flow Control Span has a parent(the Publish Create + # span is still active, and hence unexported. So, the value of parent cannot + # be asserted) + assert flow_control_span.parent is not None + + batching_span = spans[1] + assert batching_span.name == "publisher batching" + assert batching_span.kind == trace.SpanKind.INTERNAL + assert batching_span.parent is not None + + def test_init_w_api_endpoint(creds): client_options = {"api_endpoint": "testendpoint.google.com"} client = publisher.Client(client_options=client_options, credentials=creds) @@ -240,9 +460,17 @@ def test_publish(creds): # Check mock. batch.publish.assert_has_calls( [ - mock.call(gapic_types.PubsubMessage(data=b"spam")), mock.call( - gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spam"), + ) + ), + mock.call( + PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foo", attributes={"bar": "baz"} + ) + ) ), ] ) @@ -381,7 +609,9 @@ def test_publish_attrs_bytestring(creds): # The attributes should have been sent as text. batch.publish.assert_called_once_with( - gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) + ) ) @@ -421,8 +651,9 @@ def test_publish_new_batch_needed(creds): commit_timeout=gapic_v1.method.DEFAULT, ) message_pb = gapic_types.PubsubMessage(data=b"foo", attributes={"bar": "baz"}) - batch1.publish.assert_called_once_with(message_pb) - batch2.publish.assert_called_once_with(message_pb) + wrapper = PublishMessageWrapper(message=message_pb) + batch1.publish.assert_called_once_with(wrapper) + batch2.publish.assert_called_once_with(wrapper) def test_publish_attrs_type_error(creds): @@ -445,9 +676,9 @@ def test_publish_custom_retry_overrides_configured_retry(creds): client.publish(topic, b"hello!", retry=mock.sentinel.custom_retry) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY + wrapper=mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY ) - message = fake_sequencer.publish.call_args.args[0] + message = fake_sequencer.publish.call_args.kwargs["wrapper"].message assert message.data == b"hello!" @@ -464,9 +695,9 @@ def test_publish_custom_timeout_overrides_configured_timeout(creds): client.publish(topic, b"hello!", timeout=mock.sentinel.custom_timeout) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout + wrapper=mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout ) - message = fake_sequencer.publish.call_args.args[0] + message = fake_sequencer.publish.call_args.kwargs["wrapper"].message assert message.data == b"hello!" @@ -626,10 +857,16 @@ def test_publish_with_ordering_key(creds): # Check mock. batch.publish.assert_has_calls( [ - mock.call(gapic_types.PubsubMessage(data=b"spam", ordering_key="k1")), mock.call( - gapic_types.PubsubMessage( - data=b"foo", attributes={"bar": "baz"}, ordering_key="k1" + PublishMessageWrapper( + message=gapic_types.PubsubMessage(data=b"spam", ordering_key="k1") + ), + ), + mock.call( + PublishMessageWrapper( + message=gapic_types.PubsubMessage( + data=b"foo", attributes={"bar": "baz"}, ordering_key="k1" + ) ) ), ]