From 45a251d7da147c157711b6c4f3e4a8dac8fa7c0d Mon Sep 17 00:00:00 2001 From: mukund-ananthu <83691193+mukund-ananthu@users.noreply.github.com> Date: Sat, 28 Sep 2024 12:43:34 -0400 Subject: [PATCH] doc: Subsribe sample (#1260) --- samples/snippets/subscriber.py | 88 ++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 79cd0ebf1..7931b31cb 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -68,6 +68,94 @@ def list_subscriptions_in_project(project_id: str) -> None: # [END pubsub_list_subscriptions] +def pubsub_subscribe_otel_tracing( + subscription_project_id: str, + cloud_trace_project_id: str, + subscription_id: str, + timeout: Optional[float] = None, +) -> None: + """ + Subscribe to `subscription_id` in `subscription_project_id` with OpenTelemetry enabled. + Export the OpenTelemetry traces to Google Cloud Trace in project + `trace_project_id` + Args: + subscription_project_id: project ID of the subscription. + cloud_trace_project_id: project ID to export Cloud Trace to. + subscription_id: subscription ID to subscribe from. + timeout: time until which to subscribe to. + Returns: + None + """ + # [START pubsub_subscribe_otel_tracing] + from opentelemetry import trace + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + ) + from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter + from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased + + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1 import SubscriberClient + from google.cloud.pubsub_v1.types import SubscriberOptions + + # TODO(developer) + # subscription_project_id = "your-subscription-project-id" + # subscription_id = "your-subscription-id" + # cloud_trace_project_id = "your-cloud-trace-project-id" + # timeout = 300.0 + + # In this sample, we use a Google Cloud Trace to export the OpenTelemetry + # traces: https://cloud.google.com/trace/docs/setup/python-ot + # Choose and configure the exporter for your set up accordingly. + + sampler = ParentBased(root=TraceIdRatioBased(1)) + trace.set_tracer_provider(TracerProvider(sampler=sampler)) + + # Export to Google Trace + cloud_trace_exporter = CloudTraceSpanExporter( + project_id=cloud_trace_project_id, + ) + trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(cloud_trace_exporter) + ) + # Set the `enable_open_telemetry_tracing` option to True when creating + # the subscriber client. This in itself is necessary and sufficient for + # the library to export OpenTelemetry traces. However, where the traces + # must be exported to needs to be configured based on your OpenTelemetry + # set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/ + subscriber = SubscriberClient( + subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True) + ) + + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber.subscription_path( + subscription_project_id, subscription_id + ) + + # Define callback to be called when a message is received. + def callback(message: pubsub_v1.subscriber.message.Message) -> None: + # Ack message after processing it. + print(message.data) + message.ack() + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # Optimistically subscribe to messages on the subscription. + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback + ) + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + print("Successfully subscribed until the timeout passed.") + streaming_pull_future.cancel() # Trigger the shutdown. + streaming_pull_future.result() # Block until the shutdown is complete. + + # [END pubsub_subscribe_otel_tracing] + + def create_subscription(project_id: str, topic_id: str, subscription_id: str) -> None: """Create a new pull subscription on the given topic.""" # [START pubsub_create_pull_subscription]