Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub: wrap subscriber in a with block and add comments #4070

Merged
merged 6 commits into from
Jun 12, 2020
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 40 additions & 32 deletions pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ def list_subscriptions_in_project(project_id):
subscriber = pubsub_v1.SubscriberClient()
project_path = subscriber.project_path(project_id)

for subscription in subscriber.list_subscriptions(project_path):
print(subscription.name)

subscriber.close()
# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underslying gRPC channel when done.
with subscriber:
for subscription in subscriber.list_subscriptions(project_path):
print(subscription.name)
# [END pubsub_list_subscriptions]


Expand All @@ -73,11 +74,12 @@ def create_subscription(project_id, topic_id, subscription_id):
topic_path = subscriber.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

subscription = subscriber.create_subscription(subscription_path, topic_path)
# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underslying gRPC channel when done.
with subscriber:
subscription = subscriber.create_subscription(subscription_path, topic_path)

print("Subscription created: {}".format(subscription))

subscriber.close()
# [END pubsub_create_pull_subscription]


Expand Down Expand Up @@ -146,14 +148,15 @@ def create_push_subscription(project_id, topic_id, subscription_id, endpoint):

push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint)

subscription = subscriber.create_subscription(
subscription_path, topic_path, push_config
)
# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underslying gRPC channel when done.
with subscriber:
subscription = subscriber.create_subscription(
subscription_path, topic_path, push_config
)

print("Push subscription created: {}".format(subscription))
print("Endpoint for subscription is: {}".format(endpoint))

subscriber.close()
# [END pubsub_create_push_subscription]


Expand All @@ -169,11 +172,12 @@ def delete_subscription(project_id, subscription_id):
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

subscriber.delete_subscription(subscription_path)
# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underslying gRPC channel when done.
with subscriber:
subscriber.delete_subscription(subscription_path)

print("Subscription deleted: {}".format(subscription_path))

subscriber.close()
# [END pubsub_delete_subscription]


Expand Down Expand Up @@ -203,12 +207,13 @@ def update_push_subscription(project_id, topic_id, subscription_id, endpoint):

update_mask = {"paths": {"push_config"}}

result = subscriber.update_subscription(subscription, update_mask)
# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underslying gRPC channel when done.
with subscriber:
result = subscriber.update_subscription(subscription, update_mask)

print("Subscription updated: {}".format(subscription_path))
print("New endpoint for subscription is: {}".format(result.push_config))

subscriber.close()
# [END pubsub_update_push_configuration]


Expand Down Expand Up @@ -436,24 +441,25 @@ def synchronous_pull(project_id, subscription_id):

NUM_MESSAGES = 3

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)
# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underslying gRPC channel when done.
with subscriber:
# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

ack_ids = []
for received_message in response.received_messages:
print("Received: {}".format(received_message.message.data))
ack_ids.append(received_message.ack_id)
ack_ids = []
for received_message in response.received_messages:
print("Received: {}".format(received_message.message.data))
ack_ids.append(received_message.ack_id)

# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)
# Acknowledges the received messages so they will not be sent again.
subscriber.acknowledge(subscription_path, ack_ids)

print(
"Received and acknowledged {} messages. Done.".format(
len(response.received_messages)
print(
"Received and acknowledged {} messages. Done.".format(
len(response.received_messages)
)
)
)

subscriber.close()
# [END pubsub_subscriber_sync_pull]


Expand Down Expand Up @@ -539,6 +545,8 @@ def worker(msg):
)
)

# Close the underlying gPRC channel. Alternatively, wrap subscriber in
# a 'with' block to automatically call close() when done.
subscriber.close()
# [END pubsub_subscriber_sync_pull_with_lease]

Expand Down