Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub: Propagate subscribe callback errors to main thread #7954

Merged
merged 1 commit into from
May 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _maybe_wrap_exception(exception):
return exception


def _wrap_callback_errors(callback, message):
def _wrap_callback_errors(callback, on_callback_error, message):
"""Wraps a user callback so that if an exception occurs the message is
nacked.

Expand All @@ -62,14 +62,15 @@ def _wrap_callback_errors(callback, message):
"""
try:
callback(message)
except Exception:
except Exception as exc:
# Note: the likelihood of this failing is extremely low. This just adds
# a message to a queue, so if this doesn't work the world is in an
# unrecoverable state and this thread should just bail.
_LOGGER.exception(
"Top-level exception occurred in callback while processing a " "message"
"Top-level exception occurred in callback while processing a message"
)
message.nack()
on_callback_error(exc)


class StreamingPullManager(object):
Expand Down Expand Up @@ -299,21 +300,26 @@ def heartbeat(self):
if self._rpc is not None and self._rpc.is_active:
self._rpc.send(types.StreamingPullRequest())

def open(self, callback):
def open(self, callback, on_callback_error):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW, this method signature change is safe, as StreamingPullManager class is only used internally in the subscriber client's subscribe() method, and is not mentioned anywhere in the user docs, at least from what I can see.

"""Begin consuming messages.

Args:
callback (Callable[None, google.cloud.pubsub_v1.message.Messages]):
callback (Callable[None, google.cloud.pubsub_v1.message.Message]):
A callback that will be called for each message received on the
stream.
on_callback_error (Callable[Exception]):
A callable that will be called if an exception is raised in
the provided `callback`.
"""
if self.is_active:
raise ValueError("This manager is already open.")

if self._closed:
raise ValueError("This manager has been closed and can not be re-used.")

self._callback = functools.partial(_wrap_callback_errors, callback)
self._callback = functools.partial(
_wrap_callback_errors, callback, on_callback_error
)

# Create the RPC
self._rpc = bidi.ResumableBidiRpc(
Expand Down
2 changes: 1 addition & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,6 @@ def callback(message):

future = futures.StreamingPullFuture(manager)

manager.open(callback)
manager.open(callback=callback, on_callback_error=future.set_exception)

return future
29 changes: 29 additions & 0 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import threading
import time

import mock
import pytest
import six

Expand Down Expand Up @@ -178,6 +179,34 @@ def test_subscribe_to_messages_async_callbacks(
future.cancel()


class TestStreamingPull(object):
def test_streaming_pull_callback_error_propagation(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# create a topic and subscribe to it
publisher.create_topic(topic_path)
subscriber.create_subscription(subscription_path, topic_path)

# publish a messages and wait until published
future = publisher.publish(topic_path, b"hello!")
future.result(timeout=30)

# Now subscribe to the topic and verify that an error in the callback
# is propagated through the streaming pull future.
class CallbackError(Exception):
pass

callback = mock.Mock(side_effect=CallbackError)
future = subscriber.subscribe(subscription_path, callback)

with pytest.raises(CallbackError):
future.result(timeout=30)


class AckCallback(object):
def __init__(self):
self.calls = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,26 @@ def test__maybe_wrap_exception(exception, expected_cls):
def test__wrap_callback_errors_no_error():
msg = mock.create_autospec(message.Message, instance=True)
callback = mock.Mock()
on_callback_error = mock.Mock()

streaming_pull_manager._wrap_callback_errors(callback, msg)
streaming_pull_manager._wrap_callback_errors(callback, on_callback_error, msg)

callback.assert_called_once_with(msg)
msg.nack.assert_not_called()
on_callback_error.assert_not_called()


def test__wrap_callback_errors_error():
callback_error = ValueError("meep")

msg = mock.create_autospec(message.Message, instance=True)
callback = mock.Mock(side_effect=ValueError("meep"))
callback = mock.Mock(side_effect=callback_error)
on_callback_error = mock.Mock()

streaming_pull_manager._wrap_callback_errors(callback, msg)
streaming_pull_manager._wrap_callback_errors(callback, on_callback_error, msg)

msg.nack.assert_called_once()
on_callback_error.assert_called_once_with(callback_error)


def test_constructor_and_default_state():
Expand Down Expand Up @@ -319,7 +325,7 @@ def test_heartbeat_inactive():
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
manager = make_manager()

manager.open(mock.sentinel.callback)
manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)

heartbeater.assert_called_once_with(manager)
heartbeater.return_value.start.assert_called_once()
Expand Down Expand Up @@ -357,15 +363,15 @@ def test_open_already_active():
manager._consumer.is_active = True

with pytest.raises(ValueError, match="already open"):
manager.open(mock.sentinel.callback)
manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)


def test_open_has_been_closed():
manager = make_manager()
manager._closed = True

with pytest.raises(ValueError, match="closed"):
manager.open(mock.sentinel.callback)
manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)


def make_running_manager():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ def test_subscribe(manager_open):
assert isinstance(future, futures.StreamingPullFuture)

assert future._manager._subscription == "sub_name_a"
manager_open.assert_called_once_with(mock.ANY, mock.sentinel.callback)
manager_open.assert_called_once_with(
mock.ANY, mock.sentinel.callback, future.set_exception
)


@mock.patch(
Expand All @@ -97,4 +99,6 @@ def test_subscribe_options(manager_open):
assert future._manager._subscription == "sub_name_a"
assert future._manager.flow_control == flow_control
assert future._manager._scheduler == scheduler
manager_open.assert_called_once_with(mock.ANY, mock.sentinel.callback)
manager_open.assert_called_once_with(
mock.ANY, mock.sentinel.callback, future.set_exception
)