Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GAX-based _SubscriberAPI. #1795

Merged
merged 3 commits into from
May 16, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
273 changes: 273 additions & 0 deletions gcloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from google.gax.errors import GaxError
from google.gax.grpc import exc_to_code
from google.pubsub.v1.pubsub_pb2 import PubsubMessage
from google.pubsub.v1.pubsub_pb2 import PushConfig
from grpc.beta.interfaces import StatusCode
# pylint: enable=import-error

Expand Down Expand Up @@ -180,7 +181,279 @@ def topic_list_subscriptions(self, topic_path):
return subs, response.next_page_token


class _SubscriberAPI(object):
"""Helper mapping subscriber-related APIs.

:type gax_api: :class:`google.pubsub.v1.publisher_api.SubscriberApi`
:param gax_api: API object used to make GAX requests.
"""
def __init__(self, gax_api):
self._gax_api = gax_api

def list_subscriptions(self, project):
"""List subscriptions for the project associated with this API.

See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/list

This comment was marked as spam.

This comment was marked as spam.


:type project: string
:param project: project ID

:rtype: tuple, (list, str)
:returns: list of ``Subscription`` resource dicts, plus a
"next page token" string: if not None, indicates that

This comment was marked as spam.

This comment was marked as spam.

more topics can be retrieved with another call (pass that
value as ``page_token``).
"""
options = CallOptions(is_page_streaming=False)
path = 'projects/%s' % (project,)
response = self._gax_api.list_subscriptions(path, options)

This comment was marked as spam.

This comment was marked as spam.

subscriptions = [_subscription_pb_to_mapping(sub_pb)
for sub_pb in response.subscriptions]
return subscriptions, response.next_page_token

def subscription_create(self, subscription_path, topic_path,
ack_deadline=None, push_endpoint=None):
"""API call: create a subscription

See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/create

:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type topic_path: string
:param topic_path: the fully-qualified path of the topic being
subscribed, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.

:type ack_deadline: int, or ``NoneType``
:param ack_deadline: the deadline (in seconds) by which messages pulled
from the back-end must be acknowledged.

This comment was marked as spam.

This comment was marked as spam.


:type push_endpoint: string, or ``NoneType``
:param push_endpoint: URL to which messages will be pushed by the
back-end. If not set, the application must pull
messages.

:rtype: dict
:returns: ``Subscription`` resource returned from the API.
"""
if push_endpoint is not None:
push_config = PushConfig(push_endpoint=push_endpoint)
else:
push_config = None

if ack_deadline is None:
ack_deadline = 0

try:
sub_pb = self._gax_api.create_subscription(
subscription_path, topic_path, push_config, ack_deadline)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
raise Conflict(topic_path)
raise
return _subscription_pb_to_mapping(sub_pb)

def subscription_get(self, subscription_path):
"""API call: retrieve a subscription

See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/get

:type subscription_path: string
:param subscription_path: the fully-qualified path of the subscription,
in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:rtype: dict
:returns: ``Subscription`` resource returned from the API.
"""
try:
sub_pb = self._gax_api.get_subscription(subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return _subscription_pb_to_mapping(sub_pb)

def subscription_delete(self, subscription_path):
"""API call: delete a subscription

See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/delete

:type subscription_path: string
:param subscription_path: the fully-qualified path of the subscription,
in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
"""
try:
self._gax_api.delete_subscription(subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

def subscription_modify_push_config(self, subscription_path,
push_endpoint):
"""API call: update push config of a subscription

See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig

:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type push_endpoint: string, or ``NoneType``
:param push_endpoint: URL to which messages will be pushed by the
back-end. If not set, the application must pull
messages.
"""
push_config = PushConfig(push_endpoint=push_endpoint)
try:
self._gax_api.modify_push_config(subscription_path, push_config)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

def subscription_pull(self, subscription_path, return_immediately=False,
max_messages=1):
"""API call: retrieve messages for a subscription

See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig

:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type return_immediately: boolean
:param return_immediately: if True, the back-end returns even if no
messages are available; if False, the API
call blocks until one or more messages are
available.

:type max_messages: int
:param max_messages: the maximum number of messages to return.

:rtype: list of dict
:returns: the ``receivedMessages`` element of the response.
"""
try:
response_pb = self._gax_api.pull(
subscription_path, max_messages, return_immediately)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return [_received_message_pb_to_mapping(rmpb)
for rmpb in response_pb.received_messages]

def subscription_acknowledge(self, subscription_path, ack_ids):
"""API call: acknowledge retrieved messages

See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyPushConfig

:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged
"""
try:
self._gax_api.acknowledge(subscription_path, ack_ids)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
ack_deadline):
"""API call: update ack deadline for retrieved messages

See:
https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions/modifyAckDeadline

:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged

:type ack_deadline: int
:param ack_deadline: the deadline (in seconds) by which messages pulled
from the back-end must be acknowledged.
"""
try:
self._gax_api.modify_ack_deadline(
subscription_path, ack_ids, ack_deadline)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise


def _message_pb_from_dict(message):
"""Helper for :meth:`_PublisherAPI.topic_publish`."""
return PubsubMessage(data=_to_bytes(message['data']),
attributes=message['attributes'])


def _subscription_pb_to_mapping(sub_pb):
"""Helper for :meth:`list_subscriptions`, et aliae

Ideally, would use a function from :mod:`protobuf.json_format`, but
the right one isn't public. See:
https://github.com/google/protobuf/issues/1351
"""
mapping = {
'name': sub_pb.name,
'topic': sub_pb.topic,
'ack_deadline': sub_pb.ack_deadline,
}
if sub_pb.push_config.push_endpoint != '':

This comment was marked as spam.

This comment was marked as spam.

mapping['push_config'] = {
'push_endpoint': sub_pb.push_config.push_endpoint,
}
return mapping


def _message_pb_to_mapping(message_pb):
"""Helper for :meth:`pull`, et aliae

Ideally, would use a function from :mod:`protobuf.json_format`, but
the right one isn't public. See:
https://github.com/google/protobuf/issues/1351
"""
return {
'messageId': message_pb.message_id,
'data': message_pb.data,
'attributes': message_pb.attributes,
}


def _received_message_pb_to_mapping(received_message_pb):
"""Helper for :meth:`pull`, et aliae

Ideally, would use a function from :mod:`protobuf.json_format`, but
the right one isn't public. See:
https://github.com/google/protobuf/issues/1351
"""
return {
'ackId': received_message_pb.ack_id,
'message': _message_pb_to_mapping(
received_message_pb.message),
}
10 changes: 5 additions & 5 deletions gcloud/pubsub/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def subscription_create(self, subscription_path, topic_path,
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type topic_path: string
:param topic_path: the fully-qualified path of the topic being
Expand Down Expand Up @@ -373,7 +373,7 @@ def subscription_modify_push_config(self, subscription_path,
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type push_endpoint: string, or ``NoneType``
:param push_endpoint: URL to which messages will be pushed by the
Expand All @@ -395,7 +395,7 @@ def subscription_pull(self, subscription_path, return_immediately=False,
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type return_immediately: boolean
:param return_immediately: if True, the back-end returns even if no
Expand Down Expand Up @@ -427,7 +427,7 @@ def subscription_acknowledge(self, subscription_path, ack_ids):
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged
Expand All @@ -449,7 +449,7 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
:type subscription_path: string
:param subscription_path: the fully-qualified path of the new
subscription, in format
``projects/<PROJECT>/subscriptions/<TOPIC_NAME>``.
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type ack_ids: list of string
:param ack_ids: ack IDs of messages being acknowledged
Expand Down
Loading