diff --git a/pubsub/google/cloud/pubsub/_gax.py b/pubsub/google/cloud/pubsub/_gax.py index 6b47b42524a3..730192755221 100644 --- a/pubsub/google/cloud/pubsub/_gax.py +++ b/pubsub/google/cloud/pubsub/_gax.py @@ -38,6 +38,7 @@ from google.cloud.iterator import GAXIterator from google.cloud.pubsub import __version__ from google.cloud.pubsub._helpers import subscription_name_from_path +from google.cloud.pubsub.snapshot import Snapshot from google.cloud.pubsub.subscription import Subscription from google.cloud.pubsub.topic import Topic @@ -136,10 +137,10 @@ def topic_delete(self, topic_path): """API call: delete a topic See: - https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/create + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.topics/delete :type topic_path: str - :param topic_path: fully-qualified path of the new topic, in format + :param topic_path: fully-qualified path of the topic, in format ``projects//topics/``. """ try: @@ -390,7 +391,7 @@ def subscription_modify_push_config(self, subscription_path, :type subscription_path: str :param subscription_path: - the fully-qualified path of the new subscription, in format + the fully-qualified path of the subscription to affect, in format ``projects//subscriptions/``. :type push_endpoint: str @@ -415,8 +416,8 @@ def subscription_pull(self, subscription_path, return_immediately=False, :type subscription_path: str :param subscription_path: - the fully-qualified path of the new subscription, in format - ``projects//subscriptions/``. + the fully-qualified path of the subscription to pull from, in + format ``projects//subscriptions/``. :type return_immediately: bool :param return_immediately: if True, the back-end returns even if no @@ -456,7 +457,7 @@ def subscription_acknowledge(self, subscription_path, ack_ids): :type subscription_path: str :param subscription_path: - the fully-qualified path of the new subscription, in format + the fully-qualified path of the subscription to affect, in format ``projects//subscriptions/``. :type ack_ids: list of string @@ -478,7 +479,7 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids, :type subscription_path: str :param subscription_path: - the fully-qualified path of the new subscription, in format + the fully-qualified path of the subscription to affect, in format ``projects//subscriptions/``. :type ack_ids: list of string @@ -496,6 +497,120 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids, raise NotFound(subscription_path) raise + def subscription_seek(self, subscription_path, time=None, snapshot=None): + """API call: seek a subscription + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/seek + + :type subscription_path: str + :param subscription_path:: + the fully-qualified path of the subscription to affect, in format + ``projects//subscriptions/``. + + :type time: :class:`.timestamp_pb2.Timestamp` + :param time: The time to seek to. + + :type snapshot: str + :param snapshot: The snapshot to seek to. + """ + try: + self._gax_api.seek(subscription_path, time=time, snapshot=snapshot) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(subscription_path) + raise + + def list_snapshots(self, project, page_size=0, page_token=None): + """List snapshots for the project associated with this API. + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list + + :type project: str + :param project: project ID + + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: str + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of + topics. + + :rtype: :class:`~google.cloud.iterator.Iterator` + :returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot` + accessible to the current API. + """ + if page_token is None: + page_token = INITIAL_PAGE + options = CallOptions(page_token=page_token) + path = 'projects/%s' % (project,) + page_iter = self._gax_api.list_snapshots( + path, page_size=page_size, options=options) + + # We attach a mutable topics dictionary so that as topic + # objects are created by Snapshot.from_api_repr, they + # can be re-used by other snapshots of the same topic. + topics = {} + item_to_value = functools.partial( + _item_to_snapshot_for_client, topics=topics) + return GAXIterator(self._client, page_iter, item_to_value) + + def snapshot_create(self, snapshot_path, subscription_path): + """API call: create a snapshot + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/create + + :type snapshot_path: str + :param snapshot_path: fully-qualified path of the snapshot, in format + ``projects//snapshots/``. + + :type subscription_path: str + :param subscription_path: fully-qualified path of the subscrption that + the new snapshot captures, in format + ``projects//subscription/``. + + :rtype: dict + :returns: ``Snapshot`` resource returned from the API. + :raises: :exc:`google.cloud.exceptions.Conflict` if the snapshot + already exists + :raises: :exc:`google.cloud.exceptions.NotFound` if the subscription + does not exist + """ + try: + snapshot_pb = self._gax_api.create_snapshot( + snapshot_path, subscription_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.FAILED_PRECONDITION: + raise Conflict(snapshot_path) + elif exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(subscription_path) + raise + return MessageToDict(snapshot_pb) + + def snapshot_delete(self, snapshot_path): + """API call: delete a topic + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/delete + + :type snapshot_path: str + :param snapshot_path: fully-qualified path of the snapshot, in format + ``projects//snapshots/``. + + :raises: :exc:`google.cloud.exceptions.NotFound` if the snapshot does + not exist + """ + try: + self._gax_api.delete_snapshot(snapshot_path) + except GaxError as exc: + if exc_to_code(exc.cause) == StatusCode.NOT_FOUND: + raise NotFound(snapshot_path) + raise + def _message_pb_from_mapping(message): """Helper for :meth:`_PublisherAPI.topic_publish`. @@ -649,3 +764,33 @@ def _item_to_sub_for_client(iterator, sub_pb, topics): resource = MessageToDict(sub_pb) return Subscription.from_api_repr( resource, iterator.client, topics=topics) + + +def _item_to_snapshot_for_client(iterator, snapshot_pb, topics): + """Convert a subscription protobuf to the native object. + + .. note:: + + This method does not have the correct signature to be used as + the ``item_to_value`` argument to + :class:`~google.cloud.iterator.Iterator`. It is intended to be + patched with a mutable topics argument that can be updated + on subsequent calls. For an example, see how the method is + used above in :meth:`_SubscriberAPI.list_snapshots`. + + :type iterator: :class:`~google.cloud.iterator.Iterator` + :param iterator: The iterator that is currently in use. + + :type sub_pb: :class:`.pubsub_pb2.Snapshot` + :param sub_pb: A subscription returned from the API. + + :type topics: dict + :param topics: A dictionary of topics to be used (and modified) + as new subscriptions are created bound to topics. + + :rtype: :class:`~google.cloud.pubsub.subscription.Subscription` + :returns: The next subscription in the page. + """ + resource = MessageToDict(snapshot_pb) + return Snapshot.from_api_repr( + resource, iterator.client, topics=topics) diff --git a/pubsub/google/cloud/pubsub/_http.py b/pubsub/google/cloud/pubsub/_http.py index 47fa7015c60d..0c059df7453a 100644 --- a/pubsub/google/cloud/pubsub/_http.py +++ b/pubsub/google/cloud/pubsub/_http.py @@ -26,6 +26,7 @@ from google.cloud.pubsub import __version__ from google.cloud.pubsub._helpers import subscription_name_from_path +from google.cloud.pubsub.snapshot import Snapshot from google.cloud.pubsub.subscription import Subscription from google.cloud.pubsub.topic import Topic @@ -492,6 +493,104 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids, } self.api_request(method='POST', path=path, data=data) + def subscription_seek(self, subscription_path, time=None, snapshot=None): + """API call: seek a subscription + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/seek + + :type subscription_path: str + :param subscription_path:: + the fully-qualified path of the subscription to affect, in format + ``projects//subscriptions/``. + + :type time: str + :param time: The time to seek to, in RFC 3339 format. + + :type snapshot: str + :param snapshot: The snapshot to seek to. + """ + path = '/%s:seek' % (subscription_path,) + data = {} + if time is not None: + data['time'] = time + if snapshot is not None: + data['snapshot'] = snapshot + self.api_request(method='POST', path=path, data=data) + + def list_snapshots(self, project, page_size=None, page_token=None): + """List snapshots for the project associated with this API. + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list + + :type project: str + :param project: project ID + + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: str + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of + topics. + + :rtype: :class:`~google.cloud.iterator.Iterator` + :returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot` + accessible to the current API. + """ + extra_params = {} + if page_size is not None: + extra_params['pageSize'] = page_size + path = '/projects/%s/snapshots' % (project,) + + # We attach a mutable topics dictionary so that as topic + # objects are created by Snapshot.from_api_repr, they + # can be re-used by other snapshots of the same topic. + topics = {} + item_to_value = functools.partial( + _item_to_snapshot_for_client, topics=topics) + return HTTPIterator( + client=self._client, path=path, item_to_value=item_to_value, + items_key='snapshots', page_token=page_token, + extra_params=extra_params) + + def snapshot_create(self, snapshot_path, subscription_path): + """API call: create a snapshot + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/create + + :type snapshot_path: str + :param snapshot_path: fully-qualified path of the snapshot, in format + ``projects//snapshots/``. + + :type subscription_path: str + :param subscription_path: fully-qualified path of the subscrption that + the new snapshot captures, in format + ``projects//subscription/``. + + :rtype: dict + :returns: ``Snapshot`` resource returned from the API. + """ + path = '/%s' % (snapshot_path,) + data = {'subscription': subscription_path} + return self.api_request(method='PUT', path=path, data=data) + + def snapshot_delete(self, snapshot_path): + """API call: delete a topic + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/delete + + :type snapshot_path: str + :param snapshot_path: fully-qualified path of the snapshot, in format + ``projects//snapshots/``. + """ + path = '/%s' % (snapshot_path,) + self.api_request(method='DELETE', path=path) + class _IAMPolicyAPI(object): """Helper mapping IAM policy-related APIs. @@ -652,3 +751,32 @@ def _item_to_sub_for_client(iterator, resource, topics): """ return Subscription.from_api_repr( resource, iterator.client, topics=topics) + + +def _item_to_snapshot_for_client(iterator, resource, topics): + """Convert a subscription to the native object. + + .. note:: + + This method does not have the correct signature to be used as + the ``item_to_value`` argument to + :class:`~google.cloud.iterator.Iterator`. It is intended to be + patched with a mutable topics argument that can be updated + on subsequent calls. For an example, see how the method is + used above in :meth:`_SubscriberAPI.list_snapshots`. + + :type iterator: :class:`~google.cloud.iterator.Iterator` + :param iterator: The iterator that is currently in use. + + :type resource: dict + :param resource: A subscription returned from the API. + + :type topics: dict + :param topics: A dictionary of topics to be used (and modified) + as new subscriptions are created bound to topics. + + :rtype: :class:`~google.cloud.pubsub.subscription.Subscription` + :returns: The next subscription in the page. + """ + return Snapshot.from_api_repr( + resource, iterator.client, topics=topics) diff --git a/pubsub/google/cloud/pubsub/client.py b/pubsub/google/cloud/pubsub/client.py index 4e5b76bd118c..cccecd27f4c4 100644 --- a/pubsub/google/cloud/pubsub/client.py +++ b/pubsub/google/cloud/pubsub/client.py @@ -191,6 +191,32 @@ def list_subscriptions(self, page_size=None, page_token=None): return api.list_subscriptions( self.project, page_size, page_token) + def list_snapshots(self, page_size=None, page_token=None): + """List snapshots for the project associated with this API. + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/list + + :type project: str + :param project: project ID + + :type page_size: int + :param page_size: maximum number of topics to return, If not passed, + defaults to a value set by the API. + + :type page_token: str + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of + topics. + + :rtype: :class:`~google.cloud.iterator.Iterator` + :returns: Iterator of :class:`~google.cloud.pubsub.snapshot.Snapshot` + accessible to the current API. + """ + api = self.subscriber_api + return api.list_snapshots( + self.project, page_size, page_token) + def topic(self, name, timestamp_messages=False): """Creates a topic bound to the current client. diff --git a/pubsub/google/cloud/pubsub/snapshot.py b/pubsub/google/cloud/pubsub/snapshot.py new file mode 100644 index 000000000000..fd9a78376397 --- /dev/null +++ b/pubsub/google/cloud/pubsub/snapshot.py @@ -0,0 +1,141 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Define API Snapshots.""" + +from google.cloud.pubsub._helpers import topic_name_from_path + + +class Snapshot(object): + + _DELETED_TOPIC_PATH = '_deleted-topic_' + """Value of ``projects.snapshots.topic`` when topic has been deleted.""" + + def __init__(self, name, subscription=None, topic=None, client=None): + + num_kwargs = len( + [param for param in (subscription, topic, client) if param]) + if num_kwargs != 1: + raise TypeError( + "Pass only one of 'subscription', 'topic', 'client'.") + + self.name = name + self.topic = topic or getattr(subscription, 'topic', None) + self._subscription = subscription + self._client = client or getattr( + subscription, '_client', None) or topic._client + self._project = self._client.project + + @classmethod + def from_api_repr(cls, resource, client, topics=None): + """Factory: construct a subscription given its API representation + + :type resource: dict + :param resource: snapshot resource representation returned from the + API. + + :type client: :class:`google.cloud.pubsub.client.Client` + :param client: Client which holds credentials and project + configuration. + + :type subscriptions: dict + :param subscriptions: + (Optional) A Subscription to which this snapshot belongs. If not + passed, the subscription will have a newly-created subscription. + Must have the same topic as the snapshot. + + :rtype: :class:`google.cloud.pubsub.subscription.Subscription` + :returns: Subscription parsed from ``resource``. + """ + if topics is None: + topics = {} + topic_path = resource['topic'] + if topic_path == cls._DELETED_TOPIC_PATH: + topic = None + else: + topic = topics.get(topic_path) + if topic is None: + # NOTE: This duplicates behavior from Topic.from_api_repr to + # avoid an import cycle. + topic_name = topic_name_from_path(topic_path, client.project) + topic = topics[topic_path] = client.topic(topic_name) + _, _, _, name = resource['name'].split('/') + if topic is None: + return cls(name, client=client) + return cls(name, topic=topic) + + @property + def project(self): + """Project bound to the subscription.""" + return self._client.project + + @property + def full_name(self): + """Fully-qualified name used in subscription APIs""" + return 'projects/%s/snapshots/%s' % (self.project, self.name) + + @property + def path(self): + """URL path for the subscription's APIs""" + return '/%s' % (self.full_name,) + + def _require_client(self, client): + """Check client or verify over-ride. + + :type client: :class:`~google.cloud.pubsub.client.Client` or + ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the topic of the + current subscription. + + :rtype: :class:`google.cloud.pubsub.client.Client` + :returns: The client passed in or the currently bound client. + """ + if client is None: + client = self._client + return client + + def create(self, client=None): + """API call: create the snapshot + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/create + + :type client: :class:`~google.cloud.pubsub.client.Client` or + ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + """ + if not self._subscription: + raise RuntimeError( + 'Cannot create a snapshot not bound to a subscription') + + client = self._require_client(client) + api = client.subscriber_api + api.snapshot_create(self.full_name, self._subscription.full_name) + + def delete(self, client=None): + """API call: delete the snapshot + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.snapshots/delete + + :type client: :class:`~google.cloud.pubsub.client.Client` or + ``NoneType`` + :param client: the client to use. If not passed, falls back to the + ``client`` stored on the current subscription's topic. + """ + client = self._require_client(client) + api = client.subscriber_api + api.snapshot_delete(self.full_name) diff --git a/pubsub/google/cloud/pubsub/subscription.py b/pubsub/google/cloud/pubsub/subscription.py index 100ac13474b6..b597d3526f67 100644 --- a/pubsub/google/cloud/pubsub/subscription.py +++ b/pubsub/google/cloud/pubsub/subscription.py @@ -17,6 +17,8 @@ import datetime from google.cloud.exceptions import NotFound +from google.cloud._helpers import _datetime_to_rfc3339 +from google.cloud.pubsub.snapshot import Snapshot from google.cloud.pubsub._helpers import topic_name_from_path from google.cloud.pubsub.iam import Policy from google.cloud.pubsub.message import Message @@ -410,6 +412,44 @@ def modify_ack_deadline(self, ack_ids, ack_deadline, client=None): api.subscription_modify_ack_deadline( self.full_name, ack_ids, ack_deadline) + def snapshot(self, name, client=None): + """Creates a snapshot of this subscription. + + :type name: str + :param name: the name of the subscription + + :rtype: :class:`Snapshot` + :returns: The snapshot created with the passed in arguments. + """ + return Snapshot(name, subscription=self) + + def seek_snapshot(self, snapshot, client=None): + """API call: seek a subscription to a given snapshot + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/seek + + :type snapshot: :class:`Snapshot` + :param snapshot: The snapshot to seek to. + """ + client = self._require_client(client) + api = client.subscriber_api + api.subscription_seek(self.full_name, snapshot=snapshot.full_name) + + def seek_timestamp(self, timestamp, client=None): + """API call: seek a subscription to a given point in time + + See: + https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/seek + + :type time: :class:`datetime.datetime` + :param time: The time to seek to. + """ + client = self._require_client(client) + timestamp = _datetime_to_rfc3339(timestamp) + api = client.subscriber_api + api.subscription_seek(self.full_name, time=timestamp) + def get_iam_policy(self, client=None): """Fetch the IAM policy for the subscription. diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py index 5327d0418114..41ae43f1f55d 100644 --- a/pubsub/tests/system.py +++ b/pubsub/tests/system.py @@ -71,6 +71,18 @@ def _consume_topics(pubsub_client): return list(pubsub_client.list_topics()) +def _consume_snapshots(pubsub_client): + """Consume entire iterator. + + :type pubsub_client: :class:`~google.cloud.pubsub.client.Client` + :param pubsub_client: Client to use to retrieve snapshots. + + :rtype: list + :returns: List of all snapshots encountered. + """ + return list(pubsub_client.list_snapshots()) + + def _consume_subscriptions(topic): """Consume entire iterator. @@ -309,5 +321,72 @@ def test_subscription_iam_policy(self): new_policy = subscription.set_iam_policy(policy) self.assertEqual(new_policy.viewers, policy.viewers) - # TODO(geigerj): set retain_acked_messages=True in snapshot system test once - # PR #3303 is merged + def test_create_snapshot(self): + TOPIC_NAME = 'create-snap-def' + unique_resource_id('-') + topic = Config.CLIENT.topic(TOPIC_NAME) + before_snapshots = _consume_snapshots(Config.CLIENT) + + self.assertFalse(topic.exists()) + topic.create() + self.to_delete.append(topic) + SUBSCRIPTION_NAME = 'subscribing-now' + unique_resource_id('-') + subscription = topic.subscription(SUBSCRIPTION_NAME, ack_deadline=600) + self.assertFalse(subscription.exists()) + subscription.create() + self.to_delete.append(subscription) + SNAPSHOT_NAME = 'new-snapshot' + unique_resource_id('-') + snapshot = subscription.snapshot(SNAPSHOT_NAME) + snapshot.create() + self.to_delete.append(snapshot) + + # There is no GET method for snapshot, so check existence using + # list + after_snapshots = _consume_snapshots(Config.CLIENT) + self.assertEqual(len(before_snapshots) + 1, len(after_snapshots)) + + def full_name(obj): + return obj.full_name + + self.assertIn(snapshot.full_name, map(full_name, after_snapshots)) + self.assertNotIn(snapshot.full_name, map(full_name, before_snapshots)) + + + def test_seek(self): + TOPIC_NAME = 'seek-e2e' + unique_resource_id('-') + topic = Config.CLIENT.topic(TOPIC_NAME, + timestamp_messages=True) + self.assertFalse(topic.exists()) + topic.create() + self.to_delete.append(topic) + + SUBSCRIPTION_NAME = 'subscribing-to-seek' + unique_resource_id('-') + subscription = topic.subscription(SUBSCRIPTION_NAME) + self.assertFalse(subscription.exists()) + subscription.create() + self.to_delete.append(subscription) + + SNAPSHOT_NAME = 'new-snapshot' + unique_resource_id('-') + snapshot = subscription.snapshot(SNAPSHOT_NAME) + snapshot.create() + self.to_delete.append(snapshot) + + MESSAGE_1 = b'MESSAGE ONE' + topic.publish(MESSAGE_1) + MESSAGE_2 = b'MESSAGE TWO' + topic.publish(MESSAGE_2) + + ((ack_id_1a, recvd_1a), ) = subscription.pull() + ((ack_id_2a, recvd_2a), ) = subscription.pull() + before_data = [obj.data for obj in (recvd_1a, recvd_2a)] + self.assertIn(MESSAGE_1, before_data) + self.assertIn(MESSAGE_2, before_data) + subscription.acknowledge((ack_id_1a, ack_id_2a)) + + self.assertFalse(subscription.pull(return_immediately=True)) + + subscription.seek_snapshot(snapshot) + + ((_, recvd_1b), ) = subscription.pull() + ((_, recvd_2b), ) = subscription.pull() + after_data = [obj.data for obj in (recvd_1b, recvd_2b)] + self.assertEqual(sorted(before_data), sorted(after_data)) diff --git a/pubsub/tests/unit/test__gax.py b/pubsub/tests/unit/test__gax.py index 2da629e92bc8..a93d22f95a53 100644 --- a/pubsub/tests/unit/test__gax.py +++ b/pubsub/tests/unit/test__gax.py @@ -45,6 +45,10 @@ class _Base(object): LIST_TOPIC_SUBSCRIPTIONS_PATH = '%s/subscriptions' % (TOPIC_PATH,) SUB_NAME = 'sub_name' SUB_PATH = '%s/subscriptions/%s' % (TOPIC_PATH, SUB_NAME) + SNAPSHOT_NAME = 'snapshot_name' + SNAPSHOT_PATH = '%s/snapshots/%s' % (PROJECT_PATH, SNAPSHOT_NAME) + TIME = 12345 + def _make_one(self, *args, **kw): return self._get_target_class()(*args, **kw) @@ -972,6 +976,238 @@ def test_subscription_modify_ack_deadline_error(self): self.assertEqual(deadline, NEW_DEADLINE) self.assertIsNone(options) + def test_list_snapshots_no_paging(self): + from google.gax import INITIAL_PAGE + from google.cloud.proto.pubsub.v1.pubsub_pb2 import ( + Snapshot as SnapshotPB) + from google.cloud._testing import _GAXPageIterator + from google.cloud.pubsub.client import Client + from google.cloud.pubsub.snapshot import Snapshot + from google.cloud.pubsub.topic import Topic + + local_snapshot_path = '%s/snapshots/%s' % ( + self.PROJECT_PATH, self.SNAPSHOT_NAME) + snapshot_pb = SnapshotPB( + name=local_snapshot_path, topic=self.TOPIC_PATH) + response = _GAXPageIterator([snapshot_pb]) + gax_api = _GAXSubscriberAPI(_list_snapshots_response=response) + creds = _make_credentials() + client = Client(project=self.PROJECT, credentials=creds) + api = self._make_one(gax_api, client) + + iterator = api.list_snapshots(self.PROJECT) + snapshots = list(iterator) + next_token = iterator.next_page_token + + # Check the token returned. + self.assertIsNone(next_token) + # Check the snapshot object returned. + self.assertEqual(len(snapshots), 1) + snapshot = snapshots[0] + self.assertIsInstance(snapshot, Snapshot) + self.assertEqual(snapshot.name, self.SNAPSHOT_NAME) + self.assertIsInstance(snapshot.topic, Topic) + self.assertEqual(snapshot.topic.name, self.TOPIC_NAME) + self.assertIs(snapshot._client, client) + self.assertEqual(snapshot._project, self.PROJECT) + + def test_list_snapshots_with_paging(self): + from google.cloud.proto.pubsub.v1.pubsub_pb2 import ( + Snapshot as SnapshotPB) + from google.cloud._testing import _GAXPageIterator + from google.cloud.pubsub.client import Client + from google.cloud.pubsub.snapshot import Snapshot + from google.cloud.pubsub.topic import Topic + + SIZE = 23 + TOKEN = 'TOKEN' + NEW_TOKEN = 'NEW_TOKEN' + local_snapshot_path = '%s/snapshots/%s' % ( + self.PROJECT_PATH, self.SNAPSHOT_NAME) + snapshot_pb = SnapshotPB(name=local_snapshot_path, topic=self.TOPIC_PATH) + response = _GAXPageIterator([snapshot_pb], page_token=NEW_TOKEN) + gax_api = _GAXSubscriberAPI(_list_snapshots_response=response) + client = _Client(self.PROJECT) + creds = _make_credentials() + client = Client(project=self.PROJECT, credentials=creds) + api = self._make_one(gax_api, client) + + iterator = api.list_snapshots( + self.PROJECT, page_size=SIZE, page_token=TOKEN) + snapshots = list(iterator) + next_token = iterator.next_page_token + + # Check the token returned. + self.assertEqual(next_token, NEW_TOKEN) + # Check the snapshot object returned. + self.assertEqual(len(snapshots), 1) + snapshot = snapshots[0] + self.assertIsInstance(snapshot, Snapshot) + self.assertEqual(snapshot.name, self.SNAPSHOT_NAME) + self.assertIsInstance(snapshot.topic, Topic) + self.assertEqual(snapshot.topic.name, self.TOPIC_NAME) + self.assertIs(snapshot._client, client) + self.assertEqual(snapshot._project, self.PROJECT) + + def test_subscription_seek_hit(self): + gax_api = _GAXSubscriberAPI(_seek_ok=True) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + api.subscription_seek( + self.SUB_PATH, time=self.TIME, snapshot=self.SNAPSHOT_PATH) + + subscription_path, time, snapshot_path, options = ( + gax_api._seek_called_with) + self.assertEqual(subscription_path, self.SUB_PATH) + self.assertEqual(time, self.TIME) + self.assertEqual(snapshot_path, self.SNAPSHOT_PATH) + self.assertIsNone(options) + + def test_subscription_seek_miss(self): + from google.cloud.exceptions import NotFound + + gax_api = _GAXSubscriberAPI(_seek_ok=False) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + with self.assertRaises(NotFound): + api.subscription_seek( + self.SUB_PATH, time=self.TIME, snapshot=self.SNAPSHOT_PATH) + + subscription_path, time, snapshot_path, options = ( + gax_api._seek_called_with) + self.assertEqual(subscription_path, self.SUB_PATH) + self.assertEqual(time, self.TIME) + self.assertEqual(snapshot_path, self.SNAPSHOT_PATH) + self.assertIsNone(options) + + def test_subscription_seek_error(self): + from google.gax.errors import GaxError + + gax_api = _GAXSubscriberAPI(_random_gax_error=True) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + with self.assertRaises(GaxError): + api.subscription_seek( + self.SUB_PATH, time=self.TIME, snapshot=self.SNAPSHOT_PATH) + + subscription_path, time, snapshot_path, options = ( + gax_api._seek_called_with) + self.assertEqual(subscription_path, self.SUB_PATH) + self.assertEqual(time, self.TIME) + self.assertEqual(snapshot_path, self.SNAPSHOT_PATH) + self.assertIsNone(options) + + def test_snapshot_create(self): + from google.cloud.proto.pubsub.v1.pubsub_pb2 import Snapshot + + snapshot_pb = Snapshot(name=self.SNAPSHOT_PATH, topic=self.TOPIC_PATH) + gax_api = _GAXSubscriberAPI(_create_snapshot_response=snapshot_pb) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + resource = api.snapshot_create(self.SNAPSHOT_PATH, self.SUB_PATH) + + expected = { + 'name': self.SNAPSHOT_PATH, + 'topic': self.TOPIC_PATH, + } + self.assertEqual(resource, expected) + name, subscription, options = ( + gax_api._create_snapshot_called_with) + self.assertEqual(name, self.SNAPSHOT_PATH) + self.assertEqual(subscription, self.SUB_PATH) + self.assertIsNone(options) + + def test_snapshot_create_already_exists(self): + from google.cloud.exceptions import Conflict + + gax_api = _GAXSubscriberAPI(_create_snapshot_conflict=True) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + with self.assertRaises(Conflict): + api.snapshot_create(self.SNAPSHOT_PATH, self.SUB_PATH) + + name, subscription, options = ( + gax_api._create_snapshot_called_with) + self.assertEqual(name, self.SNAPSHOT_PATH) + self.assertEqual(subscription, self.SUB_PATH) + self.assertIsNone(options) + + def test_snapshot_create_subscrption_miss(self): + from google.cloud.exceptions import NotFound + + gax_api = _GAXSubscriberAPI(_snapshot_create_subscription_miss=True) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + with self.assertRaises(NotFound): + api.snapshot_create(self.SNAPSHOT_PATH, self.SUB_PATH) + + name, subscription, options = ( + gax_api._create_snapshot_called_with) + self.assertEqual(name, self.SNAPSHOT_PATH) + self.assertEqual(subscription, self.SUB_PATH) + self.assertIsNone(options) + + def test_snapshot_create_error(self): + from google.gax.errors import GaxError + + gax_api = _GAXSubscriberAPI(_random_gax_error=True) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + with self.assertRaises(GaxError): + api.snapshot_create(self.SNAPSHOT_PATH, self.SUB_PATH) + + name, subscription, options = ( + gax_api._create_snapshot_called_with) + self.assertEqual(name, self.SNAPSHOT_PATH) + self.assertEqual(subscription, self.SUB_PATH) + self.assertIsNone(options) + + def test_snapshot_delete_hit(self): + gax_api = _GAXSubscriberAPI(_delete_snapshot_ok=True) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + api.snapshot_delete(self.SNAPSHOT_PATH) + + snapshot_path, options = gax_api._delete_snapshot_called_with + self.assertEqual(snapshot_path, self.SNAPSHOT_PATH) + self.assertIsNone(options) + + def test_snapshot_delete_miss(self): + from google.cloud.exceptions import NotFound + + gax_api = _GAXSubscriberAPI(_delete_snapshot_ok=False) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + with self.assertRaises(NotFound): + api.snapshot_delete(self.SNAPSHOT_PATH) + + snapshot_path, options = gax_api._delete_snapshot_called_with + self.assertEqual(snapshot_path, self.SNAPSHOT_PATH) + self.assertIsNone(options) + + def test_snapshot_delete_error(self): + from google.gax.errors import GaxError + + gax_api = _GAXSubscriberAPI(_random_gax_error=True) + client = _Client(self.PROJECT) + api = self._make_one(gax_api, client) + + with self.assertRaises(GaxError): + api.snapshot_delete(self.SNAPSHOT_PATH) + + snapshot_path, options = gax_api._delete_snapshot_called_with + self.assertEqual(snapshot_path, self.SNAPSHOT_PATH) + self.assertIsNone(options) + @unittest.skipUnless(_HAVE_GRPC, 'No gax-python') class Test_make_gax_publisher_api(_Base, unittest.TestCase): @@ -1196,11 +1432,13 @@ def list_topic_subscriptions(self, topic, page_size, options=None): class _GAXSubscriberAPI(_GAXBaseAPI): + _create_snapshot_conflict = False _create_subscription_conflict = False _modify_push_config_ok = False _acknowledge_ok = False _modify_ack_deadline_ok = False _deadline_exceeded_gax_error = False + _snapshot_create_subscription_miss=False def list_subscriptions(self, project, page_size, options=None): self._list_subscriptions_called_with = (project, page_size, options) @@ -1285,6 +1523,40 @@ def modify_ack_deadline(self, name, ack_ids, deadline, options=None): if not self._modify_ack_deadline_ok: raise GaxError('miss', self._make_grpc_not_found()) + def list_snapshots(self, project, page_size, options=None): + self._list_snapshots_called_with = (project, page_size, options) + return self._list_snapshots_response + + def create_snapshot(self, name, subscription, options=None): + from google.gax.errors import GaxError + + self._create_snapshot_called_with = (name, subscription, options) + if self._random_gax_error: + raise GaxError('error') + if self._create_snapshot_conflict: + raise GaxError('conflict', self._make_grpc_failed_precondition()) + if self._snapshot_create_subscription_miss: + raise GaxError('miss', self._make_grpc_not_found()) + + return self._create_snapshot_response + + def delete_snapshot(self, snapshot, options=None): + from google.gax.errors import GaxError + + self._delete_snapshot_called_with = (snapshot, options) + if self._random_gax_error: + raise GaxError('error') + if not self._delete_snapshot_ok: + raise GaxError('miss', self._make_grpc_not_found()) + + def seek(self, subscription, time=None, snapshot=None, options=None): + from google.gax.errors import GaxError + + self._seek_called_with = (subscription, time, snapshot, options) + if self._random_gax_error: + raise GaxError('error') + if not self._seek_ok: + raise GaxError('miss', self._make_grpc_not_found()) class _TopicPB(object): diff --git a/pubsub/tests/unit/test__http.py b/pubsub/tests/unit/test__http.py index 8d6d6f05fcf6..2dc14f789ed1 100644 --- a/pubsub/tests/unit/test__http.py +++ b/pubsub/tests/unit/test__http.py @@ -26,10 +26,13 @@ def _make_credentials(): class _Base(unittest.TestCase): PROJECT = 'PROJECT' LIST_TOPICS_PATH = 'projects/%s/topics' % (PROJECT,) + LIST_SNAPSHOTS_PATH = 'projects/%s/snapshots' % (PROJECT,) LIST_SUBSCRIPTIONS_PATH = 'projects/%s/subscriptions' % (PROJECT,) TOPIC_NAME = 'topic_name' TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) LIST_TOPIC_SUBSCRIPTIONS_PATH = '%s/subscriptions' % (TOPIC_PATH,) + SNAPSHOT_NAME = 'snapshot_name' + SNAPSHOT_PATH = 'projects/%s/snapshots/%s' % (PROJECT, SNAPSHOT_NAME) SUB_NAME = 'subscription_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) @@ -806,6 +809,174 @@ def test_subscription_modify_ack_deadline(self): self.assertEqual(connection._called_with['path'], path) self.assertEqual(connection._called_with['data'], BODY) + def test_list_snapshots_no_paging(self): + from google.cloud.pubsub.client import Client + from google.cloud.pubsub.snapshot import Snapshot + + local_snapshot_path = 'projects/%s/snapshots/%s' % ( + self.PROJECT, self.SNAPSHOT_NAME) + local_topic_path = 'projects/%s/topics/%s' % ( + self.PROJECT, self.TOPIC_NAME) + RETURNED = {'snapshots': [{ + 'name': local_snapshot_path, + 'topic': local_topic_path, + }], + } + + connection = _Connection(RETURNED) + creds = _make_credentials() + client = Client(project=self.PROJECT, credentials=creds) + client._connection = connection + api = self._make_one(client) + + iterator = api.list_snapshots(self.PROJECT) + snapshots = list(iterator) + next_token = iterator.next_page_token + + self.assertIsNone(next_token) + self.assertEqual(len(snapshots), 1) + snapshot = snapshots[0] + self.assertIsInstance(snapshot, Snapshot) + self.assertEqual(snapshot.topic.name, self.TOPIC_NAME) + self.assertIs(snapshot._client, client) + + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_SNAPSHOTS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], {}) + + def test_list_snapshots_with_paging(self): + import six + + from google.cloud.pubsub.client import Client + from google.cloud.pubsub.snapshot import Snapshot + + TOKEN1 = 'TOKEN1' + TOKEN2 = 'TOKEN2' + SIZE = 1 + local_snapshot_path = 'projects/%s/snapshots/%s' % ( + self.PROJECT, self.SNAPSHOT_NAME) + local_topic_path = 'projects/%s/topics/%s' % ( + self.PROJECT, self.TOPIC_NAME) + RETURNED = { + 'snapshots': [{ + 'name': local_snapshot_path, + 'topic': local_topic_path, + }], + 'nextPageToken': TOKEN2, + } + + connection = _Connection(RETURNED) + creds = _make_credentials() + client = Client(project=self.PROJECT, credentials=creds) + client._connection = connection + api = self._make_one(client) + + iterator = api.list_snapshots( + self.PROJECT, page_token=TOKEN1, page_size=SIZE) + page = six.next(iterator.pages) + snapshots = list(page) + next_token = iterator.next_page_token + + self.assertEqual(next_token, TOKEN2) + self.assertEqual(len(snapshots), 1) + snapshot = snapshots[0] + self.assertIsInstance(snapshot, Snapshot) + self.assertEqual(snapshot.topic.name, self.TOPIC_NAME) + self.assertIs(snapshot._client, client) + + self.assertEqual(connection._called_with['method'], 'GET') + path = '/%s' % (self.LIST_SNAPSHOTS_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['query_params'], + {'pageToken': TOKEN1, 'pageSize': SIZE}) + + def test_subscription_seek_snapshot(self): + local_snapshot_path = 'projects/%s/snapshots/%s' % ( + self.PROJECT, self.SNAPSHOT_NAME) + RETURNED = {} + BODY = { + 'snapshot': local_snapshot_path + } + connection = _Connection(RETURNED) + client = _Client(connection, self.PROJECT) + api = self._make_one(client) + + api.subscription_seek( + self.SUB_PATH, snapshot=local_snapshot_path) + + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:seek' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], BODY) + + def test_subscription_seek_time(self): + time = '12345' + RETURNED = {} + BODY = { + 'time': time + } + connection = _Connection(RETURNED) + client = _Client(connection, self.PROJECT) + api = self._make_one(client) + + api.subscription_seek(self.SUB_PATH, time=time) + + self.assertEqual(connection._called_with['method'], 'POST') + path = '/%s:seek' % (self.SUB_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], BODY) + + def test_snapshot_create(self): + RETURNED = { + 'name': self.SNAPSHOT_PATH, + 'subscription': self.SUB_PATH + } + BODY = { + 'subscription': self.SUB_PATH + } + connection = _Connection(RETURNED) + client = _Client(connection, self.PROJECT) + api = self._make_one(client) + + resource = api.snapshot_create(self.SNAPSHOT_PATH, self.SUB_PATH) + + self.assertEqual(resource, RETURNED) + self.assertEqual(connection._called_with['method'], 'PUT') + path = '/%s' % (self.SNAPSHOT_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], BODY) + + def test_snapshot_create_already_exists(self): + from google.cloud.exceptions import NotFound + + BODY = { + 'subscription': self.SUB_PATH + } + connection = _Connection() + client = _Client(connection, self.PROJECT) + api = self._make_one(client) + + with self.assertRaises(NotFound): + resource = api.snapshot_create(self.SNAPSHOT_PATH, self.SUB_PATH) + + self.assertEqual(connection._called_with['method'], 'PUT') + path = '/%s' % (self.SNAPSHOT_PATH,) + self.assertEqual(connection._called_with['path'], path) + self.assertEqual(connection._called_with['data'], BODY) + + def test_snapshot_delete(self): + RETURNED = {} + connection = _Connection(RETURNED) + client = _Client(connection, self.PROJECT) + api = self._make_one(client) + + api.snapshot_delete(self.SNAPSHOT_PATH) + + self.assertEqual(connection._called_with['method'], 'DELETE') + path = '/%s' % (self.SNAPSHOT_PATH,) + self.assertEqual(connection._called_with['path'], path) + class Test_IAMPolicyAPI(_Base): diff --git a/pubsub/tests/unit/test_client.py b/pubsub/tests/unit/test_client.py index d3aa25378b38..e251a0632dc8 100644 --- a/pubsub/tests/unit/test_client.py +++ b/pubsub/tests/unit/test_client.py @@ -379,7 +379,17 @@ def test_topic(self): 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME)) self.assertFalse(new_topic.timestamp_messages) + def test_list_snapshots(self): + creds = _make_credentials() + client = self._make_one(project=self.PROJECT, credentials=creds) + client._connection = object() + api = _FauxSubscriberAPI() + response = api._list_snapshots_response = object() + client._subscriber_api = api + self.assertEqual(client.list_snapshots(), response) + self.assertEqual(api._listed_snapshots, (self.PROJECT, None, None)) + class _Iterator(object): def __init__(self, items, token): @@ -407,6 +417,9 @@ def list_subscriptions(self, project, page_size, page_token): self._listed_subscriptions = (project, page_size, page_token) return self._list_subscriptions_response + def list_snapshots(self, project, page_size, page_token): + self._listed_snapshots = (project, page_size, page_token) + return self._list_snapshots_response class _Connection(object): diff --git a/pubsub/tests/unit/test_snpashot.py b/pubsub/tests/unit/test_snpashot.py new file mode 100644 index 000000000000..5834a1fedd89 --- /dev/null +++ b/pubsub/tests/unit/test_snpashot.py @@ -0,0 +1,215 @@ +# Copyright 2017 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import mock + + +class TestSnapshot(unittest.TestCase): + PROJECT = 'PROJECT' + SNAPSHOT_NAME = 'snapshot_name' + SNAPSHOT_PATH = 'projects/%s/snapshots/%s' % (PROJECT, SNAPSHOT_NAME) + SUB_NAME = 'subscription_name' + SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) + TOPIC_NAME = 'topic_name' + TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + + @staticmethod + def _get_target_class(): + from google.cloud.pubsub.snapshot import Snapshot + + return Snapshot + + def _make_one(self, *args, **kw): + return self._get_target_class()(*args, **kw) + + def test_ctor(self): + client = _Client(project=self.PROJECT) + snapshot = self._make_one(self.SNAPSHOT_NAME, + client=client) + self.assertEqual(snapshot.name, self.SNAPSHOT_NAME) + self.assertEqual(snapshot.project, self.PROJECT) + self.assertEqual(snapshot.full_name, self.SNAPSHOT_PATH) + self.assertEqual(snapshot.path, '/%s' % (self.SNAPSHOT_PATH, )) + + def test_ctor_w_subscription(self): + client = _Client(project=self.PROJECT) + subscription = _Subscription(name=self.SUB_NAME, client=client) + snapshot = self._make_one(self.SNAPSHOT_NAME, + subscription=subscription) + self.assertEqual(snapshot.name, self.SNAPSHOT_NAME) + self.assertEqual(snapshot.project, self.PROJECT) + self.assertEqual(snapshot.full_name, self.SNAPSHOT_PATH) + self.assertEqual(snapshot.path, '/%s' % (self.SNAPSHOT_PATH, )) + + def test_ctor_error(self): + client = _Client(project=self.PROJECT) + subscription = _Subscription(name=self.SUB_NAME, client=client) + with self.assertRaises(TypeError): + snapshot = self._make_one(self.SNAPSHOT_NAME, + client=client, + subscription=subscription) + + def test_from_api_repr_no_topics(self): + from google.cloud.pubsub.topic import Topic + + client = _Client(project=self.PROJECT) + resource = { + 'name': self.SNAPSHOT_PATH, + 'topic': self.TOPIC_PATH + } + klass = self._get_target_class() + snapshot = klass.from_api_repr(resource, client=client) + self.assertEqual(snapshot.name, self.SNAPSHOT_NAME) + self.assertIs(snapshot._client, client) + self.assertEqual(snapshot.project, self.PROJECT) + self.assertEqual(snapshot.full_name, self.SNAPSHOT_PATH) + self.assertIsInstance(snapshot.topic, Topic) + + def test_from_api_repr_w_deleted_topic(self): + client = _Client(project=self.PROJECT) + klass = self._get_target_class() + resource = { + 'name': self.SNAPSHOT_PATH, + 'topic': klass._DELETED_TOPIC_PATH + } + snapshot = klass.from_api_repr(resource, client=client) + self.assertEqual(snapshot.name, self.SNAPSHOT_NAME) + self.assertIs(snapshot._client, client) + self.assertEqual(snapshot.project, self.PROJECT) + self.assertEqual(snapshot.full_name, self.SNAPSHOT_PATH) + self.assertIsNone(snapshot.topic) + + def test_from_api_repr_w_topics_w_no_topic_match(self): + from google.cloud.pubsub.topic import Topic + + client = _Client(project=self.PROJECT) + klass = self._get_target_class() + resource = { + 'name': self.SNAPSHOT_PATH, + 'topic': self.TOPIC_PATH + } + topics = {} + snapshot = klass.from_api_repr(resource, client=client, topics=topics) + topic = snapshot.topic + self.assertIsInstance(topic, Topic) + self.assertIs(topic, topics[self.TOPIC_PATH]) + self.assertEqual(topic.name, self.TOPIC_NAME) + self.assertEqual(topic.project, self.PROJECT) + + def test_from_api_repr_w_topics_w_topic_match(self): + from google.cloud.pubsub.topic import Topic + + client = _Client(project=self.PROJECT) + klass = self._get_target_class() + resource = { + 'name': self.SNAPSHOT_PATH, + 'topic': self.TOPIC_PATH + } + topic = _Topic(self.TOPIC_NAME, client=client) + topics = {self.TOPIC_PATH: topic} + snapshot = klass.from_api_repr(resource, client=client, topics=topics) + self.assertIs(snapshot.topic, topic) + + def test_create_w_bound_client_error(self): + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscriberAPI() + expected_response = api._snapshot_create_response = object() + snapshot = self._make_one(self.SNAPSHOT_NAME, client=client) + + with self.assertRaises(RuntimeError): + snapshot.create() + + def test_create_w_bound_subscription(self): + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscriberAPI() + expected_result = api._snapshot_create_response = object() + subscription = _Subscription(name=self.SUB_NAME, client=client) + snapshot = self._make_one(self.SNAPSHOT_NAME, subscription=subscription) + + snapshot.create() + + self.assertEqual(api._snapshot_created, (self.SNAPSHOT_PATH, self.SUB_PATH, )) + + def test_create_w_bound_subscription_w_alternate_client(self): + client = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.subscriber_api = _FauxSubscriberAPI() + expected_result = api._snapshot_create_response = object() + subscription = _Subscription(name=self.SUB_NAME, client=client) + snapshot = self._make_one(self.SNAPSHOT_NAME, subscription=subscription) + + snapshot.create(client=client2) + + self.assertEqual(api._snapshot_created, (self.SNAPSHOT_PATH, self.SUB_PATH, )) + + def test_delete_w_bound_client(self): + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscriberAPI() + expected_result = api._snapshot_create_response = object() + snapshot = self._make_one(self.SNAPSHOT_NAME, client=client) + + snapshot.delete() + + self.assertEqual(api._snapshot_deleted, (self.SNAPSHOT_PATH, )) + + def test_delete_w_alternate_client(self): + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscriberAPI() + expected_result = api._snapshot_create_response = object() + subscription = _Subscription(name=self.SUB_NAME, client=client) + snapshot = self._make_one(self.SNAPSHOT_NAME, subscription=subscription) + + snapshot.delete() + + self.assertEqual(api._snapshot_deleted, (self.SNAPSHOT_PATH, )) + + +class _Client(object): + + connection = None + + def __init__(self, project): + self.project = project + + def topic(self, name): + from google.cloud.pubsub.topic import Topic + + return Topic(name, client=self) + + +class _Topic(object): + + def __init__(self, name, client): + self._client = client + + +class _Subscription(object): + + def __init__(self, name, client=None): + self._client = client + self.full_name = 'projects/%s/subscriptions/%s' % ( + client.project, name, ) + + +class _FauxSubscriberAPI(object): + + def snapshot_create(self, snapshot_path, subscription_path): + self._snapshot_created = (snapshot_path, subscription_path, ) + + def snapshot_delete(self, snapshot_path): + self._snapshot_deleted = (snapshot_path, ) + + diff --git a/pubsub/tests/unit/test_subscription.py b/pubsub/tests/unit/test_subscription.py index feebf069915d..c845d601dfca 100644 --- a/pubsub/tests/unit/test_subscription.py +++ b/pubsub/tests/unit/test_subscription.py @@ -21,6 +21,8 @@ class TestSubscription(unittest.TestCase): PROJECT = 'PROJECT' TOPIC_NAME = 'topic_name' TOPIC_PATH = 'projects/%s/topics/%s' % (PROJECT, TOPIC_NAME) + SNAPSHOT_NAME = 'snapshot_name' + SNAPSHOT_PATH = 'projects/%s/snapshots/%s' % (PROJECT, SNAPSHOT_NAME) SUB_NAME = 'sub_name' SUB_PATH = 'projects/%s/subscriptions/%s' % (PROJECT, SUB_NAME) DEADLINE = 42 @@ -458,6 +460,87 @@ def test_modify_ack_deadline_w_alternate_client(self): self.assertEqual(api._subscription_modified_ack_deadline, (self.SUB_PATH, [ACK_ID1, ACK_ID2], self.DEADLINE)) + def test_snapshot(self): + from google.cloud.pubsub.snapshot import Snapshot + + client = _Client(project=self.PROJECT) + topic = _Topic(self.TOPIC_NAME, client=client) + subscription = self._make_one(self.SUB_NAME, topic) + + snapshot = subscription.snapshot(self.SNAPSHOT_NAME) + self.assertIsInstance(snapshot, Snapshot) + self.assertEqual(snapshot.name, self.SNAPSHOT_NAME) + self.assertIs(snapshot.topic, topic) + + def test_seek_snapshot_w_bound_client(self): + from google.cloud.pubsub.snapshot import Snapshot + + client = _Client(project=self.PROJECT) + snapshot = Snapshot + snapshot = Snapshot(self.SNAPSHOT_NAME, client=client) + api = client.subscriber_api = _FauxSubscribererAPI() + api._subscription_seek_response = {} + topic = _Topic(self.TOPIC_NAME, client=client) + subscription = self._make_one(self.SUB_NAME, topic) + + subscription.seek_snapshot(snapshot) + + self.assertEqual(api._subscription_seeked, + (self.SUB_PATH, None, self.SNAPSHOT_PATH)) + + def test_seek_snapshot_w_alternate_client(self): + from google.cloud.pubsub.snapshot import Snapshot + + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + snapshot = Snapshot(self.SNAPSHOT_NAME, client=client1) + api = client2.subscriber_api = _FauxSubscribererAPI() + api._subscription_seek_response = {} + topic = _Topic(self.TOPIC_NAME, client=client1) + subscription = self._make_one(self.SUB_NAME, topic) + + subscription.seek_snapshot(snapshot, client=client2) + + self.assertEqual(api._subscription_seeked, + (self.SUB_PATH, None, self.SNAPSHOT_PATH)) + + def test_seek_time_w_bound_client(self): + import datetime + + from google.cloud import _helpers + + time = datetime.time() + client = _Client(project=self.PROJECT) + api = client.subscriber_api = _FauxSubscribererAPI() + api._subscription_seek_response = {} + topic = _Topic(self.TOPIC_NAME, client=client) + subscription = self._make_one(self.SUB_NAME, topic) + + subscription.seek_timestamp(time) + + self.assertEqual( + api._subscription_seeked, + (self.SUB_PATH, _helpers._datetime_to_rfc3339(time), None)) + + def test_seek_time_w_alternate_client(self): + import datetime + + from google.cloud import _helpers + + time = datetime.time() + client1 = _Client(project=self.PROJECT) + client2 = _Client(project=self.PROJECT) + api = client2.subscriber_api = _FauxSubscribererAPI() + api._subscription_seek_response = {} + topic = _Topic(self.TOPIC_NAME, client=client1) + subscription = self._make_one(self.SUB_NAME, topic) + + subscription.seek_timestamp(time, client=client2) + + self.assertEqual( + api._subscription_seeked, + (self.SUB_PATH, _helpers._datetime_to_rfc3339(time), None)) + def test_get_iam_policy_w_bound_client(self): from google.cloud.pubsub.iam import ( PUBSUB_ADMIN_ROLE, @@ -693,6 +776,11 @@ def subscription_modify_ack_deadline(self, subscription_path, ack_ids, subscription_path, ack_ids, ack_deadline) return self._subscription_modify_ack_deadline_response + def subscription_seek(self, subscription_path, time=None, snapshot=None): + self._subscription_seeked = ( + subscription_path, time, snapshot) + return self._subscription_seek_response + class TestAutoAck(unittest.TestCase):