Skip to content

Commit

Permalink
Add Pub/Sub snapshot and seek functionality
Browse files Browse the repository at this point in the history
Notes:
- cloud.google.com links do not yet work
- SeekResponse exists but is empty, so corresponding methods return
  None instead
- It's not documented whether the deleted topic path applies to
  snapshots analogous to how it applies to subscriptions, but it seems
  logical to assume so.
- The new Subscription fields are part of this release but will come
  shortly as a separate PR. (They can be done independently of the
  snap-seek work.)
  • Loading branch information
geigerj committed Apr 17, 2017
1 parent 534ef81 commit c73c833
Show file tree
Hide file tree
Showing 11 changed files with 1,328 additions and 7 deletions.
159 changes: 152 additions & 7 deletions pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from google.cloud.iterator import GAXIterator
from google.cloud.pubsub import __version__
from google.cloud.pubsub._helpers import subscription_name_from_path
from google.cloud.pubsub.snapshot import Snapshot
from google.cloud.pubsub.subscription import Subscription
from google.cloud.pubsub.topic import Topic

Expand Down Expand Up @@ -135,10 +136,10 @@ def topic_delete(self, topic_path):
"""API call: delete a topic
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/create
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/delete
:type topic_path: str
:param topic_path: fully-qualified path of the new topic, in format
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
"""
try:
Expand Down Expand Up @@ -372,7 +373,7 @@ def subscription_modify_push_config(self, subscription_path,
:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type push_endpoint: str
Expand All @@ -397,8 +398,8 @@ def subscription_pull(self, subscription_path, return_immediately=False,
:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
the fully-qualified path of the subscription to pull from, in
format ``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type return_immediately: bool
:param return_immediately: if True, the back-end returns even if no
Expand Down Expand Up @@ -438,7 +439,7 @@ def subscription_acknowledge(self, subscription_path, ack_ids):
:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type ack_ids: list of string
Expand All @@ -460,7 +461,7 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type ack_ids: list of string
Expand All @@ -478,6 +479,120 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
raise NotFound(subscription_path)
raise

def subscription_seek(self, subscription_path, time=None, snapshot=None):
"""API call: seek a subscription
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/seek
:type subscription_path: str
:param subscription_path::
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type time: :class:`.timestamp_pb2.Timestamp`
:param time: The time to seek to.
:type snapshot: str
:param snapshot: The snapshot to seek to.
"""
try:
self._gax_api.seek(subscription_path, time=time, snapshot=snapshot)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

def list_snapshots(self, project, page_size=0, page_token=None):
"""List snapshots for the project associated with this API.
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list
:type project: str
:param project: project ID
:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.
:type page_token: str
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot`
accessible to the current API.
"""
if page_token is None:
page_token = INITIAL_PAGE
options = CallOptions(page_token=page_token)
path = 'projects/%s' % (project,)
page_iter = self._gax_api.list_snapshots(
path, page_size=page_size, options=options)

# We attach a mutable topics dictionary so that as topic
# objects are created by Snapshot.from_api_repr, they
# can be re-used by other snapshots of the same topic.
topics = {}
item_to_value = functools.partial(
_item_to_snapshot_for_client, topics=topics)
return GAXIterator(self._client, page_iter, item_to_value)

def snapshot_create(self, snapshot_path, subscription_path):
"""API call: create a snapshot
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/create
:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.
:type subscription_path: str
:param subscription_path: fully-qualified path of the subscrption that
the new snapshot captures, in format
``projects/<PROJECT>/subscription/<SNAPSHOT_NAME>``.
:rtype: dict
:returns: ``Snapshot`` resource returned from the API.
:raises: :exc:`google.cloud.exceptions.Conflict` if the snapshot
already exists
:raises: :exc:`google.cloud.exceptions.NotFound` if the subscription
does not exist
"""
try:
snapshot_pb = self._gax_api.create_snapshot(
snapshot_path, subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
raise Conflict(snapshot_path)
elif exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return MessageToDict(snapshot_pb)

def snapshot_delete(self, snapshot_path):
"""API call: delete a topic
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/delete
:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.
:raises: :exc:`google.cloud.exceptions.NotFound` if the snapshot does
not exist
"""
try:
self._gax_api.delete_snapshot(snapshot_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(snapshot_path)
raise


def _message_pb_from_mapping(message):
"""Helper for :meth:`_PublisherAPI.topic_publish`.
Expand Down Expand Up @@ -631,3 +746,33 @@ def _item_to_sub_for_client(iterator, sub_pb, topics):
resource = MessageToDict(sub_pb)
return Subscription.from_api_repr(
resource, iterator.client, topics=topics)


def _item_to_snapshot_for_client(iterator, snapshot_pb, topics):
"""Convert a subscription protobuf to the native object.
.. note::
This method does not have the correct signature to be used as
the ``item_to_value`` argument to
:class:`~google.cloud.iterator.Iterator`. It is intended to be
patched with a mutable topics argument that can be updated
on subsequent calls. For an example, see how the method is
used above in :meth:`_SubscriberAPI.list_snapshots`.
:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.
:type sub_pb: :class:`.pubsub_pb2.Snapshot`
:param sub_pb: A subscription returned from the API.
:type topics: dict
:param topics: A dictionary of topics to be used (and modified)
as new subscriptions are created bound to topics.
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
:returns: The next subscription in the page.
"""
resource = MessageToDict(snapshot_pb)
return Snapshot.from_api_repr(
resource, iterator.client, topics=topics)
128 changes: 128 additions & 0 deletions pubsub/google/cloud/pubsub/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

from google.cloud.pubsub import __version__
from google.cloud.pubsub._helpers import subscription_name_from_path
from google.cloud.pubsub.snapshot import Snapshot
from google.cloud.pubsub.subscription import Subscription
from google.cloud.pubsub.topic import Topic

Expand Down Expand Up @@ -467,6 +468,104 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
}
self.api_request(method='POST', path=path, data=data)

def subscription_seek(self, subscription_path, time=None, snapshot=None):
"""API call: seek a subscription
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/seek
:type subscription_path: str
:param subscription_path::
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type time: str
:param time: The time to seek to, in RFC 3339 format.
:type snapshot: str
:param snapshot: The snapshot to seek to.
"""
path = '/%s:seek' % (subscription_path,)
data = {}
if time is not None:
data['time'] = time
elif snapshot is not None:
data['snapshot'] = snapshot
self.api_request(method='POST', path=path, data=data)

def list_snapshots(self, project, page_size=None, page_token=None):
"""List snapshots for the project associated with this API.
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list
:type project: str
:param project: project ID
:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.
:type page_token: str
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot`
accessible to the current API.
"""
extra_params = {}
if page_size is not None:
extra_params['pageSize'] = page_size
path = '/projects/%s/snapshots' % (project,)

# We attach a mutable topics dictionary so that as topic
# objects are created by Snapshot.from_api_repr, they
# can be re-used by other snapshots of the same topic.
topics = {}
item_to_value = functools.partial(
_item_to_snapshot_for_client, topics=topics)
return HTTPIterator(
client=self._client, path=path, item_to_value=item_to_value,
items_key='snapshots', page_token=page_token,
extra_params=extra_params)

def snapshot_create(self, snapshot_path, subscription_path):
"""API call: create a snapshot
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/create
:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.
:type subscription_path: str
:param subscription_path: fully-qualified path of the subscrption that
the new snapshot captures, in format
``projects/<PROJECT>/subscription/<SNAPSHOT_NAME>``.
:rtype: dict
:returns: ``Snapshot`` resource returned from the API.
"""
path = '/%s' % (snapshot_path,)
data = {'subscription': subscription_path}
return self.api_request(method='PUT', path=path, data=data)

def snapshot_delete(self, snapshot_path):
"""API call: delete a topic
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/delete
:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.
"""
path = '/%s' % (snapshot_path,)
self.api_request(method='DELETE', path=path)


class _IAMPolicyAPI(object):
"""Helper mapping IAM policy-related APIs.
Expand Down Expand Up @@ -627,3 +726,32 @@ def _item_to_sub_for_client(iterator, resource, topics):
"""
return Subscription.from_api_repr(
resource, iterator.client, topics=topics)


def _item_to_snapshot_for_client(iterator, resource, topics):
"""Convert a subscription to the native object.
.. note::
This method does not have the correct signature to be used as
the ``item_to_value`` argument to
:class:`~google.cloud.iterator.Iterator`. It is intended to be
patched with a mutable topics argument that can be updated
on subsequent calls. For an example, see how the method is
used above in :meth:`_SubscriberAPI.list_snapshots`.
:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.
:type resource: dict
:param resource: A subscription returned from the API.
:type topics: dict
:param topics: A dictionary of topics to be used (and modified)
as new subscriptions are created bound to topics.
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
:returns: The next subscription in the page.
"""
return Snapshot.from_api_repr(
resource, iterator.client, topics=topics)
26 changes: 26 additions & 0 deletions pubsub/google/cloud/pubsub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,32 @@ def list_subscriptions(self, page_size=None, page_token=None):
return api.list_subscriptions(
self.project, page_size, page_token)

def list_snapshots(self, page_size=None, page_token=None):
"""List snapshots for the project associated with this API.
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list
:type project: str
:param project: project ID
:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.
:type page_token: str
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot`
accessible to the current API.
"""
api = self.subscriber_api
return api.list_snapshots(
self.project, page_size, page_token)

def topic(self, name, timestamp_messages=False):
"""Creates a topic bound to the current client.
Expand Down
Loading

0 comments on commit c73c833

Please sign in to comment.