Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: exactly-once delivery support #550

Merged
merged 44 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
ed5d6e5
Exactly-once changes
pradn Jan 19, 2022
e35de42
rename retry duration constants
pradn Feb 16, 2022
23504c8
add grpc-status dependency
pradn Feb 16, 2022
0a33f37
change dep from grpc-status to grpcio-status per error message during…
pradn Feb 16, 2022
0b6db2e
run lint formatter
pradn Feb 16, 2022
ccbd066
loosen version requirement for grpcio-status dependency to fix build
pradn Feb 16, 2022
bd21fad
rerun formatter with new version of black to fix lint error
pradn Feb 16, 2022
a6ab8ef
Add receive_messages_with_exactly_once_subscribe sample.
pradn Feb 16, 2022
fd73015
Return new AcknowledgeError exception type for ack/modack failures. T…
pradn Feb 18, 2022
349de40
Modify exactly-once subscribe sample to use new AcknowledgeError exce…
pradn Feb 18, 2022
71cb780
Fix formatting for google/cloud/pubsub_v1/subscriber/exceptions.py
pradn Feb 18, 2022
77ce3ef
Address Mahesh's comments
pradn Feb 18, 2022
f6f5d8e
Rename AcknowledgeErrorCode to AcknowledgeStatus bc it includes SUCCESS.
pradn Feb 18, 2022
f35573a
Retry leasing modack failures if exactly-once is enabled. Tests haven…
pradn Feb 18, 2022
c8017f7
Address Mahesh's comments
pradn Feb 18, 2022
b2e9c4b
Shorten use of AcknowledgeError
pradn Feb 18, 2022
21c1431
Fix tests and code
pradn Feb 22, 2022
08351c2
Fix lint errors
pradn Feb 23, 2022
7ba40e1
Improved test coverage.
pradn Feb 23, 2022
94aa54c
Improve coverage.
pradn Feb 23, 2022
97fc72c
improve coverage for streaming_pull_manager
pradn Feb 23, 2022
1220d0f
Improve code coverage
pradn Feb 23, 2022
57235b9
Reformat files
pradn Feb 23, 2022
10f5a93
More fixes
pradn Feb 23, 2022
cdb6340
Improve coverage
pradn Feb 23, 2022
03c8b54
Improve coverage
pradn Feb 23, 2022
8099fde
lint
pradn Feb 23, 2022
7edee1d
Retry on a new thread to avoid blocking the one dispatcher thread.
pradn Feb 24, 2022
6fe7bf8
Remove sample - will be pulled into separate PR.
pradn Feb 24, 2022
1d209c9
Fix type checking errors.
pradn Feb 24, 2022
3facfa5
Address some of Mahesh's comments
pradn Feb 25, 2022
c186713
Return AcknowledgeStatus.SUCCESS for _with_response methods if exactl…
pradn Feb 25, 2022
8fb9d56
Get coverage to 100%
pradn Feb 25, 2022
898715b
Add default value to new Message ctor parameter so PubSubLite code do…
pradn Feb 25, 2022
245cdfd
Complete futures when a permanent RetryError is thrown and the Stream…
pradn Feb 25, 2022
b1e5d70
Fix lint
pradn Feb 25, 2022
377222b
Fix coverage false positive in test by ignoring it.
pradn Feb 25, 2022
a20d48c
Reword some comments with "exactly-once delivery"
pradn Feb 25, 2022
3b390b7
Remove debug print
pradn Feb 25, 2022
9fa28c5
Fix coverage false positive in test by ignoring it.
pradn Feb 25, 2022
c4ba9c7
Complete all requests, not just ones with futures
pradn Feb 28, 2022
36fcab9
Better names to reflect them applying to all reqs not just ones with …
pradn Feb 28, 2022
4158315
Improve tests and comments
pradn Feb 28, 2022
eddd267
Merge branch 'main' into exactly-once2
pradn Mar 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Exactly-once changes
  • Loading branch information
