From a2f17aaea481e3435b15a862ec93f2e53c4ab44f Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 14 Oct 2019 16:29:06 -0700 Subject: [PATCH 1/8] update sub.py & requirements.txt --- pubsub/cloud-client/quickstart/sub.py | 19 ++--- pubsub/cloud-client/quickstart/sub_test.py | 84 +++++++--------------- pubsub/cloud-client/requirements.txt | 2 +- 3 files changed, 38 insertions(+), 67 deletions(-) diff --git a/pubsub/cloud-client/quickstart/sub.py b/pubsub/cloud-client/quickstart/sub.py index 520803d70a5b..4d85e47dc2bf 100644 --- a/pubsub/cloud-client/quickstart/sub.py +++ b/pubsub/cloud-client/quickstart/sub.py @@ -16,7 +16,6 @@ # [START pubsub_quickstart_sub_all] import argparse -import time # [START pubsub_quickstart_sub_deps] from google.cloud import pubsub_v1 # [END pubsub_quickstart_sub_deps] @@ -34,20 +33,22 @@ def sub(project_id, subscription_name): project_id, subscription_name) def callback(message): - print('Received message {} of message ID {}'.format( + print('Received message {} of message ID {}\n'.format( message, message.message_id)) # Acknowledge the message. Unack'ed messages will be redelivered. message.ack() - print('Acknowledged message of message ID {}\n'.format( - message.message_id)) + print('Acknowledged message {}\n'.format(message.message_id)) - client.subscribe(subscription_path, callback=callback) + streaming_future = client.subscribe(subscription_path, callback=callback) print('Listening for messages on {}..\n'.format(subscription_path)) - # Keep the main thread from exiting so the subscriber can - # process messages in the background. - while True: - time.sleep(60) + # Calling result() on the StreamingPullFuture keeps the main thread alive + # while the callback function processes messages in the background. + try: + streaming_future.result() + except Exception as error: + streaming_future.cancel() + raise error if __name__ == '__main__': diff --git a/pubsub/cloud-client/quickstart/sub_test.py b/pubsub/cloud-client/quickstart/sub_test.py index 9c70384ed693..5dbd848def94 100644 --- a/pubsub/cloud-client/quickstart/sub_test.py +++ b/pubsub/cloud-client/quickstart/sub_test.py @@ -14,10 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -import mock +import multiprocessing as mp import os import pytest -import time from google.api_core.exceptions import AlreadyExists from google.cloud import pubsub_v1 @@ -29,84 +28,55 @@ TOPIC = 'quickstart-sub-test-topic' SUBSCRIPTION = 'quickstart-sub-test-topic-sub' - -@pytest.fixture(scope='module') -def publisher_client(): - yield pubsub_v1.PublisherClient() +publisher_client = pubsub_v1.PublisherClient() +subscriber_client = pubsub_v1.SubscriberClient() @pytest.fixture(scope='module') -def topic_path(publisher_client): +def topic_path(): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - publisher_client.create_topic(topic_path) + topic = publisher_client.create_topic(topic_path) + return topic.name except AlreadyExists: - pass - - yield topic_path - - -@pytest.fixture(scope='module') -def subscriber_client(): - yield pubsub_v1.SubscriberClient() + return topic_path @pytest.fixture(scope='module') -def subscription(subscriber_client, topic_path): +def subscription_path(topic_path): subscription_path = subscriber_client.subscription_path( PROJECT, SUBSCRIPTION) try: - subscriber_client.create_subscription(subscription_path, topic_path) + subscription = subscriber_client.create_subscription( + subscription_path, topic_path) + return subscription.name except AlreadyExists: - pass + return subscription_path - yield SUBSCRIPTION - -@pytest.fixture -def to_delete(publisher_client, subscriber_client): - doomed = [] - yield doomed - for client, item in doomed: +def _to_delete(resource_paths): + for item in resource_paths: if 'topics' in item: publisher_client.delete_topic(item) if 'subscriptions' in item: subscriber_client.delete_subscription(item) -def _make_sleep_patch(): - real_sleep = time.sleep - - def new_sleep(period): - if period == 60: - real_sleep(10) - raise RuntimeError('sigil') - else: - real_sleep(period) - - return mock.patch('time.sleep', new=new_sleep) - - -def test_sub(publisher_client, - topic_path, - subscriber_client, - subscription, - to_delete, - capsys): - - publisher_client.publish(topic_path, data=b'Hello, World!') - - to_delete.append((publisher_client, topic_path)) - - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): - sub.sub(PROJECT, subscription) - - to_delete.append((subscriber_client, - 'projects/{}/subscriptions/{}'.format(PROJECT, - SUBSCRIPTION))) +@pytest.fixture(scope='module') +def test_sub(topic_path, subscription_path, capsys): + publish_future = publisher_client.publish(topic_path, data=b'Hello World!') + publish_future.result() + + subscribe_process = mp.Process( + target=sub.sub, args=(PROJECT, SUBSCRIPTION,)) + subscribe_process.start() + subscribe_process.join(timeout=10) + subscribe_process.terminate() + + # Clean up resources. + _to_delete([topic_path, subscription_path]) out, _ = capsys.readouterr() assert "Received message" in out diff --git a/pubsub/cloud-client/requirements.txt b/pubsub/cloud-client/requirements.txt index d8470ecf937d..0836c258a1a5 100644 --- a/pubsub/cloud-client/requirements.txt +++ b/pubsub/cloud-client/requirements.txt @@ -1 +1 @@ -google-cloud-pubsub==0.39.1 +google-cloud-pubsub==1.0.2 From a9c4ff77e026302dac1495e1d758b0116cd9f7c2 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 14 Oct 2019 17:03:37 -0700 Subject: [PATCH 2/8] fix subscriber test --- pubsub/cloud-client/subscriber_test.py | 68 ++++++++++++++------------ 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 2dcfb33e2311..194e9f7d9866 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -42,13 +42,11 @@ def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - publisher_client.delete_topic(topic_path) - except Exception: - pass + response = publisher_client.get_topic(topic_path) + except: #noqa + resposne = publisher_client.create_topic(topic_path) - publisher_client.create_topic(topic_path) - - yield topic_path + yield response.name @pytest.fixture(scope='module') @@ -56,54 +54,49 @@ def subscriber_client(): yield pubsub_v1.SubscriberClient() -@pytest.fixture +@pytest.fixture(scope='module') def subscription(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( PROJECT, SUBSCRIPTION) try: - subscriber_client.delete_subscription(subscription_path) - except Exception: - pass - - try: - subscriber_client.create_subscription(subscription_path, topic=topic) - except google.api_core.exceptions.AlreadyExists: - pass + response = subscriber_client.get_subscription(subscription_path) + except: # noqa + response = subscriber_client.create_subscription( + subscription_path, topic=topic) - yield subscription_path + yield response.name -@pytest.fixture +@pytest.fixture(scope='module') def subscription_sync1(subscriber_client, topic): subscription_sync_path = subscriber_client.subscription_path( PROJECT, SUBSCRIPTION_SYNC1) try: - subscriber_client.delete_subscription(subscription_sync_path) - except Exception: - pass - - subscriber_client.create_subscription(subscription_sync_path, topic=topic) + response = subscriber_client.get_subscription(subscription_path) + except: # noqa + response = subscriber_client.create_subscription( + subscription_path, topic=topic) - yield subscription_sync_path + yield response.name -@pytest.fixture +@pytest.fixture(scope='module') def subscription_sync2(subscriber_client, topic): subscription_sync_path = subscriber_client.subscription_path( PROJECT, SUBSCRIPTION_SYNC2) try: - subscriber_client.delete_subscription(subscription_sync_path) - except Exception: - pass - - subscriber_client.create_subscription(subscription_sync_path, topic=topic) + response = subscriber_client.get_subscription(subscription_path) + except: # noqa + response = subscriber_client.create_subscription( + subscription_path, topic=topic) - yield subscription_sync_path + yield response.name +@pytest.fixture(scope='module') def test_list_in_topic(subscription, capsys): @eventually_consistent.call def _(): @@ -112,6 +105,7 @@ def _(): assert subscription in out +@pytest.fixture(scope='module') def test_list_in_project(subscription, capsys): @eventually_consistent.call def _(): @@ -120,6 +114,7 @@ def _(): assert subscription in out +@pytest.fixture(scope='module') def test_create(subscriber_client): subscription_path = subscriber_client.subscription_path( PROJECT, SUBSCRIPTION) @@ -135,6 +130,7 @@ def _(): assert subscriber_client.get_subscription(subscription_path) +@pytest.fixture(scope='module') def test_create_push(subscriber_client): subscription_path = subscriber_client.subscription_path( PROJECT, SUBSCRIPTION) @@ -150,6 +146,7 @@ def _(): assert subscriber_client.get_subscription(subscription_path) +@pytest.fixture(scope='module') def test_delete(subscriber_client, subscription): subscriber.delete_subscription(PROJECT, SUBSCRIPTION) @@ -159,6 +156,7 @@ def _(): subscriber_client.get_subscription(subscription) +@pytest.fixture(scope='module') def test_update(subscriber_client, subscription, capsys): subscriber.update_subscription(PROJECT, SUBSCRIPTION, NEW_ENDPOINT) @@ -166,14 +164,15 @@ def test_update(subscriber_client, subscription, capsys): assert 'Subscription updated' in out +@pytest.fixture(scope='module') def _publish_messages(publisher_client, topic): for n in range(5): data = u'Message {}'.format(n).encode('utf-8') - future = publisher_client.publish( - topic, data=data) + future = publisher_client.publish(topic, data=data) future.result() +@pytest.fixture(scope='module') def _publish_messages_with_custom_attributes(publisher_client, topic): data = u'Test message'.encode('utf-8') future = publisher_client.publish(topic, data=data, origin='python-sample') @@ -193,6 +192,7 @@ def new_sleep(period): return mock.patch('time.sleep', new=new_sleep) +@pytest.fixture(scope='module') def test_receive(publisher_client, topic, subscription, capsys): _publish_messages(publisher_client, topic) @@ -206,6 +206,7 @@ def test_receive(publisher_client, topic, subscription, capsys): assert 'Message 1' in out +@pytest.fixture(scope='module') def test_receive_synchronously( publisher_client, topic, subscription_sync1, capsys): _publish_messages(publisher_client, topic) @@ -216,6 +217,7 @@ def test_receive_synchronously( assert 'Done.' in out +@pytest.fixture(scope='module') def test_receive_synchronously_with_lease( publisher_client, topic, subscription_sync2, capsys): _publish_messages(publisher_client, topic) @@ -227,6 +229,7 @@ def test_receive_synchronously_with_lease( assert 'Done.' in out +@pytest.fixture(scope='module') def test_receive_with_custom_attributes( publisher_client, topic, subscription, capsys): _publish_messages_with_custom_attributes(publisher_client, topic) @@ -242,6 +245,7 @@ def test_receive_with_custom_attributes( assert 'python-sample' in out +@pytest.fixture(scope='module') def test_receive_with_flow_control( publisher_client, topic, subscription, capsys): _publish_messages(publisher_client, topic) From 95908f816b872184050d4d2ed340a62eb75c5556 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 14 Oct 2019 17:37:41 -0700 Subject: [PATCH 3/8] rewrite test with timeout --- pubsub/cloud-client/quickstart/sub.py | 10 +++--- pubsub/cloud-client/quickstart/sub_test.py | 39 +++++++++++++++++----- 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/pubsub/cloud-client/quickstart/sub.py b/pubsub/cloud-client/quickstart/sub.py index 4234bf35cc8c..e39f14105b1a 100644 --- a/pubsub/cloud-client/quickstart/sub.py +++ b/pubsub/cloud-client/quickstart/sub.py @@ -39,16 +39,16 @@ def callback(message): message.ack() print('Acknowledged message {}\n'.format(message.message_id)) - streaming_future = client.subscribe(subscription_path, callback=callback) + streaming_pull_future = client.subscribe( + subscription_path, callback=callback) print('Listening for messages on {}..\n'.format(subscription_path)) - # Calling result() on the StreamingPullFuture keeps the main thread alive - # while the callback function processes messages in the background. + # Calling result() on StreamingPullFuture keeps the main thread from + # exiting while messages get processed in the callbacks. try: streaming_pull_future.result() - except Exception as error: + except: # noqa streaming_pull_future.cancel() - raise error if __name__ == '__main__': diff --git a/pubsub/cloud-client/quickstart/sub_test.py b/pubsub/cloud-client/quickstart/sub_test.py index 5dbd848def94..476139a02642 100644 --- a/pubsub/cloud-client/quickstart/sub_test.py +++ b/pubsub/cloud-client/quickstart/sub_test.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import multiprocessing as mp import os import pytest @@ -64,16 +63,40 @@ def _to_delete(resource_paths): subscriber_client.delete_subscription(item) -@pytest.fixture(scope='module') -def test_sub(topic_path, subscription_path, capsys): +def _publish_messages(topic_path): publish_future = publisher_client.publish(topic_path, data=b'Hello World!') publish_future.result() - subscribe_process = mp.Process( - target=sub.sub, args=(PROJECT, SUBSCRIPTION,)) - subscribe_process.start() - subscribe_process.join(timeout=10) - subscribe_process.terminate() + +def _sub_timeout(project_id, subscription_name): + # This is an exactly copy of `sub.py` except + # StreamingPullFuture.result() will time out after 10s. + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path( + project_id, subscription_name) + + def callback(message): + print('Received message {} of message ID {}\n'.format( + message, message.message_id)) + message.ack() + print('Acknowledged message {}\n'.format(message.message_id)) + + streaming_pull_future = client.subscribe( + subscription_path, callback=callback) + print('Listening for messages on {}..\n'.format(subscription_path)) + + try: + streaming_pull_future.result(timeout=10) + except: # noqa + streaming_pull_future.cancel() + + +def test_sub(monkeypatch, topic_path, subscription_path, capsys): + monkeypatch.setattr(sub, 'sub', _sub_timeout) + + _publish_messages(topic_path) + + sub.sub(PROJECT, SUBSCRIPTION) # Clean up resources. _to_delete([topic_path, subscription_path]) From 16fafe6f159528b4a5c5f9c225f9ed262e23b310 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 14 Oct 2019 17:42:35 -0700 Subject: [PATCH 4/8] resolve merge conflicts --- pubsub/cloud-client/subscriber_test.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 19d949b40ae2..042efefc7fb2 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -193,11 +193,6 @@ def new_sleep(period): return mock.patch('time.sleep', new=new_sleep) -<<<<<<< HEAD -@pytest.fixture(scope='module') -======= -@flaky ->>>>>>> 5c2e2a6e6734da91246aec6aa48eb6316e5c47a5 def test_receive(publisher_client, topic, subscription, capsys): _publish_messages(publisher_client, topic) @@ -211,7 +206,6 @@ def test_receive(publisher_client, topic, subscription, capsys): assert 'Message 1' in out -@pytest.fixture(scope='module') def test_receive_synchronously( publisher_client, topic, subscription_sync1, capsys): _publish_messages(publisher_client, topic) @@ -222,7 +216,6 @@ def test_receive_synchronously( assert 'Done.' in out -@pytest.fixture(scope='module') def test_receive_synchronously_with_lease( publisher_client, topic, subscription_sync2, capsys): _publish_messages(publisher_client, topic) @@ -234,7 +227,6 @@ def test_receive_synchronously_with_lease( assert 'Done.' in out -@pytest.fixture(scope='module') def test_receive_with_custom_attributes( publisher_client, topic, subscription, capsys): _publish_messages_with_custom_attributes(publisher_client, topic) @@ -250,7 +242,6 @@ def test_receive_with_custom_attributes( assert 'python-sample' in out -@pytest.fixture(scope='module') def test_receive_with_flow_control( publisher_client, topic, subscription, capsys): _publish_messages(publisher_client, topic) From 0da478dd5aa17009328cd59bf420d7f1ec40c8e6 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 14 Oct 2019 17:51:12 -0700 Subject: [PATCH 5/8] fix var name --- pubsub/cloud-client/subscriber_test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 042efefc7fb2..111cd96b16f0 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -18,7 +18,6 @@ from gcp_devrel.testing import eventually_consistent from gcp_devrel.testing.flaky import flaky from google.cloud import pubsub_v1 -import google.api_core.exceptions import mock import pytest @@ -44,7 +43,7 @@ def topic(publisher_client): try: response = publisher_client.get_topic(topic_path) - except: #noqa + except: # noqa resposne = publisher_client.create_topic(topic_path) yield response.name @@ -71,7 +70,7 @@ def subscription(subscriber_client, topic): @pytest.fixture(scope='module') def subscription_sync1(subscriber_client, topic): - subscription_sync_path = subscriber_client.subscription_path( + subscription_path = subscriber_client.subscription_path( PROJECT, SUBSCRIPTION_SYNC1) try: @@ -85,7 +84,7 @@ def subscription_sync1(subscriber_client, topic): @pytest.fixture(scope='module') def subscription_sync2(subscriber_client, topic): - subscription_sync_path = subscriber_client.subscription_path( + subscription_path = subscriber_client.subscription_path( PROJECT, SUBSCRIPTION_SYNC2) try: From 554d342e96205394fe1f4a00cc2bf0c19e120370 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 14 Oct 2019 17:55:50 -0700 Subject: [PATCH 6/8] typo --- pubsub/cloud-client/subscriber_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 111cd96b16f0..c18cbe663661 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -44,7 +44,7 @@ def topic(publisher_client): try: response = publisher_client.get_topic(topic_path) except: # noqa - resposne = publisher_client.create_topic(topic_path) + response = publisher_client.create_topic(topic_path) yield response.name From 5d599a56aa6a2d08b87fbf281e24f100748dc574 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 14 Oct 2019 18:11:35 -0700 Subject: [PATCH 7/8] remove flaky --- pubsub/cloud-client/subscriber_test.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index c18cbe663661..3d7bfab815b1 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -16,7 +16,6 @@ import time from gcp_devrel.testing import eventually_consistent -from gcp_devrel.testing.flaky import flaky from google.cloud import pubsub_v1 import mock import pytest @@ -164,7 +163,6 @@ def test_update(subscriber_client, subscription, capsys): assert 'Subscription updated' in out -@pytest.fixture(scope='module') def _publish_messages(publisher_client, topic): for n in range(5): data = u'Message {}'.format(n).encode('utf-8') @@ -172,7 +170,6 @@ def _publish_messages(publisher_client, topic): future.result() -@pytest.fixture(scope='module') def _publish_messages_with_custom_attributes(publisher_client, topic): data = u'Test message'.encode('utf-8') future = publisher_client.publish(topic, data=data, origin='python-sample') From 3a1da4086ec256439eff9a62bcbb97de281aa517 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 21 Oct 2019 13:45:28 -0700 Subject: [PATCH 8/8] fix flaky test with separate subscriptions --- pubsub/cloud-client/iam_test.py | 2 +- pubsub/cloud-client/publisher.py | 2 +- pubsub/cloud-client/publisher_test.py | 12 +-- pubsub/cloud-client/quickstart.py | 2 +- pubsub/cloud-client/quickstart_test.py | 2 +- pubsub/cloud-client/subscriber.py | 2 +- pubsub/cloud-client/subscriber_test.py | 124 ++++++++++++------------- 7 files changed, 71 insertions(+), 75 deletions(-) diff --git a/pubsub/cloud-client/iam_test.py b/pubsub/cloud-client/iam_test.py index cfae98ffd00b..8a524c35a061 100644 --- a/pubsub/cloud-client/iam_test.py +++ b/pubsub/cloud-client/iam_test.py @@ -1,4 +1,4 @@ -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index 76554d0258fe..490c903b2c1b 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2019 Google LLC. All Rights Reserved. +# Copyright 2016 Google LLC. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pubsub/cloud-client/publisher_test.py b/pubsub/cloud-client/publisher_test.py index b364553c2d41..5e550abd641d 100644 --- a/pubsub/cloud-client/publisher_test.py +++ b/pubsub/cloud-client/publisher_test.py @@ -1,4 +1,4 @@ -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -36,13 +36,11 @@ def topic(client): topic_path = client.topic_path(PROJECT, TOPIC) try: - client.delete_topic(topic_path) - except Exception: - pass - - client.create_topic(topic_path) + response = client.get_topic(topic_path) + except: # noqa + response = client.create_topic(topic_path) - yield topic_path + yield response.name def _make_sleep_patch(): diff --git a/pubsub/cloud-client/quickstart.py b/pubsub/cloud-client/quickstart.py index 10ff76f9b632..f48d085e06b5 100644 --- a/pubsub/cloud-client/quickstart.py +++ b/pubsub/cloud-client/quickstart.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pubsub/cloud-client/quickstart_test.py b/pubsub/cloud-client/quickstart_test.py index 3fce09dc8f5b..d318b260c63c 100644 --- a/pubsub/cloud-client/quickstart_test.py +++ b/pubsub/cloud-client/quickstart_test.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index dbaa396cddd5..3bbad0ead1b0 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 3d7bfab815b1..4c5fd61223db 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -1,4 +1,4 @@ -# Copyright 2019 Google Inc. All Rights Reserved. +# Copyright 2016 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,9 +24,9 @@ PROJECT = os.environ['GCLOUD_PROJECT'] TOPIC = 'subscription-test-topic' -SUBSCRIPTION = 'subscription-test-subscription' -SUBSCRIPTION_SYNC1 = 'subscription-test-subscription-sync1' -SUBSCRIPTION_SYNC2 = 'subscription-test-subscription-sync2' +SUBSCRIPTION_ONE = 'subscription-test-subscription-one' +SUBSCRIPTION_TWO = 'subscription-test-subscription-two' +SUBSCRIPTION_THREE = 'subscription-test-subscription-three' ENDPOINT = 'https://{}.appspot.com/push'.format(PROJECT) NEW_ENDPOINT = 'https://{}.appspot.com/push2'.format(PROJECT) @@ -54,9 +54,9 @@ def subscriber_client(): @pytest.fixture(scope='module') -def subscription(subscriber_client, topic): +def subscription_one(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION_ONE) try: response = subscriber_client.get_subscription(subscription_path) @@ -68,9 +68,9 @@ def subscription(subscriber_client, topic): @pytest.fixture(scope='module') -def subscription_sync1(subscriber_client, topic): +def subscription_two(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_SYNC1) + PROJECT, SUBSCRIPTION_TWO) try: response = subscriber_client.get_subscription(subscription_path) @@ -82,9 +82,9 @@ def subscription_sync1(subscriber_client, topic): @pytest.fixture(scope='module') -def subscription_sync2(subscriber_client, topic): +def subscription_three(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_SYNC2) + PROJECT, SUBSCRIPTION_THREE) try: response = subscriber_client.get_subscription(subscription_path) @@ -95,72 +95,68 @@ def subscription_sync2(subscriber_client, topic): yield response.name -@pytest.fixture(scope='module') -def test_list_in_topic(subscription, capsys): +def test_list_in_topic(subscription_one, capsys): @eventually_consistent.call def _(): subscriber.list_subscriptions_in_topic(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert subscription in out + assert subscription_one in out -@pytest.fixture(scope='module') -def test_list_in_project(subscription, capsys): +def test_list_in_project(subscription_one, capsys): @eventually_consistent.call def _(): subscriber.list_subscriptions_in_project(PROJECT) out, _ = capsys.readouterr() - assert subscription in out + assert subscription_one in out -@pytest.fixture(scope='module') def test_create(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION_ONE) + try: subscriber_client.delete_subscription(subscription_path) except Exception: pass - subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION) + subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION_ONE) @eventually_consistent.call def _(): assert subscriber_client.get_subscription(subscription_path) -@pytest.fixture(scope='module') def test_create_push(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION_ONE) try: subscriber_client.delete_subscription(subscription_path) except Exception: pass - subscriber.create_push_subscription(PROJECT, TOPIC, SUBSCRIPTION, ENDPOINT) + subscriber.create_push_subscription( + PROJECT, TOPIC, SUBSCRIPTION_ONE, ENDPOINT) @eventually_consistent.call def _(): assert subscriber_client.get_subscription(subscription_path) -@pytest.fixture(scope='module') -def test_delete(subscriber_client, subscription): - subscriber.delete_subscription(PROJECT, SUBSCRIPTION) +def test_update(subscriber_client, subscription_one, capsys): + subscriber.update_subscription(PROJECT, SUBSCRIPTION_ONE, NEW_ENDPOINT) - @eventually_consistent.call - def _(): - with pytest.raises(Exception): - subscriber_client.get_subscription(subscription) + out, _ = capsys.readouterr() + assert 'Subscription updated' in out -@pytest.fixture(scope='module') -def test_update(subscriber_client, subscription, capsys): - subscriber.update_subscription(PROJECT, SUBSCRIPTION, NEW_ENDPOINT) +def test_delete(subscriber_client, subscription_one): + subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ONE) - out, _ = capsys.readouterr() - assert 'Subscription updated' in out + @eventually_consistent.call + def _(): + with pytest.raises(Exception): + subscriber_client.get_subscription(subscription_one) def _publish_messages(publisher_client, topic): @@ -189,48 +185,28 @@ def new_sleep(period): return mock.patch('time.sleep', new=new_sleep) -def test_receive(publisher_client, topic, subscription, capsys): +def test_receive(publisher_client, topic, subscription_two, capsys): _publish_messages(publisher_client, topic) with _make_sleep_patch(): with pytest.raises(RuntimeError, match='sigil'): - subscriber.receive_messages(PROJECT, SUBSCRIPTION) + subscriber.receive_messages(PROJECT, SUBSCRIPTION_TWO) out, _ = capsys.readouterr() assert 'Listening' in out - assert subscription in out + assert subscription_two in out assert 'Message 1' in out -def test_receive_synchronously( - publisher_client, topic, subscription_sync1, capsys): - _publish_messages(publisher_client, topic) - - subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC1) - - out, _ = capsys.readouterr() - assert 'Done.' in out - - -def test_receive_synchronously_with_lease( - publisher_client, topic, subscription_sync2, capsys): - _publish_messages(publisher_client, topic) - - subscriber.synchronous_pull_with_lease_management( - PROJECT, SUBSCRIPTION_SYNC2) - - out, _ = capsys.readouterr() - assert 'Done.' in out - - def test_receive_with_custom_attributes( - publisher_client, topic, subscription, capsys): + publisher_client, topic, subscription_two, capsys): + _publish_messages_with_custom_attributes(publisher_client, topic) with _make_sleep_patch(): with pytest.raises(RuntimeError, match='sigil'): subscriber.receive_messages_with_custom_attributes( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION_TWO) out, _ = capsys.readouterr() assert 'Test message' in out @@ -239,15 +215,37 @@ def test_receive_with_custom_attributes( def test_receive_with_flow_control( - publisher_client, topic, subscription, capsys): + publisher_client, topic, subscription_two, capsys): + _publish_messages(publisher_client, topic) with _make_sleep_patch(): with pytest.raises(RuntimeError, match='sigil'): subscriber.receive_messages_with_flow_control( - PROJECT, SUBSCRIPTION) + PROJECT, SUBSCRIPTION_TWO) out, _ = capsys.readouterr() assert 'Listening' in out - assert subscription in out + assert subscription_two in out assert 'Message 1' in out + + +def test_receive_synchronously( + publisher_client, topic, subscription_three, capsys): + _publish_messages(publisher_client, topic) + + subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_THREE) + + out, _ = capsys.readouterr() + assert 'Done.' in out + + +def test_receive_synchronously_with_lease( + publisher_client, topic, subscription_three, capsys): + _publish_messages(publisher_client, topic) + + subscriber.synchronous_pull_with_lease_management( + PROJECT, SUBSCRIPTION_THREE) + + out, _ = capsys.readouterr() + assert 'Done.' in out