diff --git a/pubsub/google/cloud/pubsub_v1/futures.py b/pubsub/google/cloud/pubsub_v1/futures.py index ea7a0babf050..0940a47709a3 100644 --- a/pubsub/google/cloud/pubsub_v1/futures.py +++ b/pubsub/google/cloud/pubsub_v1/futures.py @@ -31,14 +31,13 @@ class Future(google.api_core.future.Future): methods in this library. Args: - event_factory (Optional[Callable[[], Any]]): An event factory, expected - to take no arguments and return an event with the same interface as + completed (Optional[Any]): An event, with the same interface as :class:`threading.Event`. This is provided so that callers with different concurrency models (e.g. ``threading`` or - ``multiprocessing``) can wait on an event that is compatible + ``multiprocessing``) can supply an event that is compatible with that model. The ``wait()`` and ``set()`` methods will be - used on the returned event. The default is - :class:`threading.Event`. + used. If this argument is not provided, then a new + :class:`threading.Event` will be created and used. """ # This could be a sentinel object or None, but the sentinel object's ID @@ -46,11 +45,13 @@ class Future(google.api_core.future.Future): # actually being a result. _SENTINEL = uuid.uuid4() - def __init__(self, event_factory=threading.Event): + def __init__(self, completed=None): self._result = self._SENTINEL self._exception = self._SENTINEL self._callbacks = [] - self._completed = event_factory() + if completed is None: + completed = threading.Event() + self._completed = completed def cancel(self): """Actions in Pub/Sub generally may not be canceled. diff --git a/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py b/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py index 70e1c7d36b5c..73cafb9cde13 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py @@ -271,7 +271,7 @@ def publish(self, message): self._messages.append(message) # Track the future on this batch (so that the result of the # future can be set). - future = futures.Future(event_factory=threading.Event) + future = futures.Future(completed=threading.Event()) self._futures.append(future) # Determine the number of messages before releasing the lock. num_messages = len(self._messages) diff --git a/pubsub/google/cloud/pubsub_v1/publisher/futures.py b/pubsub/google/cloud/pubsub_v1/publisher/futures.py index 072242be7822..9c0e93120bc5 100644 --- a/pubsub/google/cloud/pubsub_v1/publisher/futures.py +++ b/pubsub/google/cloud/pubsub_v1/publisher/futures.py @@ -27,14 +27,13 @@ class Future(futures.Future): methods in this library. Args: - event_factory (Optional[Callable[[], Any]]): An event factory, expected - to take no arguments and return an event with the same interface as + completed (Optional[Any]): An event, with the same interface as :class:`threading.Event`. This is provided so that callers with different concurrency models (e.g. ``threading`` or - ``multiprocessing``) can wait on an event that is compatible + ``multiprocessing``) can supply an event that is compatible with that model. The ``wait()`` and ``set()`` methods will be - used on the returned event. The default is - :class:`threading.Event`. + used. If this argument is not provided, then a new + :class:`threading.Event` will be created and used. """ # The publishing-side subclass does not need any special behavior # at this time. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/futures.py b/pubsub/google/cloud/pubsub_v1/subscriber/futures.py index a502b9b48fa2..7114a32c9600 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/futures.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/futures.py @@ -29,18 +29,17 @@ class Future(futures.Future): Args: policy (~.pubsub_v1.subscriber.policy.base.BasePolicy): The policy that creates this Future. - event_factory (Optional[Callable[[], Any]]): An event factory, expected - to take no arguments and return an event with the same interface as + completed (Optional[Any]): An event, with the same interface as :class:`threading.Event`. This is provided so that callers with different concurrency models (e.g. ``threading`` or - ``multiprocessing``) can wait on an event that is compatible + ``multiprocessing``) can supply an event that is compatible with that model. The ``wait()`` and ``set()`` methods will be - used on the returned event. The default is - :class:`threading.Event`. + used. If this argument is not provided, then a new + :class:`threading.Event` will be created and used. """ - def __init__(self, policy, *args, **kwargs): + def __init__(self, policy, completed=None): self._policy = policy - super(Future, self).__init__(*args, **kwargs) + super(Future, self).__init__(completed=completed) def running(self): """Return whether this subscription is opened with this Future. diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py index 0957f14a0762..37d8fdc63519 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py @@ -263,7 +263,7 @@ def open(self, callback): # Create the Future that this method will return. # This future is the main thread's interface to handle exceptions, # block on the subscription, etc. - self._future = Future(policy=self, event_factory=threading.Event) + self._future = Future(policy=self, completed=threading.Event()) # Start the thread to pass the requests. self._callback = callback diff --git a/pubsub/tests/unit/pubsub_v1/test_futures.py b/pubsub/tests/unit/pubsub_v1/test_futures.py index fc5170d2f21e..9dd77b506267 100644 --- a/pubsub/tests/unit/pubsub_v1/test_futures.py +++ b/pubsub/tests/unit/pubsub_v1/test_futures.py @@ -26,26 +26,25 @@ def _future(*args, **kwargs): def test_constructor_defaults(): - future = _future() + with mock.patch.object(threading, 'Event', autospec=True) as Event: + future = _future() assert future._result == futures.Future._SENTINEL assert future._exception == futures.Future._SENTINEL assert future._callbacks == [] - event_type = type(threading.Event()) - assert isinstance(future._completed, event_type) + assert future._completed is Event.return_value + + Event.assert_called_once_with() -def test_constructor_with_factory(): - event_factory = mock.Mock(spec=()) - future = _future(event_factory=event_factory) +def test_constructor_explicit_completed(): + completed = mock.sentinel.completed + future = _future(completed=completed) assert future._result == futures.Future._SENTINEL assert future._exception == futures.Future._SENTINEL assert future._callbacks == [] - assert future._completed is event_factory.return_value - - # Check mocks. - event_factory.assert_called_once_with() + assert future._completed is completed def test_cancel():