Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor:eventhub/_eventprocessor:type hint added #26208

Merged
merged 4 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

from __future__ import annotations
from datetime import datetime
from contextlib import contextmanager
Expand Down Expand Up @@ -49,8 +48,11 @@ class EventProcessorMixin(object):
{}
) # type: Union[int, str, datetime, Dict[str, Union[int, str, datetime]]]

def get_init_event_position(self, partition_id, checkpoint):
# type: (str, Optional[Dict[str, Any]]) -> Tuple[Union[str, int, datetime], bool]
def get_init_event_position(
self,
partition_id: str,
checkpoint: Optional[Dict[str, Any]]
) -> Tuple[Union[str, int, datetime], bool]:
checkpoint_offset = checkpoint.get("offset") if checkpoint else None

event_position_inclusive = False
Expand All @@ -75,13 +77,12 @@ def get_init_event_position(self, partition_id, checkpoint):

def create_consumer(
self,
partition_id, # type: str
initial_event_position, # type: Union[str, int, datetime]
initial_event_position_inclusive, # type: bool
on_event_received, # type: Callable[[Union[Optional[EventData], List[EventData]]], None]
**kwargs, # type: Any
):
# type: (...) -> Union[EventHubConsumer, EventHubConsumerAsync]
partition_id: str,
initial_event_position: Union[str, int, datetime],
initial_event_position_inclusive: bool,
on_event_received: Callable[[Union[Optional[EventData], List[EventData]]], None],
**kwargs: Any
) -> Union[EventHubConsumer, EventHubConsumerAsync]:
consumer = self._eventhub_client._create_consumer( # type: ignore # pylint: disable=protected-access
self._consumer_group,
partition_id,
Expand All @@ -96,8 +97,7 @@ def create_consumer(
return consumer

@contextmanager
def _context(self, links=None):
# type: (List[Link]) -> Iterator[None]
def _context(self, links: List[Link]=None) -> Iterator[None]:
"""Tracing"""
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
if span_impl_type is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ class CheckpointStore(object):

@abstractmethod
def list_ownership(
self, fully_qualified_namespace, eventhub_name, consumer_group, **kwargs
):
# type: (str, str, str, Any) -> Iterable[Dict[str, Any]]
self,
fully_qualified_namespace: str,
eventhub_name: str,
consumer_group: str,
**kwargs: Any
) -> Iterable[Dict[str, Any]]:
"""Retrieves a complete ownership list from the chosen storage service.

:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
Expand All @@ -40,8 +43,11 @@ def list_ownership(
"""

@abstractmethod
def claim_ownership(self, ownership_list, **kwargs):
# type: (Iterable[Dict[str, Any]], Any) -> Iterable[Dict[str, Any]]
def claim_ownership(
self,
ownership_list: Iterable[Dict[str, Any]],
**kwargs: Any
) -> Iterable[Dict[str, Any]]:
"""Tries to claim ownership for a list of specified partitions.

:param Iterable[Dict[str,Any]] ownership_list: Iterable of dictionaries containing all the ownerships to claim.
Expand All @@ -60,8 +66,7 @@ def claim_ownership(self, ownership_list, **kwargs):
"""

@abstractmethod
def update_checkpoint(self, checkpoint, **kwargs):
# type: (Dict[str, Optional[Union[str, int]]], Any) -> None
def update_checkpoint(self, checkpoint: Dict[str, Optional[Union[str, int]]], **kwargs: Any) -> None:
"""Updates the checkpoint using the given information for the offset, associated partition and
consumer group in the chosen storage service.

Expand All @@ -86,9 +91,12 @@ def update_checkpoint(self, checkpoint, **kwargs):

@abstractmethod
def list_checkpoints(
self, fully_qualified_namespace, eventhub_name, consumer_group, **kwargs
):
# type: (str, str, str, Any) -> Iterable[Dict[str, Any]]
self,
fully_qualified_namespace: str,
eventhub_name: str,
consumer_group: str,
**kwargs: Any
) -> Iterable[Dict[str, Any]]:
"""List the updated checkpoints from the chosen storage service.

:param str fully_qualified_namespace: The fully qualified namespace that the Event Hub belongs to.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import annotations
import random
import uuid
import logging
Expand Down Expand Up @@ -49,12 +50,11 @@ class EventProcessor(

def __init__(
self,
eventhub_client, # type: EventHubConsumerClient
consumer_group, # type: str
on_event, # type: Callable[[PartitionContext, Union[Optional[EventData], List[EventData]]], None]
**kwargs # type: Any
):
# type: (...) -> None
eventhub_client: EventHubConsumerClient,
consumer_group: str,
on_event: Callable[[PartitionContext, Union[Optional[EventData], List[EventData]]], None],
**kwargs: Any
) -> None:
# pylint: disable=line-too-long
self._consumer_group = consumer_group
self._eventhub_client = eventhub_client
Expand Down Expand Up @@ -122,8 +122,7 @@ def __init__(
self._partition_id,
)

def __repr__(self):
# type: () -> str
def __repr__(self) -> str:
return "EventProcessor: id {}".format(self._id)

def _process_error(self, partition_context, err):
Expand All @@ -141,8 +140,7 @@ def _process_error(self, partition_context, err):
err_again,
)

