From 1b8918d577cd66d82de48ec693b9a1a966b1c009 Mon Sep 17 00:00:00 2001 From: Mukund Ananthu Date: Mon, 22 Apr 2024 21:00:57 +0000 Subject: [PATCH] Testing: sync modack timed --- .../subscriber/_protocol/streaming_pull_manager.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index f07db8546..1418e85b6 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -41,6 +41,7 @@ import google.cloud.pubsub_v1.subscriber.message from google.cloud.pubsub_v1.subscriber import futures from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler +from datetime import datetime from google.pubsub_v1 import types as gapic_types from grpc_status import rpc_status # type: ignore from google.rpc.error_details_pb2 import ErrorInfo # type: ignore @@ -1098,10 +1099,15 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: # Immediately (i.e. without waiting for the auto lease management) # modack the messages we received, as this tells the server that we've # received them. + before_acks = datetime.now() ack_id_gen = (message.ack_id for message in received_messages) expired_ack_ids = self._send_lease_modacks( ack_id_gen, self.ack_deadline, warn_on_invalid=False ) + after_acks = datetime.now() + sync_modack_time_diff = after_acks - before_acks + print(f"time for sync modacking = {sync_modack_time_diff.total_seconds()*1000}") + with self._pause_resume_lock: assert self._scheduler is not None