Skip to content

Commit

Permalink
PubSub: Remove deprecated methods and settings (#8836)
Browse files Browse the repository at this point in the history
* Remove Message lease() method and autolease param

These two have been deprecated in 0.44.0 and it's time to remove them.

* Remove FlowControl.resume_threshold setting

* Remove FlowControl.max_requests setting

* Remove FlowControl.max_request_batch_size setting

* Remove FlowControl.max_request_batch_latency

* Promote hardcoded values to module constants
  • Loading branch information
plamut authored Jul 31, 2019
1 parent 7652ee0 commit 7e16c08
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 135 deletions.
13 changes: 10 additions & 3 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@
_CALLBACK_WORKER_NAME = "Thread-CallbackRequestDispatcher"


_MAX_BATCH_SIZE = 100
"""The maximum number of requests to process and dispatch at a time."""

_MAX_BATCH_LATENCY = 0.01
"""The maximum amount of time in seconds to wait for additional request items
before processing the next batch of requests."""


class Dispatcher(object):
def __init__(self, manager, queue):
self._manager = manager
Expand All @@ -42,12 +50,11 @@ def start(self):
if self._thread is not None:
raise ValueError("Dispatcher is already running.")

flow_control = self._manager.flow_control
worker = helper_threads.QueueCallbackWorker(
self._queue,
self.dispatch_callback,
max_items=flow_control.max_request_batch_size,
max_latency=flow_control.max_request_batch_latency,
max_items=_MAX_BATCH_SIZE,
max_latency=_MAX_BATCH_LATENCY,
)
# Create and start the helper thread.
thread = threading.Thread(name=_CALLBACK_WORKER_NAME, target=worker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
exceptions.GatewayTimeout,
exceptions.Aborted,
)
_MAX_LOAD = 1.0
"""The load threshold above which to pause the incoming message stream."""

_RESUME_THRESHOLD = 0.8
"""The load threshold below which to resume the incoming message stream."""


def _maybe_wrap_exception(exception):
Expand Down Expand Up @@ -223,7 +228,7 @@ def add_close_callback(self, callback):
def maybe_pause_consumer(self):
"""Check the current load and pause the consumer if needed."""
with self._pause_resume_lock:
if self.load >= 1.0:
if self.load >= _MAX_LOAD:
if self._consumer is not None and not self._consumer.is_paused:
_LOGGER.debug(
"Message backlog over load at %.2f, pausing.", self.load
Expand Down Expand Up @@ -252,7 +257,7 @@ def maybe_resume_consumer(self):
# currently on hold, if the current load allows for it.
self._maybe_release_messages()

if self.load < self.flow_control.resume_threshold:
if self.load < _RESUME_THRESHOLD:
_LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
self._consumer.resume()
else:
Expand All @@ -271,7 +276,7 @@ def _maybe_release_messages(self):
The method assumes the caller has acquired the ``_pause_resume_lock``.
"""
while True:
if self.load >= 1.0:
if self.load >= _MAX_LOAD:
break # already overloaded

try:
Expand Down Expand Up @@ -518,12 +523,9 @@ def _on_response(self, response):

for received_message in response.received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message,
received_message.ack_id,
self._scheduler.queue,
autolease=False,
received_message.message, received_message.ack_id, self._scheduler.queue
)
if self.load < 1.0:
if self.load < _MAX_LOAD:
req = requests.LeaseRequest(
ack_id=message.ack_id, byte_size=message.size
)
Expand Down
35 changes: 1 addition & 34 deletions pubsub/google/cloud/pubsub_v1/subscriber/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import json
import math
import time
import warnings

from google.api_core import datetime_helpers
from google.cloud.pubsub_v1.subscriber._protocol import requests
Expand Down Expand Up @@ -71,7 +70,7 @@ class Message(object):
published.
"""

def __init__(self, message, ack_id, request_queue, autolease=True):
def __init__(self, message, ack_id, request_queue):
"""Construct the Message.
.. note::
Expand All @@ -86,13 +85,6 @@ def __init__(self, message, ack_id, request_queue, autolease=True):
request_queue (queue.Queue): A queue provided by the policy that
can accept requests; the policy is responsible for handling
those requests.
autolease (bool): An optional flag determining whether a new Message
instance should automatically lease itself upon creation.
Defaults to :data:`True`.
.. note::
.. deprecated:: 0.44.0
Parameter will be removed in future versions.
"""
self._message = message
self._ack_id = ack_id
Expand All @@ -104,11 +96,6 @@ def __init__(self, message, ack_id, request_queue, autolease=True):
# the default lease deadline.
self._received_timestamp = time.time()

# The policy should lease this message, telling PubSub that it has
# it until it is acked or otherwise dropped.
if autolease:
self.lease()

def __repr__(self):
# Get an abbreviated version of the data.
abbv_data = self._message.data
Expand Down Expand Up @@ -213,26 +200,6 @@ def drop(self):
requests.DropRequest(ack_id=self._ack_id, byte_size=self.size)
)

def lease(self):
"""Inform the policy to lease this message continually.
.. note::
By default this method is called by the constructor, and you should
never need to call it manually, unless the
:class:`~.pubsub_v1.subscriber.message.Message` instance was
created with ``autolease=False``.
.. deprecated:: 0.44.0
Will be removed in future versions.
"""
warnings.warn(
"lease() is deprecated since 0.44.0, and will be removed in future versions.",
category=DeprecationWarning,
)
self._request_queue.put(
requests.LeaseRequest(ack_id=self._ack_id, byte_size=self.size)
)

def modify_ack_deadline(self, seconds):
"""Resets the deadline for acknowledgement.
Expand Down
52 changes: 1 addition & 51 deletions pubsub/google/cloud/pubsub_v1/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from __future__ import absolute_import
import collections
import sys
import textwrap

from google.api import http_pb2
from google.iam.v1 import iam_policy_pb2
Expand Down Expand Up @@ -67,24 +66,11 @@
# these settings can be altered to tweak Pub/Sub behavior.
# The defaults should be fine for most use cases.
FlowControl = collections.namedtuple(
"FlowControl",
[
"max_bytes",
"max_messages",
"resume_threshold",
"max_requests",
"max_request_batch_size",
"max_request_batch_latency",
"max_lease_duration",
],
"FlowControl", ["max_bytes", "max_messages", "max_lease_duration"]
)
FlowControl.__new__.__defaults__ = (
100 * 1024 * 1024, # max_bytes: 100mb
100, # max_messages: 100
0.8, # resume_threshold: 80%
100, # max_requests: 100
100, # max_request_batch_size: 100
0.01, # max_request_batch_latency: 0.01s
2 * 60 * 60, # max_lease_duration: 2 hours.
)

Expand All @@ -101,42 +87,6 @@
"The maximum number of received - but not yet processed - messages before "
"pausing the message stream."
)
FlowControl.resume_threshold.__doc__ = textwrap.dedent(
"""
The relative threshold of the ``max_bytes`` and ``max_messages`` limits
below which to resume the message stream. Must be a positive number not
greater than ``1.0``.
.. note::
.. deprecated:: 0.44.0
Will be removed in future versions."""
)
FlowControl.max_requests.__doc__ = textwrap.dedent(
"""
Currently not in use.
.. note::
.. deprecated:: 0.44.0
Will be removed in future versions."""
)
FlowControl.max_request_batch_size.__doc__ = textwrap.dedent(
"""
The maximum number of requests scheduled by callbacks to process and
dispatch at a time.
.. note::
.. deprecated:: 0.44.0
Will be removed in future versions."""
)
FlowControl.max_request_batch_latency.__doc__ = textwrap.dedent(
"""
The maximum amount of time in seconds to wait for additional request
items before processing the next batch of requests.
.. note::
.. deprecated:: 0.44.0
Will be removed in future versions."""
)
FlowControl.max_lease_duration.__doc__ = (
"The maximum amount of time in seconds to hold a lease on a message "
"before dropping it from the lease management."
Expand Down
54 changes: 15 additions & 39 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import time

import mock
import pytest
import pytz
from six.moves import queue
from google.protobuf import timestamp_pb2
Expand All @@ -34,28 +33,22 @@
PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000


def create_message(data, ack_id="ACKID", autolease=True, **attrs):
with mock.patch.object(message.Message, "lease") as lease:
with mock.patch.object(time, "time") as time_:
time_.return_value = RECEIVED_SECONDS
msg = message.Message(
types.PubsubMessage(
attributes=attrs,
data=data,
message_id="message_id",
publish_time=timestamp_pb2.Timestamp(
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
),
def create_message(data, ack_id="ACKID", **attrs):
with mock.patch.object(time, "time") as time_:
time_.return_value = RECEIVED_SECONDS
msg = message.Message(
types.PubsubMessage(
attributes=attrs,
data=data,
message_id="message_id",
publish_time=timestamp_pb2.Timestamp(
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
),
ack_id,
queue.Queue(),
autolease=autolease,
)
if autolease:
lease.assert_called_once_with()
else:
lease.assert_not_called()
return msg
),
ack_id,
queue.Queue(),
)
return msg


def test_attributes():
Expand Down Expand Up @@ -84,11 +77,6 @@ def test_publish_time():
assert msg.publish_time == PUBLISHED


def test_disable_autolease_on_creation():
# the create_message() helper does the actual assertion
create_message(b"foo", autolease=False)


def check_call_types(mock, *args, **kwargs):
"""Checks a mock's call types.
Expand Down Expand Up @@ -134,18 +122,6 @@ def test_drop():
check_call_types(put, requests.DropRequest)


def test_lease():
msg = create_message(b"foo", ack_id="bogus_ack_id")

pytest_warns = pytest.warns(DeprecationWarning)
with pytest_warns, mock.patch.object(msg._request_queue, "put") as put:
msg.lease()
put.assert_called_once_with(
requests.LeaseRequest(ack_id="bogus_ack_id", byte_size=30)
)
check_call_types(put, requests.LeaseRequest)


def test_modify_ack_deadline():
msg = create_message(b"foo", ack_id="bogus_ack_id")
with mock.patch.object(msg._request_queue, "put") as put:
Expand Down

0 comments on commit 7e16c08

Please sign in to comment.