def _cancel_tasks_for_partitions(self, to_cancel_partitions):
# type: (Iterable[str]) -> None
def _cancel_tasks_for_partitions(self, to_cancel_partitions: Iterable[str]) -> None:
with self._lock:
_LOGGER.debug(
"EventProcessor %r tries to cancel partitions %r",
Expand Down Expand Up @@ -179,8 +177,11 @@ def _initialize_partition_consumer(self, partition_id):
"EventProcessor %r has claimed partition %r", self._id, partition_id
)

def _create_tasks_for_claimed_ownership(self, claimed_partitions, checkpoints=None):
# type: (Iterable[str], Optional[Dict[str, Dict[str, Any]]]) -> None
def _create_tasks_for_claimed_ownership(
self,
claimed_partitions: Iterable[str],
checkpoints: Optional[Dict[str, Dict[str, Any]]]=None
) -> None:
with self._lock:
_LOGGER.debug(
"EventProcessor %r tries to claim partition %r",
Expand Down Expand Up @@ -223,8 +224,11 @@ def _create_tasks_for_claimed_ownership(self, claimed_partitions, checkpoints=No
)
self._initialize_partition_consumer(partition_id)

def _on_event_received(self, partition_context, event):
# type: (PartitionContext, Union[Optional[EventData], List[EventData]]) -> None
def _on_event_received(
self,
partition_context: PartitionContext,
event: Union[Optional[EventData], List[EventData]]
) -> None:
if event:
try:
partition_context._last_received_event = event[-1] # type: ignore #pylint:disable=protected-access
Expand All @@ -236,8 +240,7 @@ def _on_event_received(self, partition_context, event):
else:
self._event_handler(partition_context, event)

def _load_balancing(self):
# type: () -> None
def _load_balancing(self) -> None:
"""Start the EventProcessor.

The EventProcessor will try to claim and balance partition ownership with other `EventProcessor`
Expand Down Expand Up @@ -297,8 +300,12 @@ def _load_balancing(self):

time.sleep(load_balancing_interval)

def _close_consumer(self, partition_id, consumer, reason):
# type: (str, EventHubConsumer, CloseReason) -> None
def _close_consumer(
self,
partition_id: str,
consumer: EventHubConsumer,
reason: CloseReason
) -> None:
consumer.close()
with self._lock:
del self._consumers[partition_id]
Expand Down Expand Up @@ -332,8 +339,7 @@ def _close_consumer(self, partition_id, consumer, reason):

self._ownership_manager.release_ownership(partition_id)

def _do_receive(self, partition_id, consumer):
# type: (str, EventHubConsumer) -> None
def _do_receive(self, partition_id: str, consumer: EventHubConsumer) -> None:
"""Call the consumer.receive() and handle exceptions if any after it exhausts retries."""
try:
consumer.receive(self._batch, self._max_batch_size, self._max_wait_time)
Expand All @@ -352,8 +358,7 @@ def _do_receive(self, partition_id, consumer):
# Does OWNERSHIP_LOST make sense for all errors?
self._close_consumer(partition_id, consumer, CloseReason.OWNERSHIP_LOST)

def start(self):
# type: () -> None
def start(self) -> None:
if self._running:
_LOGGER.info("EventProcessor %r has already started.", self._id)
return
Expand All @@ -378,8 +383,7 @@ def start(self):
for partition_id, consumer in list(self._consumers.items()):
self._close_consumer(partition_id, consumer, CloseReason.SHUTDOWN)

def stop(self):
# type: () -> None
def stop(self) -> None:
"""Stop the EventProcessor.

The EventProcessor will stop receiving events from EventHubs and release the ownership of the partitions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,22 @@ def __init__(self):
)

