Skip to content

Commit

Permalink
doc: Subsribe sample (#1260)
Browse files Browse the repository at this point in the history
  • Loading branch information
mukund-ananthu authored Sep 28, 2024
1 parent bc13ff0 commit 45a251d
Showing 1 changed file with 88 additions and 0 deletions.
88 changes: 88 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,94 @@ def list_subscriptions_in_project(project_id: str) -> None:
# [END pubsub_list_subscriptions]


def pubsub_subscribe_otel_tracing(
subscription_project_id: str,
cloud_trace_project_id: str,
subscription_id: str,
timeout: Optional[float] = None,
) -> None:
"""
Subscribe to `subscription_id` in `subscription_project_id` with OpenTelemetry enabled.
Export the OpenTelemetry traces to Google Cloud Trace in project
`trace_project_id`
Args:
subscription_project_id: project ID of the subscription.
cloud_trace_project_id: project ID to export Cloud Trace to.
subscription_id: subscription ID to subscribe from.
timeout: time until which to subscribe to.
Returns:
None
"""
# [START pubsub_subscribe_otel_tracing]
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
)
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import SubscriberClient
from google.cloud.pubsub_v1.types import SubscriberOptions

# TODO(developer)
# subscription_project_id = "your-subscription-project-id"
# subscription_id = "your-subscription-id"
# cloud_trace_project_id = "your-cloud-trace-project-id"
# timeout = 300.0

# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
# traces: https://cloud.google.com/trace/docs/setup/python-ot
# Choose and configure the exporter for your set up accordingly.

sampler = ParentBased(root=TraceIdRatioBased(1))
trace.set_tracer_provider(TracerProvider(sampler=sampler))

# Export to Google Trace
cloud_trace_exporter = CloudTraceSpanExporter(
project_id=cloud_trace_project_id,
)
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(cloud_trace_exporter)
)
# Set the `enable_open_telemetry_tracing` option to True when creating
# the subscriber client. This in itself is necessary and sufficient for
# the library to export OpenTelemetry traces. However, where the traces
# must be exported to needs to be configured based on your OpenTelemetry
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
subscriber = SubscriberClient(
subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True)
)

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(
subscription_project_id, subscription_id
)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
# Ack message after processing it.
print(message.data)
message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# Optimistically subscribe to messages on the subscription.
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
print("Successfully subscribed until the timeout passed.")
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.

# [END pubsub_subscribe_otel_tracing]


def create_subscription(project_id: str, topic_id: str, subscription_id: str) -> None:
"""Create a new pull subscription on the given topic."""
# [START pubsub_create_pull_subscription]
Expand Down

0 comments on commit 45a251d

Please sign in to comment.