From 50d3abc94d987d18305a19994747a0406a90fdb3 Mon Sep 17 00:00:00 2001 From: Danny Hermes Date: Wed, 20 Dec 2017 14:33:03 -0800 Subject: [PATCH] Allowed a custom `Event` type in Pub / Sub futures. --- pubsub/google/cloud/pubsub_v1/futures.py | 14 ++++- .../cloud/pubsub_v1/publisher/batch/thread.py | 2 +- .../cloud/pubsub_v1/publisher/futures.py | 10 +++ .../cloud/pubsub_v1/subscriber/futures.py | 12 +++- .../pubsub_v1/subscriber/policy/thread.py | 2 +- .../pubsub_v1/{publisher => }/test_futures.py | 62 ++++++++++++++----- 6 files changed, 79 insertions(+), 23 deletions(-) rename pubsub/tests/unit/pubsub_v1/{publisher => }/test_futures.py (69%) diff --git a/pubsub/google/cloud/pubsub_v1/futures.py b/pubsub/google/cloud/pubsub_v1/futures.py index 067fc7429ab9..ea7a0babf050 100644 --- a/pubsub/google/cloud/pubsub_v1/futures.py +++ b/pubsub/google/cloud/pubsub_v1/futures.py @@ -29,6 +29,16 @@ class Future(google.api_core.future.Future): This object should not be created directly, but is returned by other methods in this library. + + Args: + event_factory (Optional[Callable[[], Any]]): An event factory, expected + to take no arguments and return an event with the same interface as + :class:`threading.Event`. This is provided so that callers + with different concurrency models (e.g. ``threading`` or + ``multiprocessing``) can wait on an event that is compatible + with that model. The ``wait()`` and ``set()`` methods will be + used on the returned event. The default is + :class:`threading.Event`. """ # This could be a sentinel object or None, but the sentinel object's ID @@ -36,11 +46,11 @@ class Future(google.api_core.future.Future): # actually being a result. _SENTINEL = uuid.uuid4() - def __init__(self): + def __init__(self, event_factory=threading.Event): self._result = self._SENTINEL self._exception = self._SENTINEL self._callbacks = [] - self._completed = threading.Event() + self._completed = event_factory() def cancel(self): """Actions in Pub/Sub generally may not be canceled. diff --git a/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py index 12b9790c6b80..70e1c7d36b5c 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py @@ -271,7 +271,7 @@ def publish(self, message): self._messages.append(message) # Track the future on this batch (so that the result of the # future can be set). - future = futures.Future() + future = futures.Future(event_factory=threading.Event) self._futures.append(future) # Determine the number of messages before releasing the lock. num_messages = len(self._messages) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/futures.py b/pubsub/google/cloud/pubsub_v1/publisher/futures.py index cca1c97f5f2f..072242be7822 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/futures.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/futures.py @@ -25,6 +25,16 @@ class Future(futures.Future): This object should not be created directly, but is returned by other methods in this library. + + Args: + event_factory (Optional[Callable[[], Any]]): An event factory, expected + to take no arguments and return an event with the same interface as + :class:`threading.Event`. This is provided so that callers + with different concurrency models (e.g. ``threading`` or + ``multiprocessing``) can wait on an event that is compatible + with that model. The ``wait()`` and ``set()`` methods will be + used on the returned event. The default is + :class:`threading.Event`. """ # The publishing-side subclass does not need any special behavior # at this time. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/futures.py b/pubsub/google/cloud/pubsub_v1/subscriber/futures.py index fa1f457a2602..a502b9b48fa2 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/futures.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/futures.py @@ -29,10 +29,18 @@ class Future(futures.Future): Args: policy (~.pubsub_v1.subscriber.policy.base.BasePolicy): The policy that creates this Future. + event_factory (Optional[Callable[[], Any]]): An event factory, expected + to take no arguments and return an event with the same interface as + :class:`threading.Event`. This is provided so that callers + with different concurrency models (e.g. ``threading`` or + ``multiprocessing``) can wait on an event that is compatible + with that model. The ``wait()`` and ``set()`` methods will be + used on the returned event. The default is + :class:`threading.Event`. """ - def __init__(self, policy): + def __init__(self, policy, *args, **kwargs): self._policy = policy - super(Future, self).__init__() + super(Future, self).__init__(*args, **kwargs) def running(self): """Return whether this subscription is opened with this Future. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index 39f161a3b93e..0957f14a0762 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -263,7 +263,7 @@ def open(self, callback): # Create the Future that this method will return. # This future is the main thread's interface to handle exceptions, # block on the subscription, etc. - self._future = Future(policy=self) + self._future = Future(policy=self, event_factory=threading.Event) # Start the thread to pass the requests. self._callback = callback diff --git a/pubsub/tests/unit/pubsub_v1/publisher/test_futures.py b/pubsub/tests/unit/pubsub_v1/test_futures.py similarity index 69% rename from pubsub/tests/unit/pubsub_v1/publisher/test_futures.py rename to pubsub/tests/unit/pubsub_v1/test_futures.py index f179afa7012b..fc5170d2f21e 100644 --- a/pubsub/tests/unit/pubsub_v1/publisher/test_futures.py +++ b/pubsub/tests/unit/pubsub_v1/test_futures.py @@ -12,44 +12,72 @@ # See the License for the specific language governing permissions and # limitations under the License. -import mock +import threading +import mock import pytest -from google.cloud.pubsub_v1.publisher import exceptions -from google.cloud.pubsub_v1.publisher.futures import Future +from google.cloud.pubsub_v1 import exceptions +from google.cloud.pubsub_v1 import futures + + +def _future(*args, **kwargs): + return futures.Future(*args, **kwargs) + + +def test_constructor_defaults(): + future = _future() + + assert future._result == futures.Future._SENTINEL + assert future._exception == futures.Future._SENTINEL + assert future._callbacks == [] + event_type = type(threading.Event()) + assert isinstance(future._completed, event_type) + + +def test_constructor_with_factory(): + event_factory = mock.Mock(spec=()) + future = _future(event_factory=event_factory) + + assert future._result == futures.Future._SENTINEL + assert future._exception == futures.Future._SENTINEL + assert future._callbacks == [] + assert future._completed is event_factory.return_value + + # Check mocks. + event_factory.assert_called_once_with() def test_cancel(): - assert Future().cancel() is False + assert _future().cancel() is False def test_cancelled(): - assert Future().cancelled() is False + assert _future().cancelled() is False def test_running(): - future = Future() + future = _future() assert future.running() is True future.set_result('foobar') assert future.running() is False def test_done(): - future = Future() + future = _future() assert future.done() is False future.set_result('12345') assert future.done() is True def test_exception_no_error(): - future = Future() + future = _future() future.set_result('12345') assert future.exception() is None def test_exception_with_error(): - future = Future() + future = _future() error = RuntimeError('Something really bad happened.') future.set_exception(error) @@ -63,26 +91,26 @@ def test_exception_with_error(): def test_exception_timeout(): - future = Future() + future = _future() with pytest.raises(exceptions.TimeoutError): future.exception(timeout=0.01) def test_result_no_error(): - future = Future() + future = _future() future.set_result('42') assert future.result() == '42' def test_result_with_error(): - future = Future() + future = _future() future.set_exception(RuntimeError('Something really bad happened.')) with pytest.raises(RuntimeError): future.result() def test_add_done_callback_pending_batch(): - future = Future() + future = _future() callback = mock.Mock() future.add_done_callback(callback) assert len(future._callbacks) == 1 @@ -91,7 +119,7 @@ def test_add_done_callback_pending_batch(): def test_add_done_callback_completed_batch(): - future = Future() + future = _future() future.set_result('12345') callback = mock.Mock(spec=()) future.add_done_callback(callback) @@ -99,7 +127,7 @@ def test_add_done_callback_completed_batch(): def test_trigger(): - future = Future() + future = _future() callback = mock.Mock(spec=()) future.add_done_callback(callback) assert callback.call_count == 0 @@ -108,14 +136,14 @@ def test_trigger(): def test_set_result_once_only(): - future = Future() + future = _future() future.set_result('12345') with pytest.raises(RuntimeError): future.set_result('67890') def test_set_exception_once_only(): - future = Future() + future = _future() future.set_exception(ValueError('wah wah')) with pytest.raises(RuntimeError): future.set_exception(TypeError('other wah wah'))