Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Handle duplicate acks with streaming pull #662

Merged
merged 43 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
fe9b23d
fix: de-duplicate AckRequests in error handling
acocuzzo Apr 19, 2022
fd8e7de
fix: de-duplicate AckRequests in error handling
acocuzzo Apr 19, 2022
fe8a015
adding asserts to check eod
acocuzzo Apr 19, 2022
a0c9057
adding asserts to check eod
acocuzzo Apr 19, 2022
7a07982
adding asserts to check eod
acocuzzo Apr 19, 2022
47b823b
adding asserts to check eod
acocuzzo Apr 19, 2022
ed4e527
removing unused imports
acocuzzo Apr 19, 2022
846079f
Merge branch 'googleapis:main' into duplicate-ack-handling
acocuzzo Apr 19, 2022
035a7b3
allow duplicate AckRequests in ack_reqs_dict and add handling of futures
acocuzzo Apr 20, 2022
0f2cedd
fixing linter/rebase errors
acocuzzo Apr 20, 2022
9b82d83
removing coverage for test assertions
acocuzzo Apr 20, 2022
1ae67e0
lint
acocuzzo Apr 20, 2022
049fb1b
changing comments
acocuzzo Apr 21, 2022
2743c39
Merge branch 'main' into duplicate-ack-handling
acocuzzo Apr 21, 2022
095e5a4
revert test_streaming_pull_manager
acocuzzo Apr 21, 2022
02e076b
revert streaming_pull changes
acocuzzo Apr 21, 2022
d8ba72b
updating comment
acocuzzo Apr 21, 2022
40acbcf
deflaking test
acocuzzo Apr 21, 2022
cfa800d
deflaking test
acocuzzo Apr 21, 2022
156f302
adding typed_backoff
acocuzzo Apr 21, 2022
b15c3f8
deflaking test
acocuzzo Apr 21, 2022
171718c
deflaking test
acocuzzo Apr 21, 2022
45ba418
Revert "deflaking test"
acocuzzo Apr 21, 2022
9ef18ea
add verbose to tests
acocuzzo Apr 21, 2022
df0ae1d
reverting test changes
acocuzzo Apr 22, 2022
0dda164
Merge branch 'main' into duplicate-ack-handling
acocuzzo Apr 22, 2022
0d7fa61
reverint noxfile changeS
acocuzzo Apr 22, 2022
6ec2262
Merge branch 'main' into duplicate-ack-handling
acocuzzo Apr 25, 2022
9cde911
adding duplicate handling to modack and address comments
acocuzzo May 3, 2022
241501f
updating unit test
acocuzzo May 4, 2022
286ac1a
move deduplicating to dispatch_callback
acocuzzo May 4, 2022
018a822
reformat
acocuzzo May 4, 2022
be4c390
remove unused var
acocuzzo May 4, 2022
0a6833d
Merge branch 'main' into duplicate-ack-handling
acocuzzo May 4, 2022
9426015
fix typing _handle_duplicate_request
acocuzzo May 5, 2022
ae016c7
Merge branch 'main' into duplicate-ack-handling
acocuzzo May 5, 2022
d95f20a
addressing comments
acocuzzo May 10, 2022
4bb0856
returning ValueError
acocuzzo May 10, 2022
25531e2
remove unused import
acocuzzo May 10, 2022
0896e06
remove unused import
acocuzzo May 10, 2022
105952f
Merge branch 'main' into duplicate-ack-handling
acocuzzo May 10, 2022
add7591
removing unused tests
acocuzzo May 10, 2022
9f34719
adding test to ensure different ack_id sets
acocuzzo May 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 64 additions & 5 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests
from google.cloud.pubsub_v1.subscriber.exceptions import (
AcknowledgeStatus,
)

if typing.TYPE_CHECKING: # pragma: NO COVER
import queue
Expand Down Expand Up @@ -128,17 +131,50 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
nack_requests: List[requests.NackRequest] = []
drop_requests: List[requests.DropRequest] = []

lease_ids = set()
modack_ids = set()
ack_ids = set()
nack_ids = set()
drop_ids = set()
exactly_once_delivery_enabled = self._manager._exactly_once_delivery_enabled()

