Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub: adds region tags and updates existing to standard #1491

Merged
merged 1 commit into from
May 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pubsub/cloud-client/iam.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

def get_topic_policy(project, topic_name):
"""Prints the IAM policy for the given topic."""
# [START pubsub_get_topic_policy]
client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(project, topic_name)

Expand All @@ -36,10 +37,12 @@ def get_topic_policy(project, topic_name):
print('Policy for topic {}:'.format(topic_path))
for binding in policy.bindings:
print('Role: {}, Members: {}'.format(binding.role, binding.members))
# [END pubsub_get_topic_policy]


def get_subscription_policy(project, subscription_name):
"""Prints the IAM policy for the given subscription."""
# [START pubsub_get_subscription_policy]
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(project, subscription_name)

Expand All @@ -48,10 +51,12 @@ def get_subscription_policy(project, subscription_name):
print('Policy for subscription {}:'.format(subscription_path))
for binding in policy.bindings:
print('Role: {}, Members: {}'.format(binding.role, binding.members))
# [END pubsub_get_subscription_policy]


def set_topic_policy(project, topic_name):
"""Sets the IAM policy for a topic."""
# [START pubsub_set_topic_policy]
client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(project, topic_name)

Expand All @@ -72,10 +77,12 @@ def set_topic_policy(project, topic_name):

print('IAM policy for topic {} set: {}'.format(
topic_name, policy))
# [END pubsub_set_topic_policy]


def set_subscription_policy(project, subscription_name):
"""Sets the IAM policy for a topic."""
# [START pubsub_set_subscription_policy]
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(project, subscription_name)

Expand All @@ -96,10 +103,12 @@ def set_subscription_policy(project, subscription_name):

print('IAM policy for subscription {} set: {}'.format(
subscription_name, policy))
# [END pubsub_set_subscription_policy]


def check_topic_permissions(project, topic_name):
"""Checks to which permissions are available on the given topic."""
# [START pubsub_test_topic_permissions]
client = pubsub_v1.PublisherClient()
topic_path = client.topic_path(project, topic_name)

Expand All @@ -113,10 +122,12 @@ def check_topic_permissions(project, topic_name):

print('Allowed permissions for topic {}: {}'.format(
topic_path, allowed_permissions))
# [END pubsub_test_topic_permissions]


def check_subscription_permissions(project, subscription_name):
"""Checks to which permissions are available on the given subscription."""
# [START pubsub_test_subscription_permissions]
client = pubsub_v1.SubscriberClient()
subscription_path = client.subscription_path(project, subscription_name)

Expand All @@ -130,6 +141,7 @@ def check_subscription_permissions(project, subscription_name):

print('Allowed permissions for subscription {}: {}'.format(
subscription_path, allowed_permissions))
# [END pubsub_test_subscription_permissions]


if __name__ == '__main__':
Expand Down
14 changes: 14 additions & 0 deletions pubsub/cloud-client/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,42 @@

def list_topics(project):
"""Lists all Pub/Sub topics in the given project."""
# [START pubsub_list_topics]
publisher = pubsub_v1.PublisherClient()
project_path = publisher.project_path(project)

for topic in publisher.list_topics(project_path):
print(topic)
# [END pubsub_list_topics]


def create_topic(project, topic_name):
"""Create a new Pub/Sub topic."""
# [START pubsub_create_topic]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

topic = publisher.create_topic(topic_path)

print('Topic created: {}'.format(topic))
# [END pubsub_create_topic]


def delete_topic(project, topic_name):
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_topic]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

publisher.delete_topic(topic_path)

print('Topic deleted: {}'.format(topic_path))
# [END pubsub_delete_topic]


def publish_messages(project, topic_name):
"""Publishes multiple messages to a Pub/Sub topic."""
# [START pubsub_quickstart_publisher]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

Expand All @@ -67,11 +74,13 @@ def publish_messages(project, topic_name):
publisher.publish(topic_path, data=data)

print('Published messages.')
# [END pubsub_quickstart_publisher]


def publish_messages_with_custom_attributes(project, topic_name):
"""Publishes multiple messages with custom attributes
to a Pub/Sub topic."""
# [START pubsub_publish_custom_attributes]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

Expand All @@ -84,11 +93,13 @@ def publish_messages_with_custom_attributes(project, topic_name):
topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')
# [END pubsub_publish_custom_attributes]


def publish_messages_with_futures(project, topic_name):
"""Publishes multiple messages to a Pub/Sub topic and prints their
message IDs."""
# [START pubsub_publisher_concurrency_control]
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

Expand All @@ -107,10 +118,12 @@ def publish_messages_with_futures(project, topic_name):
for future in futures:
# result() blocks until the message is published.
print(future.result())
# [END pubsub_publisher_concurrency_control]


def publish_messages_with_batch_settings(project, topic_name):
"""Publishes multiple messages to a Pub/Sub topic with batch settings."""
# [START pubsub_publisher_batch_settings]
# Configure the batch to publish once there is one kilobyte of data or
# 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
Expand All @@ -127,6 +140,7 @@ def publish_messages_with_batch_settings(project, topic_name):
publisher.publish(topic_path, data=data)