def list_ownership(
self, fully_qualified_namespace, eventhub_name, consumer_group, **kwargs
):
# type: (str, str, str, Any) -> Iterable[Dict[str, Any]]
self,
fully_qualified_namespace: str,
eventhub_name: str,
consumer_group: str,
**kwargs: Any
) -> Iterable[Dict[str, Any]]:
consumer_group_node = self._ownerships_trie.lookup(
(fully_qualified_namespace, eventhub_name, consumer_group)
)
return self._ownerships_trie.list_leaves(consumer_group_node)

def claim_ownership(self, ownership_list, **kwargs):
# type: (Iterable[Dict[str, Any]], Any) -> Iterable[Dict[str, Any]]
def claim_ownership(
self,
ownership_list: Iterable[Dict[str, Any]],
**kwargs: Any
) -> Iterable[Dict[str, Any]]:
result = []
for ownership in ownership_list:
fully_qualified_namespace = ownership["fully_qualified_namespace"]
Expand Down Expand Up @@ -143,8 +149,7 @@ def claim_ownership(self, ownership_list, **kwargs):
result.append(ownership)
return result

def update_checkpoint(self, checkpoint, **kwargs):
# type: (Dict[str, Optional[Union[str, int]]], Any) -> None
def update_checkpoint(self, checkpoint: Dict[str, Optional[Union[str, int]]], **kwargs: Any) -> None:
return self._checkpoints_trie.set_ele(checkpoint)

def list_checkpoints(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from __future__ import annotations
import math
import time
import random
Expand Down Expand Up @@ -29,13 +30,13 @@ class OwnershipManager(object): # pylint:disable=too-many-instance-attributes

def __init__(
self,
eventhub_client, # type: Union[EventHubConsumerClient, EventHubProducerClient]
consumer_group, # type: str
owner_id, # type: str
checkpoint_store, # type: Optional[CheckpointStore]
ownership_timeout, # type: float
load_balancing_strategy, # type: LoadBalancingStrategy
partition_id, # type: Optional[str]
eventhub_client: Union[EventHubConsumerClient, EventHubProducerClient],
consumer_group: str,
owner_id: str,
checkpoint_store: Optional[CheckpointStore],
ownership_timeout: float,
load_balancing_strategy: LoadBalancingStrategy,
partition_id: Optional[str]
):
self.cached_parition_ids = [] # type: List[str]
self.owned_partitions = [] # type: Iterable[Dict[str, Any]]
Expand All @@ -51,8 +52,7 @@ def __init__(
self.load_balancing_strategy = load_balancing_strategy
self.partition_id = partition_id

def claim_ownership(self):
# type: () -> List[str]
def claim_ownership(self) -> List[str]:
"""Claims ownership for this EventProcessor"""
if not self.cached_parition_ids:
self._retrieve_partition_ids()
Expand All @@ -78,8 +78,7 @@ def claim_ownership(self):
)
return [x["partition_id"] for x in self.owned_partitions]

def release_ownership(self, partition_id):
# type: (str) -> None
def release_ownership(self, partition_id: str) -> None:
"""Explicitly release ownership of a partition if we still have it.

This is called when a consumer is shutdown, and is achieved by resetting the associated
Expand All @@ -100,15 +99,15 @@ def release_ownership(self, partition_id):
partition_ownership[0]["owner_id"] = ""
self.checkpoint_store.claim_ownership(partition_ownership)

def _retrieve_partition_ids(self):
# type: () -> None
def _retrieve_partition_ids(self) -> None:
"""List all partition ids of the event hub that the EventProcessor is working on."""
self.cached_parition_ids = self.eventhub_client.get_partition_ids()

def _balance_ownership( # pylint:disable=too-many-locals
self, ownership_list, all_partition_ids
):
# type: (Iterable[Dict[str, Any]], List[str]) -> List[Dict[str, Any]]
self,
ownership_list: Iterable[Dict[str, Any]],
all_partition_ids: List[str]
) -> List[Dict[str, Any]]:
"""Balances and claims ownership of partitions for this EventProcessor."""
now = time.time()
ownership_dict = {
Expand Down Expand Up @@ -206,8 +205,7 @@ def _balance_ownership( # pylint:disable=too-many-locals
to_claim.append(to_steal_partition)
return to_claim

def get_checkpoints(self):
# type: () -> Dict[str, Dict[str, Any]]
def get_checkpoints(self) -> Dict[str, Dict[str, Any]]:
if self.checkpoint_store:
checkpoints = self.checkpoint_store.list_checkpoints(
self.fully_qualified_namespace, self.eventhub_name, self.consumer_group
Expand Down
Loading