pradn committed Mar 1, 2022
commit ed5d6e5938e1c8d56d6f03efdc1b7df5fd95d103
1 change: 1 addition & 0 deletions google/cloud/pubsub_v1/proto/pubsub.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,7 @@ message StreamingPullRequest {
message StreamingPullResponse {
// Subscription properties sent as part of the response.
message SubscriptionProperties {
bool exactly_once_delivery_enabled = 1;
// True iff message ordering is enabled for this subscription.
bool message_ordering_enabled = 2;
}
Expand Down
79 changes: 62 additions & 17 deletions google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import typing
from typing import List, Optional, Sequence, Union
import warnings
from google.api_core.retry import exponential_sleep_generator

from google.cloud.pubsub_v1.subscriber._protocol import helper_threads
from google.cloud.pubsub_v1.subscriber._protocol import requests
Expand Down Expand Up @@ -66,6 +67,13 @@
IDs at a time.
"""

_MIN_ACK_MODACK_RETRY_DURATION_SECS = 1
"""The time to wait for the first retry of failed acks and modacks when exactly-once
is enabled."""

_MAX_ACK_MODACK_RETRY_DURATION_SECS = 20 * 60
"""The maximum amount of time in seconds to retry failed acks and modacks when
exactly-once is enabled."""

class Dispatcher(object):
def __init__(self, manager: "StreamingPullManager", queue: "queue.Queue"):
Expand Down Expand Up @@ -168,17 +176,35 @@ def ack(self, items: Sequence[requests.AckRequest]) -> None:

# We must potentially split the request into multiple smaller requests
# to avoid the server-side max request size limit.
ack_ids = (item.ack_id for item in items)
items_gen = iter(items)
ack_ids_gen = (item.ack_id for item in items)
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

for _ in range(total_chunks):
request = gapic_types.StreamingPullRequest(
ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE)
)
self._manager.send(request)

# Remove the message from lease management.
self.drop(items)
future_reqs_dict = {req.ack_id: req for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) if req.future}
requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)),
future_reqs_dict=future_reqs_dict)

# Remove the completed messages from lease management.
self.drop(requests_completed)

# retry acks
retry_delay_gen = exponential_sleep_generator(initial=_MIN_ACK_MODACK_RETRY_DURATION_SECS,
maximum=_MAX_ACK_MODACK_RETRY_DURATION_SECS)
while requests_to_retry:
time_to_wait = retry_delay_gen()
_LOGGER.debug("Retrying {len(requests_to_retry)} ack(s) after delay of " +
str(time_to_wait) + " seconds")
time.sleep(time_to_wait)

future_reqs_dict = {req.ack_id: req for req in requests_to_retry if req.future}
requests_completed, requests_to_retry = self._manager.send_unary_ack(
ack_ids=[req.ack_id for req in requests_to_retry],
future_reqs_dict=future_reqs_dict)
assert len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE, "Too many requests to be retried."
# Remove the completed messages from lease management.
self.drop(requests_completed)

def drop(
self,
Expand Down Expand Up @@ -215,16 +241,35 @@ def modify_ack_deadline(self, items: Sequence[requests.ModAckRequest]) -> None:
"""
# We must potentially split the request into multiple smaller requests
# to avoid the server-side max request size limit.
ack_ids = (item.ack_id for item in items)
seconds = (item.seconds for item in items)
items_gen = iter(items)
ack_ids_gen = (item.ack_id for item in items)
deadline_seconds_gen = (item.seconds for item in items)
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

for _ in range(total_chunks):
request = gapic_types.StreamingPullRequest(
modify_deadline_ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE),
modify_deadline_seconds=itertools.islice(seconds, _ACK_IDS_BATCH_SIZE),
)
self._manager.send(request)
future_reqs_dict = {req.ack_id: req for req in itertools.islice(items_gen, _ACK_IDS_BATCH_SIZE) if req.future}
# no further work needs to be done for `requests_to_retry`
requests_completed, requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=list(itertools.islice(ack_ids_gen, _ACK_IDS_BATCH_SIZE)),
modify_deadline_seconds=list(itertools.islice(deadline_seconds_gen, _ACK_IDS_BATCH_SIZE)),
future_reqs_dict=future_reqs_dict)
assert len(requests_to_retry) <= _ACK_IDS_BATCH_SIZE, "Too many requests to be retried."

# retry modacks
retry_delay_gen = exponential_sleep_generator(initial=_MIN_ACK_MODACK_RETRY_DURATION_SECS,
maximum=_MAX_ACK_MODACK_RETRY_DURATION_SECS)
while requests_to_retry:
time_to_wait = retry_delay_gen()
_LOGGER.debug("Retrying {len(requests_to_retry)} modack(s) after delay of " +
str(time_to_wait) + " seconds")
time.sleep(time_to_wait)

print(requests_to_retry)
future_reqs_dict = {req.ack_id: req for req in requests_to_retry if req.future}
requests_to_retry = self._manager.send_unary_modack(
modify_deadline_ack_ids=[req.ack_id for req in requests_to_retry],
modify_deadline_seconds=[req.seconds for req in requests_to_retry],
future_reqs_dict=future_reqs_dict)

def nack(self, items: Sequence[requests.NackRequest]) -> None:
"""Explicitly deny receipt of messages.
Expand All @@ -233,6 +278,6 @@ def nack(self, items: Sequence[requests.NackRequest]) -> None:
items: The items to deny.
"""
self.modify_ack_deadline(
[requests.ModAckRequest(ack_id=item.ack_id, seconds=0) for item in items]
[requests.ModAckRequest(ack_id=item.ack_id, seconds=0, future=item.future) for item in items]
)
self.drop([requests.DropRequest(*item) for item in items])
self.drop([requests.DropRequest(ack_id=item.ack_id, byte_size=item.byte_size, ordering_key=item.ordering_key) for item in items])
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ def maintain_leases(self) -> None:
# is inactive.
assert self._manager.dispatcher is not None
self._manager.dispatcher.modify_ack_deadline(
[requests.ModAckRequest(ack_id, deadline) for ack_id in ack_ids]
[requests.ModAckRequest(ack_id, deadline, None) for ack_id in ack_ids]
)

# Now wait an appropriate period of time and do this again.
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/pubsub_v1/subscriber/_protocol/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import typing
from typing import NamedTuple, Optional

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud.pubsub_v1.subscriber import futures

# Namedtuples for management requests. Used by the Message class to communicate
# items of work back to the policy.
Expand All @@ -22,6 +25,7 @@ class AckRequest(NamedTuple):
byte_size: int
time_to_ack: float
ordering_key: Optional[str]
future: Optional["pubsub_v1.subscriber.futures.Future"]


class DropRequest(NamedTuple):
Expand All @@ -39,9 +43,11 @@ class LeaseRequest(NamedTuple):
class ModAckRequest(NamedTuple):
ack_id: str
seconds: float
future: Optional["pubsub_v1.subscriber.futures.Future"]


class NackRequest(NamedTuple):
ack_id: str
byte_size: int
ordering_key: Optional[str]
future: Optional["pubsub_v1.subscriber.futures.Future"]
Loading