print('Published messages.')
# [END pubsub_publisher_batch_settings]


if __name__ == '__main__':
Expand Down
4 changes: 2 additions & 2 deletions pubsub/cloud-client/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


def run_quickstart():
# [START pubsub_quickstart]
# [START pubsub_quickstart_create_topic]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume you've locked the docs via the branch parameter already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't locked them. I'm waiting to merge all the PRs at once when they're all approved, and simultaneously update the docs.

# Imports the Google Cloud client library
from google.cloud import pubsub_v1

Expand All @@ -32,7 +32,7 @@ def run_quickstart():
topic = publisher.create_topic(topic_path)

print('Topic created: {}'.format(topic))
# [END pubsub_quickstart]
# [END pubsub_quickstart_create_topic]


if __name__ == '__main__':
Expand Down
32 changes: 26 additions & 6 deletions pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,29 @@

def list_subscriptions_in_topic(project, topic_name):
"""Lists all subscriptions for a given topic."""
# [START pubsub_list_topic_subscriptions]
subscriber = pubsub_v1.PublisherClient()
topic_path = subscriber.topic_path(project, topic_name)

for subscription in subscriber.list_topic_subscriptions(topic_path):
print(subscription)
# [END pubsub_list_topic_subscriptions]


def list_subscriptions_in_project(project):
"""Lists all subscriptions in the current project."""
# [START pubsub_list_subscriptions]
subscriber = pubsub_v1.SubscriberClient()
project_path = subscriber.project_path(project)

for subscription in subscriber.list_subscriptions(project_path):
print(subscription.name)
# [END pubsub_list_subscriptions]


def create_subscription(project, topic_name, subscription_name):
"""Create a new pull subscription on the given topic."""
# [START pubsub_create_pull_subscription]
subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project, topic_name)
subscription_path = subscriber.subscription_path(
Expand All @@ -56,16 +61,16 @@ def create_subscription(project, topic_name, subscription_name):
subscription_path, topic_path)

print('Subscription created: {}'.format(subscription))
# [END pubsub_create_pull_subscription]


def create_push_subscription(project,
topic_name,
subscription_name,
endpoint):
"""Create a new push subscription on the given topic.
For example, endpoint is
"https://my-test-project.appspot.com/push".
"""
"""Create a new push subscription on the given topic."""
# [START pubsub_create_push_subscription]
# endpoint = "https://my-test-project.appspot.com/push"
subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project, topic_name)
subscription_path = subscriber.subscription_path(
Expand All @@ -79,26 +84,30 @@ def create_push_subscription(project,

print('Push subscription created: {}'.format(subscription))
print('Endpoint for subscription is: {}'.format(endpoint))
# [END pubsub_create_push_subscription]


def delete_subscription(project, subscription_name):
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_subscription]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)

subscriber.delete_subscription(subscription_path)

print('Subscription deleted: {}'.format(subscription_path))
# [END pubsub_delete_subscription]


def update_subscription(project, subscription_name, endpoint):
"""
Updates an existing Pub/Sub subscription's push endpoint URL.
Note that certain properties of a subscription, such as
its topic, are not modifiable. For example, endpoint is
"https://my-test-project.appspot.com/push".
its topic, are not modifiable.
"""
# [START pubsub_update_push_configuration]
# endpoint = "https://my-test-project.appspot.com/push"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -122,10 +131,13 @@ def update_subscription(project, subscription_name, endpoint):
print('Subscription updated: {}'.format(subscription_path))
print('New endpoint for subscription is: {}'.format(
result.push_config))
# [END pubsub_update_push_configuration]


def receive_messages(project, subscription_name):
"""Receives messages from a pull subscription."""
# [START pubsub_subscriber_async_pull]
# [START pubsub_quickstart_subscriber]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -141,10 +153,13 @@ def callback(message):
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
# [END pubsub_subscriber_async_pull]
# [END pubsub_quickstart_subscriber]


def receive_messages_with_custom_attributes(project, subscription_name):
"""Receives messages from a pull subscription."""
# [START pubsub_subscriber_sync_pull_custom_attributes]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -165,10 +180,12 @@ def callback(message):
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
# [END pubsub_subscriber_sync_pull_custom_attributes]


def receive_messages_with_flow_control(project, subscription_name):
"""Receives messages from a pull subscription with flow control."""
# [START pubsub_subscriber_flow_settings]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -187,10 +204,12 @@ def callback(message):
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)
# [END pubsub_subscriber_flow_settings]


def listen_for_errors(project, subscription_name):
"""Receives messages and catches errors from a pull subscription."""
# [START pubsub_subscriber_error_listener]
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)
Expand All @@ -210,6 +229,7 @@ def callback(message):
'Listening for messages on {} threw an Exception: {}.'.format(
subscription_name, e))
raise
# [END pubsub_subscriber_error_listener]


if __name__ == '__main__':
Expand Down