From b49341f953b06d5fa535e13f044344cccf22577f Mon Sep 17 00:00:00 2001
From: Peter Lamut <inbox@peterlamut.com>
Date: Wed, 31 Jul 2019 00:02:34 +0200
Subject: [PATCH 1/6] Remove Message lease() method and autolease param

These two have been deprecated in 0.44.0 and it's time to remove them.
---
 .../_protocol/streaming_pull_manager.py       |  5 +-
 .../cloud/pubsub_v1/subscriber/message.py     | 35 +-----------
 .../unit/pubsub_v1/subscriber/test_message.py | 54 ++++++-------------
 3 files changed, 17 insertions(+), 77 deletions(-)

diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index 68ab452fc564..4a6c26f7cf1e 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -518,10 +518,7 @@ def _on_response(self, response):
 
         for received_message in response.received_messages:
             message = google.cloud.pubsub_v1.subscriber.message.Message(
-                received_message.message,
-                received_message.ack_id,
-                self._scheduler.queue,
-                autolease=False,
+                received_message.message, received_message.ack_id, self._scheduler.queue
             )
             if self.load < 1.0:
                 req = requests.LeaseRequest(
diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/message.py b/pubsub/google/cloud/pubsub_v1/subscriber/message.py
index db8e650db06c..41bc42755ad7 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/message.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/message.py
@@ -18,7 +18,6 @@
 import json
 import math
 import time
-import warnings
 
 from google.api_core import datetime_helpers
 from google.cloud.pubsub_v1.subscriber._protocol import requests
@@ -71,7 +70,7 @@ class Message(object):
             published.
     """
 
-    def __init__(self, message, ack_id, request_queue, autolease=True):
+    def __init__(self, message, ack_id, request_queue):
         """Construct the Message.
 
         .. note::
@@ -86,13 +85,6 @@ def __init__(self, message, ack_id, request_queue, autolease=True):
             request_queue (queue.Queue): A queue provided by the policy that
                 can accept requests; the policy is responsible for handling
                 those requests.
-            autolease (bool): An optional flag determining whether a new Message
-                instance should automatically lease itself upon creation.
-                Defaults to :data:`True`.
-
-                .. note::
-                    .. deprecated:: 0.44.0
-                        Parameter will be removed in future versions.
         """
         self._message = message
         self._ack_id = ack_id
@@ -104,11 +96,6 @@ def __init__(self, message, ack_id, request_queue, autolease=True):
         # the default lease deadline.
         self._received_timestamp = time.time()
 
-        # The policy should lease this message, telling PubSub that it has
-        # it until it is acked or otherwise dropped.
-        if autolease:
-            self.lease()
-
     def __repr__(self):
         # Get an abbreviated version of the data.
         abbv_data = self._message.data
@@ -213,26 +200,6 @@ def drop(self):
             requests.DropRequest(ack_id=self._ack_id, byte_size=self.size)
         )
 
-    def lease(self):
-        """Inform the policy to lease this message continually.
-
-        .. note::
-            By default this method is called by the constructor, and you should
-            never need to call it manually, unless the
-            :class:`~.pubsub_v1.subscriber.message.Message` instance was
-            created with ``autolease=False``.
-
-            .. deprecated:: 0.44.0
-                Will be removed in future versions.
-        """
-        warnings.warn(
-            "lease() is deprecated since 0.44.0, and will be removed in future versions.",
-            category=DeprecationWarning,
-        )
-        self._request_queue.put(
-            requests.LeaseRequest(ack_id=self._ack_id, byte_size=self.size)
-        )
-
     def modify_ack_deadline(self, seconds):
         """Resets the deadline for acknowledgement.
 
diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py
index 0a7d7fb8c391..4bb3329a29f0 100644
--- a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py
+++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py
@@ -16,7 +16,6 @@
 import time
 
 import mock
-import pytest
 import pytz
 from six.moves import queue
 from google.protobuf import timestamp_pb2
@@ -34,28 +33,22 @@
 PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000
 
 
-def create_message(data, ack_id="ACKID", autolease=True, **attrs):
-    with mock.patch.object(message.Message, "lease") as lease:
-        with mock.patch.object(time, "time") as time_:
-            time_.return_value = RECEIVED_SECONDS
-            msg = message.Message(
-                types.PubsubMessage(
-                    attributes=attrs,
-                    data=data,
-                    message_id="message_id",
-                    publish_time=timestamp_pb2.Timestamp(
-                        seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
-                    ),
+def create_message(data, ack_id="ACKID", **attrs):
+    with mock.patch.object(time, "time") as time_:
+        time_.return_value = RECEIVED_SECONDS
+        msg = message.Message(
+            types.PubsubMessage(
+                attributes=attrs,
+                data=data,
+                message_id="message_id",
+                publish_time=timestamp_pb2.Timestamp(
+                    seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
                 ),
-                ack_id,
-                queue.Queue(),
-                autolease=autolease,
-            )
-            if autolease:
-                lease.assert_called_once_with()
-            else:
-                lease.assert_not_called()
-            return msg
+            ),
+            ack_id,
+            queue.Queue(),
+        )
+        return msg
 
 
 def test_attributes():
@@ -84,11 +77,6 @@ def test_publish_time():
     assert msg.publish_time == PUBLISHED
 
 
-def test_disable_autolease_on_creation():
-    # the create_message() helper does the actual assertion
-    create_message(b"foo", autolease=False)
-
-
 def check_call_types(mock, *args, **kwargs):
     """Checks a mock's call types.
 
@@ -134,18 +122,6 @@ def test_drop():
         check_call_types(put, requests.DropRequest)
 
 
-def test_lease():
-    msg = create_message(b"foo", ack_id="bogus_ack_id")
-
-    pytest_warns = pytest.warns(DeprecationWarning)
-    with pytest_warns, mock.patch.object(msg._request_queue, "put") as put:
-        msg.lease()
-        put.assert_called_once_with(
-            requests.LeaseRequest(ack_id="bogus_ack_id", byte_size=30)
-        )
-        check_call_types(put, requests.LeaseRequest)
-
-
 def test_modify_ack_deadline():
     msg = create_message(b"foo", ack_id="bogus_ack_id")
     with mock.patch.object(msg._request_queue, "put") as put:

From 162810ce59755b843e241096ef706a997b28f73f Mon Sep 17 00:00:00 2001
From: Peter Lamut <inbox@peterlamut.com>
Date: Wed, 31 Jul 2019 00:07:28 +0200
Subject: [PATCH 2/6] Remove FlowControl.resume_threshold setting

---
 .../subscriber/_protocol/streaming_pull_manager.py   |  2 +-
 pubsub/google/cloud/pubsub_v1/types.py               | 12 ------------
 2 files changed, 1 insertion(+), 13 deletions(-)

diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index 4a6c26f7cf1e..3958a4a43db6 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -252,7 +252,7 @@ def maybe_resume_consumer(self):
             # currently on hold, if the current load allows for it.
             self._maybe_release_messages()
 
-            if self.load < self.flow_control.resume_threshold:
+            if self.load < 0.8:
                 _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
                 self._consumer.resume()
             else:
diff --git a/pubsub/google/cloud/pubsub_v1/types.py b/pubsub/google/cloud/pubsub_v1/types.py
index 9b0d3fef3f64..f578ddcaf04f 100644
--- a/pubsub/google/cloud/pubsub_v1/types.py
+++ b/pubsub/google/cloud/pubsub_v1/types.py
@@ -71,7 +71,6 @@
     [
         "max_bytes",
         "max_messages",
-        "resume_threshold",
         "max_requests",
         "max_request_batch_size",
         "max_request_batch_latency",
@@ -81,7 +80,6 @@
 FlowControl.__new__.__defaults__ = (
     100 * 1024 * 1024,  # max_bytes: 100mb
     100,  # max_messages: 100
-    0.8,  # resume_threshold: 80%
     100,  # max_requests: 100
     100,  # max_request_batch_size: 100
     0.01,  # max_request_batch_latency: 0.01s
@@ -101,16 +99,6 @@
         "The maximum number of received - but not yet processed - messages before "
         "pausing the message stream."
     )
-    FlowControl.resume_threshold.__doc__ = textwrap.dedent(
-        """
-        The relative threshold of the ``max_bytes`` and ``max_messages`` limits
-        below which to resume the message stream. Must be a positive number not
-        greater than ``1.0``.
-
-        .. note::
-            .. deprecated:: 0.44.0
-                Will be removed in future versions."""
-    )
     FlowControl.max_requests.__doc__ = textwrap.dedent(
         """
         Currently not in use.

From b9e9a4858836c0c9c61b67cf3d87922d4fdfb4c8 Mon Sep 17 00:00:00 2001
From: Peter Lamut <inbox@peterlamut.com>
Date: Wed, 31 Jul 2019 00:09:03 +0200
Subject: [PATCH 3/6] Remove FlowControl.max_requests setting

---
 pubsub/google/cloud/pubsub_v1/types.py | 10 ----------
 1 file changed, 10 deletions(-)

diff --git a/pubsub/google/cloud/pubsub_v1/types.py b/pubsub/google/cloud/pubsub_v1/types.py
index f578ddcaf04f..cabd87370f30 100644
--- a/pubsub/google/cloud/pubsub_v1/types.py
+++ b/pubsub/google/cloud/pubsub_v1/types.py
@@ -71,7 +71,6 @@
     [
         "max_bytes",
         "max_messages",
-        "max_requests",
         "max_request_batch_size",
         "max_request_batch_latency",
         "max_lease_duration",
@@ -80,7 +79,6 @@
 FlowControl.__new__.__defaults__ = (
     100 * 1024 * 1024,  # max_bytes: 100mb
     100,  # max_messages: 100
-    100,  # max_requests: 100
     100,  # max_request_batch_size: 100
     0.01,  # max_request_batch_latency: 0.01s
     2 * 60 * 60,  # max_lease_duration: 2 hours.
@@ -99,14 +97,6 @@
         "The maximum number of received - but not yet processed - messages before "
         "pausing the message stream."
     )
-    FlowControl.max_requests.__doc__ = textwrap.dedent(
-        """
-        Currently not in use.
-
-        .. note::
-            .. deprecated:: 0.44.0
-                Will be removed in future versions."""
-    )
     FlowControl.max_request_batch_size.__doc__ = textwrap.dedent(
         """
         The maximum number of requests scheduled by callbacks to process and

From ce5d2ecc9686d36da3353313e745c3a56ead810e Mon Sep 17 00:00:00 2001
From: Peter Lamut <inbox@peterlamut.com>
Date: Wed, 31 Jul 2019 00:12:43 +0200
Subject: [PATCH 4/6] Remove FlowControl.max_request_batch_size setting

---
 .../subscriber/_protocol/dispatcher.py         |  2 +-
 pubsub/google/cloud/pubsub_v1/types.py         | 18 +-----------------
 2 files changed, 2 insertions(+), 18 deletions(-)

diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
index e41341afab3d..b1b02f5867f8 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
@@ -46,7 +46,7 @@ def start(self):
             worker = helper_threads.QueueCallbackWorker(
                 self._queue,
                 self.dispatch_callback,
-                max_items=flow_control.max_request_batch_size,
+                max_items=100,
                 max_latency=flow_control.max_request_batch_latency,
             )
             # Create and start the helper thread.
diff --git a/pubsub/google/cloud/pubsub_v1/types.py b/pubsub/google/cloud/pubsub_v1/types.py
index cabd87370f30..68bac512ee2c 100644
--- a/pubsub/google/cloud/pubsub_v1/types.py
+++ b/pubsub/google/cloud/pubsub_v1/types.py
@@ -68,18 +68,11 @@
 # The defaults should be fine for most use cases.
 FlowControl = collections.namedtuple(
     "FlowControl",
-    [
-        "max_bytes",
-        "max_messages",
-        "max_request_batch_size",
-        "max_request_batch_latency",
-        "max_lease_duration",
-    ],
+    ["max_bytes", "max_messages", "max_request_batch_latency", "max_lease_duration"],
 )
 FlowControl.__new__.__defaults__ = (
     100 * 1024 * 1024,  # max_bytes: 100mb
     100,  # max_messages: 100
-    100,  # max_request_batch_size: 100
     0.01,  # max_request_batch_latency: 0.01s
     2 * 60 * 60,  # max_lease_duration: 2 hours.
 )
@@ -97,15 +90,6 @@
         "The maximum number of received - but not yet processed - messages before "
         "pausing the message stream."
     )
-    FlowControl.max_request_batch_size.__doc__ = textwrap.dedent(
-        """
-        The maximum number of requests scheduled by callbacks to process and
-        dispatch at a time.
-
-        .. note::
-            .. deprecated:: 0.44.0
-                Will be removed in future versions."""
-    )
     FlowControl.max_request_batch_latency.__doc__ = textwrap.dedent(
         """
         The maximum amount of time in seconds to wait for additional request

From ecf5eb31045edff6f11a11fb81c57e18b4e92fb9 Mon Sep 17 00:00:00 2001
From: Peter Lamut <inbox@peterlamut.com>
Date: Wed, 31 Jul 2019 00:18:45 +0200
Subject: [PATCH 5/6] Remove FlowControl.max_request_batch_latency

---
 .../pubsub_v1/subscriber/_protocol/dispatcher.py   |  6 +-----
 pubsub/google/cloud/pubsub_v1/types.py             | 14 +-------------
 2 files changed, 2 insertions(+), 18 deletions(-)

diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
index b1b02f5867f8..d4194a56897f 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
@@ -42,12 +42,8 @@ def start(self):
             if self._thread is not None:
                 raise ValueError("Dispatcher is already running.")
 
-            flow_control = self._manager.flow_control
             worker = helper_threads.QueueCallbackWorker(
-                self._queue,
-                self.dispatch_callback,
-                max_items=100,
-                max_latency=flow_control.max_request_batch_latency,
+                self._queue, self.dispatch_callback, max_items=100, max_latency=0.01
             )
             # Create and start the helper thread.
             thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker)
diff --git a/pubsub/google/cloud/pubsub_v1/types.py b/pubsub/google/cloud/pubsub_v1/types.py
index 68bac512ee2c..733d3bf97ac0 100644
--- a/pubsub/google/cloud/pubsub_v1/types.py
+++ b/pubsub/google/cloud/pubsub_v1/types.py
@@ -15,7 +15,6 @@
 from __future__ import absolute_import
 import collections
 import sys
-import textwrap
 
 from google.api import http_pb2
 from google.iam.v1 import iam_policy_pb2
@@ -67,13 +66,11 @@
 # these settings can be altered to tweak Pub/Sub behavior.
 # The defaults should be fine for most use cases.
 FlowControl = collections.namedtuple(
-    "FlowControl",
-    ["max_bytes", "max_messages", "max_request_batch_latency", "max_lease_duration"],
+    "FlowControl", ["max_bytes", "max_messages", "max_lease_duration"]
 )
 FlowControl.__new__.__defaults__ = (
     100 * 1024 * 1024,  # max_bytes: 100mb
     100,  # max_messages: 100
-    0.01,  # max_request_batch_latency: 0.01s
     2 * 60 * 60,  # max_lease_duration: 2 hours.
 )
 
@@ -90,15 +87,6 @@
         "The maximum number of received - but not yet processed - messages before "
         "pausing the message stream."
     )
-    FlowControl.max_request_batch_latency.__doc__ = textwrap.dedent(
-        """
-        The maximum amount of time in seconds to wait for additional request
-        items before processing the next batch of requests.
-
-        .. note::
-            .. deprecated:: 0.44.0
-                Will be removed in future versions."""
-    )
     FlowControl.max_lease_duration.__doc__ = (
         "The maximum amount of time in seconds to hold a lease on a message "
         "before dropping it from the lease management."

From bb9c7b710c74407fba3f14f75d41843ccbadcdc8 Mon Sep 17 00:00:00 2001
From: Peter Lamut <inbox@peterlamut.com>
Date: Wed, 31 Jul 2019 00:52:59 +0200
Subject: [PATCH 6/6] Promote hardcoded values to module constants

---
 .../pubsub_v1/subscriber/_protocol/dispatcher.py    | 13 ++++++++++++-
 .../subscriber/_protocol/streaming_pull_manager.py  | 13 +++++++++----
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
index d4194a56897f..2b2574829306 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
@@ -27,6 +27,14 @@
 _CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher"
 
 
+_MAX_BATCH_SIZE = 100
+"""The maximum number of requests to process and dispatch at a time."""
+
+_MAX_BATCH_LATENCY = 0.01
+"""The maximum amount of time in seconds to wait for additional request items
+before processing the next batch of requests."""
+
+
 class Dispatcher(object):
     def __init__(self, manager, queue):
         self._manager = manager
@@ -43,7 +51,10 @@ def start(self):
                 raise ValueError("Dispatcher is already running.")
 
             worker = helper_threads.QueueCallbackWorker(
-                self._queue, self.dispatch_callback, max_items=100, max_latency=0.01
+                self._queue,
+                self.dispatch_callback,
+                max_items=_MAX_BATCH_SIZE,
+                max_latency=_MAX_BATCH_LATENCY,
             )
             # Create and start the helper thread.
             thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker)
diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index 3958a4a43db6..af6883fd067e 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -44,6 +44,11 @@
     exceptions.GatewayTimeout,
     exceptions.Aborted,
 )
+_MAX_LOAD = 1.0
+"""The load threshold above which to pause the incoming message stream."""
+
+_RESUME_THRESHOLD = 0.8
+"""The load threshold below which to resume the incoming message stream."""
 
 
 def _maybe_wrap_exception(exception):
@@ -223,7 +228,7 @@ def add_close_callback(self, callback):
     def maybe_pause_consumer(self):
         """Check the current load and pause the consumer if needed."""
         with self._pause_resume_lock:
-            if self.load >= 1.0:
+            if self.load >= _MAX_LOAD:
                 if self._consumer is not None and not self._consumer.is_paused:
                     _LOGGER.debug(
                         "Message backlog over load at %.2f, pausing.", self.load
@@ -252,7 +257,7 @@ def maybe_resume_consumer(self):
             # currently on hold, if the current load allows for it.
             self._maybe_release_messages()
 
-            if self.load < 0.8:
+            if self.load < _RESUME_THRESHOLD:
                 _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
                 self._consumer.resume()
             else:
@@ -271,7 +276,7 @@ def _maybe_release_messages(self):
         The method assumes the caller has acquired the ``_pause_resume_lock``.
         """
         while True:
-            if self.load >= 1.0:
+            if self.load >= _MAX_LOAD:
                 break  # already overloaded
 
             try:
@@ -520,7 +525,7 @@ def _on_response(self, response):
             message = google.cloud.pubsub_v1.subscriber.message.Message(
                 received_message.message, received_message.ack_id, self._scheduler.queue
             )
-            if self.load < 1.0:
+            if self.load < _MAX_LOAD:
                 req = requests.LeaseRequest(
                     ack_id=message.ack_id, byte_size=message.size
                 )