for item in items:
if isinstance(item, requests.LeaseRequest):
lease_requests.append(item)
if (
item.ack_id not in lease_ids
): # LeaseRequests have no futures to handle.
lease_ids.add(item.ack_id)
lease_requests.append(item)
elif isinstance(item, requests.ModAckRequest):
modack_requests.append(item)
if item.ack_id in modack_ids:
self._handle_duplicate_request_future(
exactly_once_delivery_enabled, item
)
else:
modack_ids.add(item.ack_id)
modack_requests.append(item)
elif isinstance(item, requests.AckRequest):
ack_requests.append(item)
if item.ack_id in ack_ids:
self._handle_duplicate_request_future(
exactly_once_delivery_enabled, item
)
else:
ack_ids.add(item.ack_id)
ack_requests.append(item)
elif isinstance(item, requests.NackRequest):
nack_requests.append(item)
if item.ack_id in nack_ids:
self._handle_duplicate_request_future(
exactly_once_delivery_enabled, item
)
else:
nack_ids.add(item.ack_id)
nack_requests.append(item)
elif isinstance(item, requests.DropRequest):
drop_requests.append(item)
if (
item.ack_id not in drop_ids
): # DropRequests have no futures to handle.
drop_ids.add(item.ack_id)
drop_requests.append(item)
else:
warnings.warn(
f'Skipping unknown request item of type "{type(item)}"',
Expand All @@ -164,6 +200,29 @@ def dispatch_callback(self, items: Sequence[RequestItem]) -> None:
if drop_requests:
self.drop(drop_requests)

def _handle_duplicate_request_future(
self,
exactly_once_delivery_enabled: bool,
item: Union[requests.AckRequest, requests.ModAckRequest, requests.NackRequest],
) -> None:
_LOGGER.debug(
"This is a duplicate %s with the same ack_id: %s.",
type(item),
item.ack_id,
)
if item.future:
if exactly_once_delivery_enabled:
item.future.set_exception(
ValueError(f"Duplicate ack_id for {type(item)}")
)
# Futures may be present even with exactly-once delivery
# disabled, in transition periods after the setting is changed on
# the subscription.
else:
# When exactly-once delivery is NOT enabled, acks/modacks are considered
# best-effort, so the future should succeed even though this is a duplicate.
item.future.set_result(AcknowledgeStatus.SUCCESS)

def ack(self, items: Sequence[requests.AckRequest]) -> None:
"""Acknowledge the given messages.

Expand Down
272 changes: 272 additions & 0 deletions tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

import mock
import pytest
from google.cloud.pubsub_v1.subscriber.exceptions import (
AcknowledgeStatus,
)


@pytest.mark.parametrize(
Expand All @@ -48,6 +51,7 @@ def test_dispatch_callback_active_manager(item, method_name):
dispatcher_.dispatch_callback(items)

method.assert_called_once_with([item])
manager._exactly_once_delivery_enabled.assert_called()


@pytest.mark.parametrize(
Expand All @@ -73,6 +77,274 @@ def test_dispatch_callback_inactive_manager(item, method_name):
dispatcher_.dispatch_callback(items)

method.assert_called_once_with([item])
manager._exactly_once_delivery_enabled.assert_called()


@pytest.mark.parametrize(
"items,method_name",
[
(
[
requests.AckRequest("0", 0, 0, "", None),
requests.AckRequest("0", 0, 1, "", None),
],
"ack",
),
(
[
requests.DropRequest("0", 0, ""),
requests.DropRequest("0", 1, ""),
],
"drop",
),
(
[
requests.LeaseRequest("0", 0, ""),
requests.LeaseRequest("0", 1, ""),
],
"lease",
),
(
[
requests.ModAckRequest("0", 0, None),
requests.ModAckRequest("0", 1, None),
],
"modify_ack_deadline",
),
(
[
requests.NackRequest("0", 0, "", None),
requests.NackRequest("0", 1, "", None),
],
"nack",
),
],
)
def test_dispatch_duplicate_items_callback_active_manager_no_futures(
items, method_name
):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

manager._exactly_once_delivery_enabled.return_value = False
with mock.patch.object(dispatcher_, method_name) as method:
dispatcher_.dispatch_callback(items)

method.assert_called_once_with([items[0]])
manager._exactly_once_delivery_enabled.assert_called()


@pytest.mark.parametrize(
"items,method_name",
[
(
[
requests.AckRequest("0", 0, 0, "", None),
requests.AckRequest("0", 0, 1, "", futures.Future()),
],
"ack",
),
(
[
requests.DropRequest("0", 0, ""),
requests.DropRequest("0", 1, ""),
],
"drop",
),
(
[
requests.LeaseRequest("0", 0, ""),
requests.LeaseRequest("0", 1, ""),
],
"lease",
),
(
[
requests.ModAckRequest("0", 0, None),
requests.ModAckRequest("0", 1, futures.Future()),
],
"modify_ack_deadline",
),
(
[
requests.NackRequest("0", 0, "", None),
requests.NackRequest("0", 1, "", futures.Future()),
],
"nack",
),
],
)
def test_dispatch_duplicate_items_callback_active_manager_with_futures_no_eod(
items, method_name
):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

manager._exactly_once_delivery_enabled.return_value = False
with mock.patch.object(dispatcher_, method_name) as method:
dispatcher_.dispatch_callback(items)

method.assert_called_once_with([items[0]])
manager._exactly_once_delivery_enabled.assert_called()

if method_name != "drop" and method_name != "lease":
assert items[1].future.result() == AcknowledgeStatus.SUCCESS


@pytest.mark.parametrize(
"items,method_name",
[
(
[
requests.AckRequest("0", 0, 0, "", None),
requests.AckRequest("0", 0, 1, "", futures.Future()),
],
"ack",
),
(
[
requests.DropRequest("0", 0, ""),
requests.DropRequest("0", 1, ""),
],
"drop",
),
(
[
requests.LeaseRequest("0", 0, ""),
requests.LeaseRequest("0", 1, ""),
],
"lease",
),
(
[
requests.ModAckRequest("0", 0, None),
requests.ModAckRequest("0", 1, futures.Future()),
],
"modify_ack_deadline",
),
(
[
requests.NackRequest("0", 0, "", None),
requests.NackRequest("0", 1, "", futures.Future()),
],
"nack",
),
],
)
def test_dispatch_duplicate_items_callback_active_manager_with_futures_eod(
items, method_name
):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

manager._exactly_once_delivery_enabled.return_value = True
with mock.patch.object(dispatcher_, method_name) as method:
dispatcher_.dispatch_callback(items)

method.assert_called_once_with([items[0]])
manager._exactly_once_delivery_enabled.assert_called()

if method_name != "drop" and method_name != "lease":
with pytest.raises(ValueError) as err:
items[1].future.result()
assert err.errisinstance(ValueError)


def test_dispatch_duplicate_items_diff_types_callback_active_manager_with_futures_eod():
ack_future = futures.Future()
ack_request = requests.AckRequest("0", 0, 1, "", ack_future)
drop_request = requests.DropRequest("0", 1, "")
lease_request = requests.LeaseRequest("0", 1, "")
nack_future = futures.Future()
nack_request = requests.NackRequest("0", 1, "", nack_future)
modack_future = futures.Future()
modack_request = requests.ModAckRequest("0", 1, modack_future)

items = [ack_request, drop_request, lease_request, nack_request, modack_request]

manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

manager._exactly_once_delivery_enabled.return_value = True
with mock.patch.multiple(
dispatcher_,
ack=mock.DEFAULT,
nack=mock.DEFAULT,
drop=mock.DEFAULT,
lease=mock.DEFAULT,
modify_ack_deadline=mock.DEFAULT,
):
dispatcher_.dispatch_callback(items)
manager._exactly_once_delivery_enabled.assert_called()
dispatcher_.ack.assert_called_once_with([ack_request])
dispatcher_.drop.assert_called_once_with([drop_request])
dispatcher_.lease.assert_called_once_with([lease_request])
dispatcher_.nack.assert_called_once_with([nack_request])
dispatcher_.modify_ack_deadline.assert_called_once_with([modack_request])


@pytest.mark.parametrize(
"items,method_name",
[
(
[
requests.AckRequest("0", 0, 0, "", None),
requests.AckRequest("0", 0, 1, "", None),
],
"ack",
),
(
[
requests.DropRequest("0", 0, ""),
requests.DropRequest("0", 1, ""),
],
"drop",
),
(
[
requests.LeaseRequest("0", 0, ""),
requests.LeaseRequest("0", 1, ""),
],
"lease",
),
(
[
requests.ModAckRequest("0", 0, None),
requests.ModAckRequest("0", 1, None),
],
"modify_ack_deadline",
),
(
[
requests.NackRequest("0", 0, "", None),
requests.NackRequest("0", 1, "", None),
],
"nack",
),
],
)
def test_dispatch_duplicate_items_callback_active_manager_no_futures_eod(
items, method_name
):
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
)
dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

manager._exactly_once_delivery_enabled.return_value = True
with mock.patch.object(dispatcher_, method_name) as method:
dispatcher_.dispatch_callback(items)

method.assert_called_once_with([items[0]])
manager._exactly_once_delivery_enabled.assert_called()


def test_unknown_request_type():
Expand Down