diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index a367f181c04b..f079e7d423f8 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -52,10 +52,11 @@ def list_subscriptions_in_project(project_id): subscriber = pubsub_v1.SubscriberClient() project_path = subscriber.project_path(project_id) - for subscription in subscriber.list_subscriptions(project_path): - print(subscription.name) - - subscriber.close() + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + for subscription in subscriber.list_subscriptions(project_path): + print(subscription.name) # [END pubsub_list_subscriptions] @@ -73,11 +74,12 @@ def create_subscription(project_id, topic_id, subscription_id): topic_path = subscriber.topic_path(project_id, topic_id) subscription_path = subscriber.subscription_path(project_id, subscription_id) - subscription = subscriber.create_subscription(subscription_path, topic_path) + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + subscription = subscriber.create_subscription(subscription_path, topic_path) print("Subscription created: {}".format(subscription)) - - subscriber.close() # [END pubsub_create_pull_subscription] @@ -146,14 +148,15 @@ def create_push_subscription(project_id, topic_id, subscription_id, endpoint): push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) - subscription = subscriber.create_subscription( - subscription_path, topic_path, push_config - ) + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + subscription = subscriber.create_subscription( + subscription_path, topic_path, push_config + ) print("Push subscription created: {}".format(subscription)) print("Endpoint for subscription is: {}".format(endpoint)) - - subscriber.close() # [END pubsub_create_push_subscription] @@ -169,11 +172,12 @@ def delete_subscription(project_id, subscription_id): subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(project_id, subscription_id) - subscriber.delete_subscription(subscription_path) + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + subscriber.delete_subscription(subscription_path) print("Subscription deleted: {}".format(subscription_path)) - - subscriber.close() # [END pubsub_delete_subscription] @@ -203,12 +207,13 @@ def update_push_subscription(project_id, topic_id, subscription_id, endpoint): update_mask = {"paths": {"push_config"}} - result = subscriber.update_subscription(subscription, update_mask) + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + result = subscriber.update_subscription(subscription, update_mask) print("Subscription updated: {}".format(subscription_path)) print("New endpoint for subscription is: {}".format(result.push_config)) - - subscriber.close() # [END pubsub_update_push_configuration] @@ -436,24 +441,25 @@ def synchronous_pull(project_id, subscription_id): NUM_MESSAGES = 3 - # The subscriber pulls a specific number of messages. - response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + # The subscriber pulls a specific number of messages. + response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) - ack_ids = [] - for received_message in response.received_messages: - print("Received: {}".format(received_message.message.data)) - ack_ids.append(received_message.ack_id) + ack_ids = [] + for received_message in response.received_messages: + print("Received: {}".format(received_message.message.data)) + ack_ids.append(received_message.ack_id) - # Acknowledges the received messages so they will not be sent again. - subscriber.acknowledge(subscription_path, ack_ids) + # Acknowledges the received messages so they will not be sent again. + subscriber.acknowledge(subscription_path, ack_ids) - print( - "Received and acknowledged {} messages. Done.".format( - len(response.received_messages) + print( + "Received and acknowledged {} messages. Done.".format( + len(response.received_messages) + ) ) - ) - - subscriber.close() # [END pubsub_subscriber_sync_pull] @@ -539,6 +545,8 @@ def worker(msg): ) ) + # Close the underlying gPRC channel. Alternatively, wrap subscriber in + # a 'with' block to automatically call close() when done. subscriber.close() # [END pubsub_subscriber_sync_pull_with_lease]