Skip to content

Commit

Permalink
Re-factor to use an instantiated Event() rather than an `event_fact…
Browse files Browse the repository at this point in the history
…ory`.
  • Loading branch information
dhermes committed Dec 21, 2017
1 parent d5d7d7e commit 1020c5e
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 31 deletions.
15 changes: 8 additions & 7 deletions pubsub/google/cloud/pubsub_v1/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,27 @@ class Future(google.api_core.future.Future):
methods in this library.
Args:
event_factory (Optional[Callable[[], Any]]): An event factory, expected
to take no arguments and return an event with the same interface as
completed (Optional[Any]): An event, with the same interface as
:class:`threading.Event`. This is provided so that callers
with different concurrency models (e.g. ``threading`` or
``multiprocessing``) can wait on an event that is compatible
``multiprocessing``) can supply an event that is compatible
with that model. The ``wait()`` and ``set()`` methods will be
used on the returned event. The default is
:class:`threading.Event`.
used. If this argument is not provided, then a new
:class:`threading.Event` will be created and used.
"""

# This could be a sentinel object or None, but the sentinel object's ID
# can change if the process is forked, and None has the possibility of
# actually being a result.
_SENTINEL = uuid.uuid4()

def __init__(self, event_factory=threading.Event):
def __init__(self, completed=None):
self._result = self._SENTINEL
self._exception = self._SENTINEL
self._callbacks = []
self._completed = event_factory()
if completed is None:
completed = threading.Event()
self._completed = completed

def cancel(self):
"""Actions in Pub/Sub generally may not be canceled.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def publish(self, message):
self._messages.append(message)
# Track the future on this batch (so that the result of the
# future can be set).
future = futures.Future(event_factory=threading.Event)
future = futures.Future(completed=threading.Event())
self._futures.append(future)
# Determine the number of messages before releasing the lock.
num_messages = len(self._messages)
Expand Down
9 changes: 4 additions & 5 deletions pubsub/google/cloud/pubsub_v1/publisher/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ class Future(futures.Future):
methods in this library.
Args:
event_factory (Optional[Callable[[], Any]]): An event factory, expected
to take no arguments and return an event with the same interface as
completed (Optional[Any]): An event, with the same interface as
:class:`threading.Event`. This is provided so that callers
with different concurrency models (e.g. ``threading`` or
``multiprocessing``) can wait on an event that is compatible
``multiprocessing``) can supply an event that is compatible
with that model. The ``wait()`` and ``set()`` methods will be
used on the returned event. The default is
:class:`threading.Event`.
used. If this argument is not provided, then a new
:class:`threading.Event` will be created and used.
"""
# The publishing-side subclass does not need any special behavior
# at this time.
Expand Down
13 changes: 6 additions & 7 deletions pubsub/google/cloud/pubsub_v1/subscriber/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,17 @@ class Future(futures.Future):
Args:
policy (~.pubsub_v1.subscriber.policy.base.BasePolicy): The policy
that creates this Future.
event_factory (Optional[Callable[[], Any]]): An event factory, expected
to take no arguments and return an event with the same interface as
completed (Optional[Any]): An event, with the same interface as
:class:`threading.Event`. This is provided so that callers
with different concurrency models (e.g. ``threading`` or
``multiprocessing``) can wait on an event that is compatible
``multiprocessing``) can supply an event that is compatible
with that model. The ``wait()`` and ``set()`` methods will be
used on the returned event. The default is
:class:`threading.Event`.
used. If this argument is not provided, then a new
:class:`threading.Event` will be created and used.
"""
def __init__(self, policy, *args, **kwargs):
def __init__(self, policy, completed=None):
self._policy = policy
super(Future, self).__init__(*args, **kwargs)
super(Future, self).__init__(completed=completed)

def running(self):
"""Return whether this subscription is opened with this Future.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def open(self, callback):
# Create the Future that this method will return.
# This future is the main thread's interface to handle exceptions,
# block on the subscription, etc.
self._future = Future(policy=self, event_factory=threading.Event)
self._future = Future(policy=self, completed=threading.Event())

# Start the thread to pass the requests.
self._callback = callback
Expand Down
19 changes: 9 additions & 10 deletions pubsub/tests/unit/pubsub_v1/test_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,25 @@ def _future(*args, **kwargs):


def test_constructor_defaults():
future = _future()
with mock.patch.object(threading, 'Event', autospec=True) as Event:
future = _future()

assert future._result == futures.Future._SENTINEL
assert future._exception == futures.Future._SENTINEL
assert future._callbacks == []
event_type = type(threading.Event())
assert isinstance(future._completed, event_type)
assert future._completed is Event.return_value

Event.assert_called_once_with()


def test_constructor_with_factory():
event_factory = mock.Mock(spec=())
future = _future(event_factory=event_factory)
def test_constructor_explicit_completed():
completed = mock.sentinel.completed
future = _future(completed=completed)

assert future._result == futures.Future._SENTINEL
assert future._exception == futures.Future._SENTINEL
assert future._callbacks == []
assert future._completed is event_factory.return_value

# Check mocks.
event_factory.assert_called_once_with()
assert future._completed is completed


def test_cancel():
Expand Down

0 comments on commit 1020c5e

Please sign in to comment.