Skip to content

Commit

Permalink
Propagate subscribe callback errors to main thread (#7954)
Browse files Browse the repository at this point in the history
The **reCAPTCHA Enterprise** build failure is unrelated to this, thus merging.
  • Loading branch information
plamut authored May 15, 2019
1 parent 6b8eb5f commit f4ab042
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _maybe_wrap_exception(exception):
return exception


def _wrap_callback_errors(callback, message):
def _wrap_callback_errors(callback, on_callback_error, message):
"""Wraps a user callback so that if an exception occurs the message is
nacked.
Expand All @@ -62,14 +62,15 @@ def _wrap_callback_errors(callback, message):
"""
try:
callback(message)
except Exception:
except Exception as exc:
# Note: the likelihood of this failing is extremely low. This just adds
# a message to a queue, so if this doesn't work the world is in an
# unrecoverable state and this thread should just bail.
_LOGGER.exception(
"Top-level exception occurred in callback while processing a " "message"
"Top-level exception occurred in callback while processing a message"
)
message.nack()
on_callback_error(exc)


class StreamingPullManager(object):
Expand Down Expand Up @@ -299,21 +300,26 @@ def heartbeat(self):
if self._rpc is not None and self._rpc.is_active:
self._rpc.send(types.StreamingPullRequest())

def open(self, callback):
def open(self, callback, on_callback_error):
"""Begin consuming messages.
Args:
callback (Callable[None, google.cloud.pubsub_v1.message.Messages]):
callback (Callable[None, google.cloud.pubsub_v1.message.Message]):
A callback that will be called for each message received on the
stream.
on_callback_error (Callable[Exception]):
A callable that will be called if an exception is raised in
the provided `callback`.
"""
if self.is_active:
raise ValueError("This manager is already open.")

if self._closed:
raise ValueError("This manager has been closed and can not be re-used.")

self._callback = functools.partial(_wrap_callback_errors, callback)
self._callback = functools.partial(
_wrap_callback_errors, callback, on_callback_error
)

# Create the RPC
self._rpc = bidi.ResumableBidiRpc(
Expand Down
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,6 @@ def callback(message):

future = futures.StreamingPullFuture(manager)

manager.open(callback)
manager.open(callback=callback, on_callback_error=future.set_exception)

return future
29 changes: 29 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import threading
import time

import mock
import pytest
import six

Expand Down Expand Up @@ -178,6 +179,34 @@ def test_subscribe_to_messages_async_callbacks(
future.cancel()


class TestStreamingPull(object):
def test_streaming_pull_callback_error_propagation(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# create a topic and subscribe to it
publisher.create_topic(topic_path)
subscriber.create_subscription(subscription_path, topic_path)

# publish a messages and wait until published
future = publisher.publish(topic_path, b"hello!")
future.result(timeout=30)

# Now subscribe to the topic and verify that an error in the callback
# is propagated through the streaming pull future.
class CallbackError(Exception):
pass

callback = mock.Mock(side_effect=CallbackError)
future = subscriber.subscribe(subscription_path, callback)

with pytest.raises(CallbackError):
future.result(timeout=30)


class AckCallback(object):
def __init__(self):
self.calls = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,26 @@ def test__maybe_wrap_exception(exception, expected_cls):
def test__wrap_callback_errors_no_error():
msg = mock.create_autospec(message.Message, instance=True)
callback = mock.Mock()
on_callback_error = mock.Mock()

streaming_pull_manager._wrap_callback_errors(callback, msg)
streaming_pull_manager._wrap_callback_errors(callback, on_callback_error, msg)

callback.assert_called_once_with(msg)
msg.nack.assert_not_called()
on_callback_error.assert_not_called()


def test__wrap_callback_errors_error():
callback_error = ValueError("meep")

msg = mock.create_autospec(message.Message, instance=True)
callback = mock.Mock(side_effect=ValueError("meep"))
callback = mock.Mock(side_effect=callback_error)
on_callback_error = mock.Mock()

streaming_pull_manager._wrap_callback_errors(callback, msg)
streaming_pull_manager._wrap_callback_errors(callback, on_callback_error, msg)

msg.nack.assert_called_once()
on_callback_error.assert_called_once_with(callback_error)


def test_constructor_and_default_state():
Expand Down Expand Up @@ -319,7 +325,7 @@ def test_heartbeat_inactive():
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
manager = make_manager()

manager.open(mock.sentinel.callback)
manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)

heartbeater.assert_called_once_with(manager)
heartbeater.return_value.start.assert_called_once()
Expand Down Expand Up @@ -357,15 +363,15 @@ def test_open_already_active():
manager._consumer.is_active = True

with pytest.raises(ValueError, match="already open"):
manager.open(mock.sentinel.callback)
manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)


def test_open_has_been_closed():
manager = make_manager()
manager._closed = True

with pytest.raises(ValueError, match="closed"):
manager.open(mock.sentinel.callback)
manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)


def make_running_manager():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ def test_subscribe(manager_open):
assert isinstance(future, futures.StreamingPullFuture)

assert future._manager._subscription == "sub_name_a"
manager_open.assert_called_once_with(mock.ANY, mock.sentinel.callback)
manager_open.assert_called_once_with(
mock.ANY, mock.sentinel.callback, future.set_exception
)


@mock.patch(
Expand All @@ -97,4 +99,6 @@ def test_subscribe_options(manager_open):
assert future._manager._subscription == "sub_name_a"
assert future._manager.flow_control == flow_control
assert future._manager._scheduler == scheduler
manager_open.assert_called_once_with(mock.ANY, mock.sentinel.callback)
manager_open.assert_called_once_with(
mock.ANY, mock.sentinel.callback, future.set_exception
)

0 comments on commit f4ab042

Please sign in to comment.