-
Notifications
You must be signed in to change notification settings - Fork 207
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add context manager capability to subscriber
- Loading branch information
Showing
4 changed files
with
82 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
import itertools | ||
import operator as op | ||
import os | ||
import psutil | ||
import threading | ||
import time | ||
|
||
|
@@ -46,7 +47,7 @@ def publisher(): | |
yield pubsub_v1.PublisherClient() | ||
|
||
|
||
@pytest.fixture(scope=u"module") | ||
@pytest.fixture() | ||
def subscriber(): | ||
yield pubsub_v1.SubscriberClient() | ||
|
||
|
@@ -383,6 +384,50 @@ def test_managing_subscription_iam_policy( | |
assert bindings[1].members == ["group:[email protected]"] | ||
|
||
|
||
def test_subscriber_not_leaking_open_sockets( | ||
publisher, topic_path, subscriber, subscription_path, cleanup | ||
): | ||
# Make sure the topic and the supscription get deleted. | ||
# NOTE: Since `subscriber` will be closed in the test, we need another | ||
# subscriber to clean up the subscription. | ||
# Also, we need to make sure that auxiliary subscriber releases the sockets, too. | ||
subscriber_2 = pubsub_v1.SubscriberClient() | ||
cleanup.append((subscriber_2.delete_subscription, subscription_path)) | ||
|
||
def one_arg_close(subscriber): # the cleanup helper expects exactly one argument | ||
subscriber.close() | ||
|
||
cleanup.append((one_arg_close, subscriber_2)) | ||
cleanup.append((publisher.delete_topic, topic_path)) | ||
|
||
# Create topic before starting to track connection count (any sockets opened | ||
# by the publisher client are not counted by this test). | ||
publisher.create_topic(topic_path) | ||
|
||
current_process = psutil.Process() | ||
conn_count_start = len(current_process.connections()) | ||
|
||
# Publish a few messages, then synchronously pull them and check that | ||
# no sockets are leaked. | ||
with subscriber: | ||
subscriber.create_subscription(name=subscription_path, topic=topic_path) | ||
|
||
# Publish a few messages, wait for the publish to succeed. | ||
publish_futures = [ | ||
publisher.publish(topic_path, u"message {}".format(i).encode()) | ||
for i in range(1, 4) | ||
] | ||
for future in publish_futures: | ||
future.result() | ||
|
||
# Synchronously pull messages. | ||
response = subscriber.pull(subscription_path, max_messages=3) | ||
assert len(response.received_messages) == 3 | ||
|
||
conn_count_end = len(current_process.connections()) | ||
assert conn_count_end == conn_count_start | ||
|
||
|
||
class TestStreamingPull(object): | ||
def test_streaming_pull_callback_error_propagation( | ||
self, publisher, topic_path, subscriber, subscription_path, cleanup | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters