Skip to content

Commit

Permalink
Allowed a custom Event type in Pub / Sub futures.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes committed Dec 21, 2017
1 parent b2c7c11 commit d5d7d7e
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 23 deletions.
14 changes: 12 additions & 2 deletions pubsub/google/cloud/pubsub_v1/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,28 @@ class Future(google.api_core.future.Future):
This object should not be created directly, but is returned by other
methods in this library.
Args:
event_factory (Optional[Callable[[], Any]]): An event factory, expected
to take no arguments and return an event with the same interface as
:class:`threading.Event`. This is provided so that callers
with different concurrency models (e.g. ``threading`` or
``multiprocessing``) can wait on an event that is compatible
with that model. The ``wait()`` and ``set()`` methods will be
used on the returned event. The default is
:class:`threading.Event`.
"""

# This could be a sentinel object or None, but the sentinel object's ID
# can change if the process is forked, and None has the possibility of
# actually being a result.
_SENTINEL = uuid.uuid4()

def __init__(self):
def __init__(self, event_factory=threading.Event):
self._result = self._SENTINEL
self._exception = self._SENTINEL
self._callbacks = []
self._completed = threading.Event()
self._completed = event_factory()

def cancel(self):
"""Actions in Pub/Sub generally may not be canceled.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def publish(self, message):
self._messages.append(message)
# Track the future on this batch (so that the result of the
# future can be set).
future = futures.Future()
future = futures.Future(event_factory=threading.Event)
self._futures.append(future)
# Determine the number of messages before releasing the lock.
num_messages = len(self._messages)
Expand Down
10 changes: 10 additions & 0 deletions pubsub/google/cloud/pubsub_v1/publisher/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ class Future(futures.Future):
This object should not be created directly, but is returned by other
methods in this library.
Args:
event_factory (Optional[Callable[[], Any]]): An event factory, expected
to take no arguments and return an event with the same interface as
:class:`threading.Event`. This is provided so that callers
with different concurrency models (e.g. ``threading`` or
``multiprocessing``) can wait on an event that is compatible
with that model. The ``wait()`` and ``set()`` methods will be
used on the returned event. The default is
:class:`threading.Event`.
"""
# The publishing-side subclass does not need any special behavior
# at this time.
Expand Down
12 changes: 10 additions & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,18 @@ class Future(futures.Future):
Args:
policy (~.pubsub_v1.subscriber.policy.base.BasePolicy): The policy
that creates this Future.
event_factory (Optional[Callable[[], Any]]): An event factory, expected
to take no arguments and return an event with the same interface as
:class:`threading.Event`. This is provided so that callers
with different concurrency models (e.g. ``threading`` or
``multiprocessing``) can wait on an event that is compatible
with that model. The ``wait()`` and ``set()`` methods will be
used on the returned event. The default is
:class:`threading.Event`.
"""
def __init__(self, policy):
def __init__(self, policy, *args, **kwargs):
self._policy = policy
super(Future, self).__init__()
super(Future, self).__init__(*args, **kwargs)

def running(self):
"""Return whether this subscription is opened with this Future.
Expand Down
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def open(self, callback):
# Create the Future that this method will return.
# This future is the main thread's interface to handle exceptions,
# block on the subscription, etc.
self._future = Future(policy=self)
self._future = Future(policy=self, event_factory=threading.Event)

# Start the thread to pass the requests.
self._callback = callback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,72 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import mock
import threading

import mock
import pytest

from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher.futures import Future
from google.cloud.pubsub_v1 import exceptions
from google.cloud.pubsub_v1 import futures


def _future(*args, **kwargs):
return futures.Future(*args, **kwargs)


def test_constructor_defaults():
future = _future()

assert future._result == futures.Future._SENTINEL
assert future._exception == futures.Future._SENTINEL
assert future._callbacks == []
event_type = type(threading.Event())
assert isinstance(future._completed, event_type)


def test_constructor_with_factory():
event_factory = mock.Mock(spec=())
future = _future(event_factory=event_factory)

assert future._result == futures.Future._SENTINEL
assert future._exception == futures.Future._SENTINEL
assert future._callbacks == []
assert future._completed is event_factory.return_value

# Check mocks.
event_factory.assert_called_once_with()


def test_cancel():
assert Future().cancel() is False
assert _future().cancel() is False


def test_cancelled():
assert Future().cancelled() is False
assert _future().cancelled() is False


def test_running():
future = Future()
future = _future()
assert future.running() is True
future.set_result('foobar')
assert future.running() is False


def test_done():
future = Future()
future = _future()
assert future.done() is False
future.set_result('12345')
assert future.done() is True


def test_exception_no_error():
future = Future()
future = _future()
future.set_result('12345')
assert future.exception() is None


def test_exception_with_error():
future = Future()
future = _future()
error = RuntimeError('Something really bad happened.')
future.set_exception(error)

Expand All @@ -63,26 +91,26 @@ def test_exception_with_error():


def test_exception_timeout():
future = Future()
future = _future()
with pytest.raises(exceptions.TimeoutError):
future.exception(timeout=0.01)


def test_result_no_error():
future = Future()
future = _future()
future.set_result('42')
assert future.result() == '42'


def test_result_with_error():
future = Future()
future = _future()
future.set_exception(RuntimeError('Something really bad happened.'))
with pytest.raises(RuntimeError):
future.result()


def test_add_done_callback_pending_batch():
future = Future()
future = _future()
callback = mock.Mock()
future.add_done_callback(callback)
assert len(future._callbacks) == 1
Expand All @@ -91,15 +119,15 @@ def test_add_done_callback_pending_batch():


def test_add_done_callback_completed_batch():
future = Future()
future = _future()
future.set_result('12345')
callback = mock.Mock(spec=())
future.add_done_callback(callback)
callback.assert_called_once_with(future)


def test_trigger():
future = Future()
future = _future()
callback = mock.Mock(spec=())
future.add_done_callback(callback)
assert callback.call_count == 0
Expand All @@ -108,14 +136,14 @@ def test_trigger():


def test_set_result_once_only():
future = Future()
future = _future()
future.set_result('12345')
with pytest.raises(RuntimeError):
future.set_result('67890')


def test_set_exception_once_only():
future = Future()
future = _future()
future.set_exception(ValueError('wah wah'))
with pytest.raises(RuntimeError):
future.set_exception(TypeError('other wah wah'))

0 comments on commit d5d7d7e

Please sign in to comment.