Skip to content

Commit

Permalink
process: make sure type annoations pass with mypy (#542)
Browse files Browse the repository at this point in the history
* Add mypy to nox sessions

* Mark google/cloud package as type-checked

* Ignore types for dependencies lacking type info

* Fix type annotations in publish flow controller

* Fix type hints in thread-based Batch

* Fix type annotations in sequencers

* Fix type hints in dispatcher

* Fix type hints in publisher client

* Fix misc type errors in various modules

* Fix type hints in leaser

* Fix type annotations in streaming pull manager

* Fix gapic timeout hint in older api-core versions

google-api-core versions prior to v2.2.2 lack the definition of
_MethodDefault, thus a workaround is needed for that.

* Remove py.typed marker file

The autogenerated code does not pass mypy type checks yet, thus
we should not advertise the package as type-checked.

* Replace typing.cast with is not None assertions

* Replace batched_commands dict with separate lists

* Rename variable to avoid false type warnings

* Get rid of type cast by using a new variable

* Just ignore the line where type checkers disagree

* Remove unused imports

* Replace type casts with is not None assertions

* Cover missing dispatcher case after refactoring
  • Loading branch information
plamut authored Nov 30, 2021
1 parent c1b424e commit 8f32dd4
Show file tree
Hide file tree
Showing 20 changed files with 240 additions and 130 deletions.
4 changes: 3 additions & 1 deletion google/cloud/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List

try:
import pkg_resources

pkg_resources.declare_namespace(__name__)
except ImportError:
import pkgutil

__path__ = pkgutil.extend_path(__path__, __name__)
__path__: List[str] = pkgutil.extend_path(__path__, __name__) # type: ignore
4 changes: 2 additions & 2 deletions google/cloud/pubsub_v1/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from __future__ import absolute_import

import concurrent.futures
from typing import Any, NoReturn
from typing import Any, NoReturn, Optional

import google.api_core.future

Expand Down Expand Up @@ -47,7 +47,7 @@ def set_result(self, result: Any):
"""
return super().set_result(result=result)

def set_exception(self, exception: Exception):
def set_exception(self, exception: Optional[BaseException]):
"""Set the result of the future as being the given exception.
Do not use this method, it should only be used internally by the library and its
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsub_v1/publisher/_batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __len__(self):

@staticmethod
@abc.abstractmethod
def make_lock() -> None: # pragma: NO COVER
def make_lock(): # pragma: NO COVER
"""Return a lock in the chosen concurrency model.
Returns:
Expand Down
17 changes: 8 additions & 9 deletions google/cloud/pubsub_v1/publisher/_batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import threading
import time
import typing
from typing import Any, Callable, Optional, Sequence
from typing import Any, Callable, List, Optional, Sequence

import google.api_core.exceptions
from google.api_core import gapic_v1
Expand All @@ -28,11 +28,10 @@
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING: # pragma: NO COVER
from google import api_core
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1 import PublisherClient

from google.cloud.pubsub_v1.publisher import Client as PublisherClient
from google.pubsub_v1.services.publisher.client import OptionalRetry

_LOGGER = logging.getLogger(__name__)
_CAN_COMMIT = (base.BatchStatus.ACCEPTING_MESSAGES, base.BatchStatus.STARTING)
Expand Down Expand Up @@ -93,8 +92,8 @@ def __init__(
settings: "types.BatchSettings",
batch_done_callback: Callable[[bool], Any] = None,
commit_when_full: bool = True,
commit_retry: "api_core.retry.Retry" = gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
):
self._client = client
self._topic = topic
Expand All @@ -108,8 +107,8 @@ def __init__(
# _futures list should remain unchanged after batch
# status changed from ACCEPTING_MESSAGES to any other
# in order to avoid race conditions
self._futures = []
self._messages = []
self._futures: List[futures.Future] = []
self._messages: List[gapic_types.PubsubMessage] = []
self._status = base.BatchStatus.ACCEPTING_MESSAGES

# The initial size is not zero, we need to account for the size overhead
Expand Down Expand Up @@ -368,7 +367,7 @@ def publish(
), "Publish after stop() or publish error."

if self.status != base.BatchStatus.ACCEPTING_MESSAGES:
return
return None

size_increase = gapic_types.PublishRequest(
messages=[message]
Expand Down
7 changes: 2 additions & 5 deletions google/cloud/pubsub_v1/publisher/_sequencer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@

if typing.TYPE_CHECKING: # pragma: NO COVER
from concurrent import futures
from google.api_core import retry
from google.pubsub_v1.services.publisher.client import OptionalRetry


class Sequencer(metaclass=abc.ABCMeta):
"""The base class for sequencers for Pub/Sub publishing. A sequencer
sequences messages to be published.
"""

@staticmethod
@abc.abstractmethod
def is_finished(self) -> bool: # pragma: NO COVER
""" Whether the sequencer is finished and should be cleaned up.
Expand All @@ -40,7 +39,6 @@ def is_finished(self) -> bool: # pragma: NO COVER
"""
raise NotImplementedError

@staticmethod
@abc.abstractmethod
def unpause(self) -> None: # pragma: NO COVER
""" Unpauses this sequencer.
Expand All @@ -51,12 +49,11 @@ def unpause(self) -> None: # pragma: NO COVER
"""
raise NotImplementedError

@staticmethod
@abc.abstractmethod
def publish(
self,
message: gapic_types.PubsubMessage,
retry: "retry.Retry" = None,
retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
) -> "futures.Future": # pragma: NO COVER
""" Publish message for this ordering key.
Expand Down
25 changes: 13 additions & 12 deletions google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,22 @@

import enum
import collections
import concurrent.futures as futures
import threading
import typing
from typing import Iterable, Sequence
from typing import Deque, Iterable, Sequence

from google.api_core import gapic_v1
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher._sequencer import base as sequencer_base
from google.cloud.pubsub_v1.publisher._batch import base as batch_base
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING: # pragma: NO COVER
from google.api_core import retry
from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import _batch
from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient
from google.pubsub_v1.services.publisher.client import OptionalRetry


class _OrderedSequencerStatus(str, enum.Enum):
Expand Down Expand Up @@ -101,7 +102,7 @@ def __init__(self, client: "PublisherClient", topic: str, ordering_key: str):
# Batches ordered from first (head/left) to last (right/tail).
# Invariant: always has at least one batch after the first publish,
# unless paused or stopped.
self._ordered_batches = collections.deque()
self._ordered_batches: Deque["_batch.thread.Batch"] = collections.deque()
# See _OrderedSequencerStatus for valid state transitions.
self._state = _OrderedSequencerStatus.ACCEPTING_MESSAGES

Expand Down Expand Up @@ -237,8 +238,8 @@ def unpause(self) -> None:

def _create_batch(
self,
commit_retry: "retry.Retry" = gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
) -> "_batch.thread.Batch":
""" Create a new batch using the client's batch class and other stored
settings.
Expand All @@ -262,8 +263,8 @@ def _create_batch(
def publish(
self,
message: gapic_types.PubsubMessage,
retry: "retry.Retry" = gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
) -> futures.Future:
""" Publish message for this ordering key.
Expand All @@ -289,12 +290,12 @@ def publish(
"""
with self._state_lock:
if self._state == _OrderedSequencerStatus.PAUSED:
future = futures.Future()
errored_future = futures.Future()
exception = exceptions.PublishToPausedOrderingKeyException(
self._ordering_key
)
future.set_exception(exception)
return future
errored_future.set_exception(exception)
return errored_future

# If waiting to be cleaned-up, convert to accepting messages to
# prevent this sequencer from being cleaned-up only to have another
Expand Down
19 changes: 11 additions & 8 deletions google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@
# limitations under the License.

import typing
from typing import Optional

from google.api_core import gapic_v1

from google.cloud.pubsub_v1.publisher._sequencer import base
from google.pubsub_v1 import types as gapic_types

if typing.TYPE_CHECKING: # pragma: NO COVER
from concurrent import futures
from google.api_core import retry
from google.cloud.pubsub_v1 import PublisherClient
from google.cloud.pubsub_v1.publisher import _batch
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher.client import Client as PublisherClient
from google.pubsub_v1.services.publisher.client import OptionalRetry

from google.cloud.pubsub_v1 import types


class UnorderedSequencer(base.Sequencer):
Expand All @@ -35,7 +38,7 @@ class UnorderedSequencer(base.Sequencer):
def __init__(self, client: "PublisherClient", topic: str):
self._client = client
self._topic = topic
self._current_batch = None
self._current_batch: Optional["_batch.thread.Batch"] = None
self._stopped = False

def is_finished(self) -> bool:
Expand Down Expand Up @@ -88,8 +91,8 @@ def unpause(self) -> typing.NoReturn:

def _create_batch(
self,
commit_retry: "retry.Retry" = gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
) -> "_batch.thread.Batch":
""" Create a new batch using the client's batch class and other stored
settings.
Expand All @@ -113,8 +116,8 @@ def _create_batch(
def publish(
self,
message: gapic_types.PubsubMessage,
retry: "retry.Retry" = gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
) -> "futures.Future":
""" Batch message into existing or new batch.
Expand Down
31 changes: 18 additions & 13 deletions google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import threading
import time
import typing
from typing import Any, Sequence, Type, Union
from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union

from google.api_core import gapic_v1
from google.auth.credentials import AnonymousCredentials
from google.oauth2 import service_account
from google.auth.credentials import AnonymousCredentials # type: ignore
from google.oauth2 import service_account # type: ignore

from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
Expand All @@ -46,10 +46,9 @@
__version__ = "0.0"

if typing.TYPE_CHECKING: # pragma: NO COVER
from google import api_core
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.publisher._sequencer.base import Sequencer
from google.cloud.pubsub_v1.publisher import _batch
from google.pubsub_v1.services.publisher.client import OptionalRetry


_LOGGER = logging.getLogger(__name__)
Expand All @@ -62,6 +61,10 @@

_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()

SequencerType = Union[
ordered_sequencer.OrderedSequencer, unordered_sequencer.UnorderedSequencer
]


@_gapic.add_methods(publisher_client.PublisherClient, denylist=_DENYLISTED_METHODS)
class Client(object):
Expand Down Expand Up @@ -152,10 +155,10 @@ def __init__(
# messages. One batch exists for each topic.
self._batch_lock = self._batch_class.make_lock()
# (topic, ordering_key) => sequencers object
self._sequencers = {}
self._sequencers: Dict[Tuple[str, str], SequencerType] = {}
self._is_stopped = False
# Thread created to commit all sequencers after a timeout.
self._commit_thread = None
self._commit_thread: Optional[threading.Thread] = None

# The object controlling the message publishing flow
self._flow_controller = FlowController(self.publisher_options.flow_control)
Expand Down Expand Up @@ -196,7 +199,7 @@ def target(self) -> str:
"""
return self._target

def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> "Sequencer":
def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType:
""" Get an existing sequencer or create a new one given the (topic,
ordering_key) pair.
"""
Expand Down Expand Up @@ -254,8 +257,8 @@ def publish(
topic: str,
data: bytes,
ordering_key: str = "",
retry: "api_core.retry.Retry" = gapic_v1.method.DEFAULT,
timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
**attrs: Union[bytes, str],
) -> "pubsub_v1.publisher.futures.Future":
"""Publish a single message.
Expand Down Expand Up @@ -380,8 +383,10 @@ def on_publish_done(future):
if retry is gapic_v1.method.DEFAULT:
# use the default retry for the publish GRPC method as a base
transport = self.api._transport
retry = transport._wrapped_methods[transport.publish]._retry
retry = retry.with_deadline(2.0 ** 32)
base_retry = transport._wrapped_methods[transport.publish]._retry
retry = base_retry.with_deadline(2.0 ** 32)
else:
retry = retry.with_deadline(2.0 ** 32)

# Delegate the publishing to the sequencer.
sequencer = self._get_or_create_sequencer(topic, ordering_key)
Expand Down Expand Up @@ -490,7 +495,7 @@ def _set_batch_class(self, batch_class: Type) -> None:

# Used only for testing.
def _set_sequencer(
self, topic: str, sequencer: "Sequencer", ordering_key: str = ""
self, topic: str, sequencer: SequencerType, ordering_key: str = ""
) -> None:
sequencer_key = (topic, ordering_key)
self._sequencers[sequencer_key] = sequencer
Loading

0 comments on commit 8f32dd4

Please sign in to comment.