Skip to content

Commit

Permalink
Merge pull request #3303 from geigerj/snap-seek2
Browse files Browse the repository at this point in the history
Add Pub/Sub snapshot and seek functionality
  • Loading branch information
geigerj authored Apr 19, 2017
2 parents d77b70d + c72acfa commit 5fa507f
Show file tree
Hide file tree
Showing 11 changed files with 1,327 additions and 9 deletions.
159 changes: 152 additions & 7 deletions pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from google.cloud.iterator import GAXIterator
from google.cloud.pubsub import __version__
from google.cloud.pubsub._helpers import subscription_name_from_path
from google.cloud.pubsub.snapshot import Snapshot
from google.cloud.pubsub.subscription import Subscription
from google.cloud.pubsub.topic import Topic

Expand Down Expand Up @@ -136,10 +137,10 @@ def topic_delete(self, topic_path):
"""API call: delete a topic
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/create
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/delete
:type topic_path: str
:param topic_path: fully-qualified path of the new topic, in format
:param topic_path: fully-qualified path of the topic, in format
``projects/<PROJECT>/topics/<TOPIC_NAME>``.
"""
try:
Expand Down Expand Up @@ -390,7 +391,7 @@ def subscription_modify_push_config(self, subscription_path,
:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type push_endpoint: str
Expand All @@ -415,8 +416,8 @@ def subscription_pull(self, subscription_path, return_immediately=False,
:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
the fully-qualified path of the subscription to pull from, in
format ``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type return_immediately: bool
:param return_immediately: if True, the back-end returns even if no
Expand Down Expand Up @@ -456,7 +457,7 @@ def subscription_acknowledge(self, subscription_path, ack_ids):
:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type ack_ids: list of string
Expand All @@ -478,7 +479,7 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
:type subscription_path: str
:param subscription_path:
the fully-qualified path of the new subscription, in format
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type ack_ids: list of string
Expand All @@ -496,6 +497,120 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
raise NotFound(subscription_path)
raise

def subscription_seek(self, subscription_path, time=None, snapshot=None):
"""API call: seek a subscription
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/seek
:type subscription_path: str
:param subscription_path::
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type time: :class:`.timestamp_pb2.Timestamp`
:param time: The time to seek to.
:type snapshot: str
:param snapshot: The snapshot to seek to.
"""
try:
self._gax_api.seek(subscription_path, time=time, snapshot=snapshot)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise

def list_snapshots(self, project, page_size=0, page_token=None):
"""List snapshots for the project associated with this API.
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list
:type project: str
:param project: project ID
:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.
:type page_token: str
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot`
accessible to the current API.
"""
if page_token is None:
page_token = INITIAL_PAGE
options = CallOptions(page_token=page_token)
path = 'projects/%s' % (project,)
page_iter = self._gax_api.list_snapshots(
path, page_size=page_size, options=options)

# We attach a mutable topics dictionary so that as topic
# objects are created by Snapshot.from_api_repr, they
# can be re-used by other snapshots of the same topic.
topics = {}
item_to_value = functools.partial(
_item_to_snapshot_for_client, topics=topics)
return GAXIterator(self._client, page_iter, item_to_value)

def snapshot_create(self, snapshot_path, subscription_path):
"""API call: create a snapshot
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/create
:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.
:type subscription_path: str
:param subscription_path: fully-qualified path of the subscrption that
the new snapshot captures, in format
``projects/<PROJECT>/subscription/<SNAPSHOT_NAME>``.
:rtype: dict
:returns: ``Snapshot`` resource returned from the API.
:raises: :exc:`google.cloud.exceptions.Conflict` if the snapshot
already exists
:raises: :exc:`google.cloud.exceptions.NotFound` if the subscription
does not exist
"""
try:
snapshot_pb = self._gax_api.create_snapshot(
snapshot_path, subscription_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION:
raise Conflict(snapshot_path)
elif exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(subscription_path)
raise
return MessageToDict(snapshot_pb)

def snapshot_delete(self, snapshot_path):
"""API call: delete a topic
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/delete
:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.
:raises: :exc:`google.cloud.exceptions.NotFound` if the snapshot does
not exist
"""
try:
self._gax_api.delete_snapshot(snapshot_path)
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(snapshot_path)
raise


def _message_pb_from_mapping(message):
"""Helper for :meth:`_PublisherAPI.topic_publish`.
Expand Down Expand Up @@ -649,3 +764,33 @@ def _item_to_sub_for_client(iterator, sub_pb, topics):
resource = MessageToDict(sub_pb)
return Subscription.from_api_repr(
resource, iterator.client, topics=topics)


def _item_to_snapshot_for_client(iterator, snapshot_pb, topics):
"""Convert a subscription protobuf to the native object.
.. note::
This method does not have the correct signature to be used as
the ``item_to_value`` argument to
:class:`~google.cloud.iterator.Iterator`. It is intended to be
patched with a mutable topics argument that can be updated
on subsequent calls. For an example, see how the method is
used above in :meth:`_SubscriberAPI.list_snapshots`.
:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.
:type sub_pb: :class:`.pubsub_pb2.Snapshot`
:param sub_pb: A subscription returned from the API.
:type topics: dict
:param topics: A dictionary of topics to be used (and modified)
as new subscriptions are created bound to topics.
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
:returns: The next subscription in the page.
"""
resource = MessageToDict(snapshot_pb)
return Snapshot.from_api_repr(
resource, iterator.client, topics=topics)
128 changes: 128 additions & 0 deletions pubsub/google/cloud/pubsub/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from google.cloud.pubsub import __version__
from google.cloud.pubsub._helpers import subscription_name_from_path
from google.cloud.pubsub.snapshot import Snapshot
from google.cloud.pubsub.subscription import Subscription
from google.cloud.pubsub.topic import Topic

Expand Down Expand Up @@ -492,6 +493,104 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids,
}
self.api_request(method='POST', path=path, data=data)

def subscription_seek(self, subscription_path, time=None, snapshot=None):
"""API call: seek a subscription
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/seek
:type subscription_path: str
:param subscription_path::
the fully-qualified path of the subscription to affect, in format
``projects/<PROJECT>/subscriptions/<SUB_NAME>``.
:type time: str
:param time: The time to seek to, in RFC 3339 format.
:type snapshot: str
:param snapshot: The snapshot to seek to.
"""
path = '/%s:seek' % (subscription_path,)
data = {}
if time is not None:
data['time'] = time
if snapshot is not None:
data['snapshot'] = snapshot
self.api_request(method='POST', path=path, data=data)

def list_snapshots(self, project, page_size=None, page_token=None):
"""List snapshots for the project associated with this API.
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list
:type project: str
:param project: project ID
:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.
:type page_token: str
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot`
accessible to the current API.
"""
extra_params = {}
if page_size is not None:
extra_params['pageSize'] = page_size
path = '/projects/%s/snapshots' % (project,)

# We attach a mutable topics dictionary so that as topic
# objects are created by Snapshot.from_api_repr, they
# can be re-used by other snapshots of the same topic.
topics = {}
item_to_value = functools.partial(
_item_to_snapshot_for_client, topics=topics)
return HTTPIterator(
client=self._client, path=path, item_to_value=item_to_value,
items_key='snapshots', page_token=page_token,
extra_params=extra_params)

def snapshot_create(self, snapshot_path, subscription_path):
"""API call: create a snapshot
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/create
:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.
:type subscription_path: str
:param subscription_path: fully-qualified path of the subscrption that
the new snapshot captures, in format
``projects/<PROJECT>/subscription/<SNAPSHOT_NAME>``.
:rtype: dict
:returns: ``Snapshot`` resource returned from the API.
"""
path = '/%s' % (snapshot_path,)
data = {'subscription': subscription_path}
return self.api_request(method='PUT', path=path, data=data)

def snapshot_delete(self, snapshot_path):
"""API call: delete a topic
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/delete
:type snapshot_path: str
:param snapshot_path: fully-qualified path of the snapshot, in format
``projects/<PROJECT>/snapshots/<SNAPSHOT_NAME>``.
"""
path = '/%s' % (snapshot_path,)
self.api_request(method='DELETE', path=path)


class _IAMPolicyAPI(object):
"""Helper mapping IAM policy-related APIs.
Expand Down Expand Up @@ -652,3 +751,32 @@ def _item_to_sub_for_client(iterator, resource, topics):
"""
return Subscription.from_api_repr(
resource, iterator.client, topics=topics)


def _item_to_snapshot_for_client(iterator, resource, topics):
"""Convert a subscription to the native object.
.. note::
This method does not have the correct signature to be used as
the ``item_to_value`` argument to
:class:`~google.cloud.iterator.Iterator`. It is intended to be
patched with a mutable topics argument that can be updated
on subsequent calls. For an example, see how the method is
used above in :meth:`_SubscriberAPI.list_snapshots`.
:type iterator: :class:`~google.cloud.iterator.Iterator`
:param iterator: The iterator that is currently in use.
:type resource: dict
:param resource: A subscription returned from the API.
:type topics: dict
:param topics: A dictionary of topics to be used (and modified)
as new subscriptions are created bound to topics.
:rtype: :class:`~google.cloud.pubsub.subscription.Subscription`
:returns: The next subscription in the page.
"""
return Snapshot.from_api_repr(
resource, iterator.client, topics=topics)
26 changes: 26 additions & 0 deletions pubsub/google/cloud/pubsub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,32 @@ def list_subscriptions(self, page_size=None, page_token=None):
return api.list_subscriptions(
self.project, page_size, page_token)

def list_snapshots(self, page_size=None, page_token=None):
"""List snapshots for the project associated with this API.
See:
https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list
:type project: str
:param project: project ID
:type page_size: int
:param page_size: maximum number of topics to return, If not passed,
defaults to a value set by the API.
:type page_token: str
:param page_token: opaque marker for the next "page" of topics. If not
passed, the API will return the first page of
topics.
:rtype: :class:`~google.cloud.iterator.Iterator`
:returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot`
accessible to the current API.
"""
api = self.subscriber_api
return api.list_snapshots(
self.project, page_size, page_token)

def topic(self, name, timestamp_messages=False):
"""Creates a topic bound to the current client.
Expand Down
Loading

0 comments on commit 5fa507f

Please sign in to comment.