Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pub/Sub snapshot and seek functionality #3303

Merged
merged 4 commits into from
Apr 19, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 152 additions & 7 deletions pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from google.cloud.iterator import GAXIterator
from google.cloud.pubsub import __version__
from google.cloud.pubsub._helpers import subscription_name_from_path
from google.cloud.pubsub.snapshot import Snapshot
from google.cloud.pubsub.subscription import Subscription
from google.cloud.pubsub.topic import Topic

Expand Down Expand Up @@ -136,10 +137,10 @@ def topic_delete(self, topic_path):
"""API call: delete a topic

See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/create
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/delete

:type topic_path: str
:param topic_path: fully-qualified path of the new topic, in format
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
"""
try:
Expand Down Expand Up @@ -390,7 +391,7 @@ def subscription_modify_push_config(self, subscription_path,

:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type push_endpoint: str
Expand All @@ -415,8 +416,8 @@ def subscription_pull(self, subscription_path, return_immediately=False,

:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
the fully-qualified path of the subscription to pull from, in
format ``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type return_immediately: bool
:param return_immediately: if True, the back-end returns even if no
Expand Down Expand Up @@ -456,7 +457,7 @@ def subscription_acknowledge(self, subscription_path, ack_ids):

:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type ack_ids: list of string
Expand All @@ -478,7 +479,7 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,

:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type ack_ids: list of string
Expand All @@ -496,6 +497,120 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
raise NotFound(subscription_path)
raise

def subscription_seek(self, subscription_path, time=None, snapshot=None):
"""API call: seek a subscription

See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/seek

:type subscription_path: str
:param subscription_path::
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type time: :class:`.timestamp_pb2.Timestamp`
:param time: The time to seek to.

:type snapshot: str
:param snapshot: The snapshot to seek to.
"""
try:
self._gax_api.seek(subscription_path, time=time, snapshot=snapshot)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

def list_snapshots(self, project, page_size=0, page_token=None):
"""List snapshots for the project associated with this API.

See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list

:type project: str
:param project: project ID

:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.

:type page_token: str
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.

:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot`
accessible to the current API.
"""
if page_token is None:
page_token = INITIAL_PAGE
options = CallOptions(page_token=page_token)
path = 'projects/%s' % (project,)
page_iter = self._gax_api.list_snapshots(
path, page_size=page_size, options=options)

# We attach a mutable topics dictionary so that as topic
# objects are created by Snapshot.from_api_repr, they
# can be re-used by other snapshots of the same topic.
topics = {}
item_to_value = functools.partial(
_item_to_snapshot_for_client, topics=topics)
return GAXIterator(self._client, page_iter, item_to_value)

def snapshot_create(self, snapshot_path, subscription_path):
"""API call: create a snapshot

See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/create

:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.

:type subscription_path: str
:param subscription_path: fully-qualified path of the subscrption that
the new snapshot captures, in format
``projects/<PROJECT>/subscription/<SNAPSHOT_NAME>``.

:rtype: dict
:returns: ``Snapshot`` resource returned from the API.
:raises: :exc:`google.cloud.exceptions.Conflict` if the snapshot
already exists
:raises: :exc:`google.cloud.exceptions.NotFound` if the subscription
does not exist
"""
try:
snapshot_pb = self._gax_api.create_snapshot(
snapshot_path, subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
raise Conflict(snapshot_path)
elif exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return MessageToDict(snapshot_pb)

def snapshot_delete(self, snapshot_path):
"""API call: delete a topic

See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/delete

:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.

:raises: :exc:`google.cloud.exceptions.NotFound` if the snapshot does
not exist
"""
try:
self._gax_api.delete_snapshot(snapshot_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(snapshot_path)
raise


def _message_pb_from_mapping(message):
"""Helper for :meth:`_PublisherAPI.topic_publish`.
Expand Down Expand Up @@ -649,3 +764,33 @@ def _item_to_sub_for_client(iterator, sub_pb, topics):
resource = MessageToDict(sub_pb)
return Subscription.from_api_repr(
resource, iterator.client, topics=topics)


def _item_to_snapshot_for_client(iterator, snapshot_pb, topics):
"""Convert a subscription protobuf to the native object.

.. note::

This method does not have the correct signature to be used as
the ``item_to_value`` argument to
:class:`~google.cloud.iterator.Iterator`. It is intended to be
patched with a mutable topics argument that can be updated
on subsequent calls. For an example, see how the method is
used above in :meth:`_SubscriberAPI.list_snapshots`.

:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.

:type sub_pb: :class:`.pubsub_pb2.Snapshot`
:param sub_pb: A subscription returned from the API.

:type topics: dict
:param topics: A dictionary of topics to be used (and modified)
as new subscriptions are created bound to topics.

:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
:returns: The next subscription in the page.
"""
resource = MessageToDict(snapshot_pb)
return Snapshot.from_api_repr(
resource, iterator.client, topics=topics)
128 changes: 128 additions & 0 deletions pubsub/google/cloud/pubsub/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from google.cloud.pubsub import __version__
from google.cloud.pubsub._helpers import subscription_name_from_path
from google.cloud.pubsub.snapshot import Snapshot
from google.cloud.pubsub.subscription import Subscription
from google.cloud.pubsub.topic import Topic

Expand Down Expand Up @@ -492,6 +493,104 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
}
self.api_request(method='POST', path=path, data=data)

def subscription_seek(self, subscription_path, time=None, snapshot=None):
"""API call: seek a subscription

See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/seek

:type subscription_path: str
:param subscription_path::
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.

:type time: str
:param time: The time to seek to, in RFC 3339 format.

:type snapshot: str
:param snapshot: The snapshot to seek to.
"""
path = '/%s:seek' % (subscription_path,)
data = {}
if time is not None:
data['time'] = time
if snapshot is not None:
data['snapshot'] = snapshot
self.api_request(method='POST', path=path, data=data)

def list_snapshots(self, project, page_size=None, page_token=None):
"""List snapshots for the project associated with this API.

See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list

:type project: str
:param project: project ID

:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.

:type page_token: str
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.

:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot`
accessible to the current API.
"""
extra_params = {}
if page_size is not None:
extra_params['pageSize'] = page_size
path = '/projects/%s/snapshots' % (project,)

# We attach a mutable topics dictionary so that as topic
# objects are created by Snapshot.from_api_repr, they
# can be re-used by other snapshots of the same topic.
topics = {}
item_to_value = functools.partial(
_item_to_snapshot_for_client, topics=topics)
return HTTPIterator(
client=self._client, path=path, item_to_value=item_to_value,
items_key='snapshots', page_token=page_token,
extra_params=extra_params)

def snapshot_create(self, snapshot_path, subscription_path):
"""API call: create a snapshot

See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/create

:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.

:type subscription_path: str
:param subscription_path: fully-qualified path of the subscrption that
the new snapshot captures, in format
``projects/<PROJECT>/subscription/<SNAPSHOT_NAME>``.

:rtype: dict
:returns: ``Snapshot`` resource returned from the API.
"""
path = '/%s' % (snapshot_path,)
data = {'subscription': subscription_path}
return self.api_request(method='PUT', path=path, data=data)

def snapshot_delete(self, snapshot_path):
"""API call: delete a topic

See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/delete

:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.
"""
path = '/%s' % (snapshot_path,)
self.api_request(method='DELETE', path=path)


class _IAMPolicyAPI(object):
"""Helper mapping IAM policy-related APIs.
Expand Down Expand Up @@ -652,3 +751,32 @@ def _item_to_sub_for_client(iterator, resource, topics):
"""
return Subscription.from_api_repr(
resource, iterator.client, topics=topics)


def _item_to_snapshot_for_client(iterator, resource, topics):
"""Convert a subscription to the native object.

.. note::

This method does not have the correct signature to be used as
the ``item_to_value`` argument to
:class:`~google.cloud.iterator.Iterator`. It is intended to be
patched with a mutable topics argument that can be updated
on subsequent calls. For an example, see how the method is
used above in :meth:`_SubscriberAPI.list_snapshots`.

:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.

:type resource: dict
:param resource: A subscription returned from the API.

:type topics: dict
:param topics: A dictionary of topics to be used (and modified)
as new subscriptions are created bound to topics.

:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
:returns: The next subscription in the page.
"""
return Snapshot.from_api_repr(
resource, iterator.client, topics=topics)
26 changes: 26 additions & 0 deletions pubsub/google/cloud/pubsub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,32 @@ def list_subscriptions(self, page_size=None, page_token=None):
return api.list_subscriptions(
self.project, page_size, page_token)

def list_snapshots(self, page_size=None, page_token=None):
"""List snapshots for the project associated with this API.

See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list

:type project: str
:param project: project ID

:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.

:type page_token: str
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.

:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot`
accessible to the current API.
"""
api = self.subscriber_api
return api.list_snapshots(
self.project, page_size, page_token)

def topic(self, name, timestamp_messages=False):
"""Creates a topic bound to the current client.

Expand Down
Loading