Skip to content

Commit

Permalink
add runtime config
Browse files Browse the repository at this point in the history
  • Loading branch information
tsv1 committed Jun 7, 2024
1 parent 345dbb4 commit d3def1a
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 27 deletions.
6 changes: 3 additions & 3 deletions vedro/plugins/ensurer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ._ensure import ensure
from ._ensurer import Ensurer, EnsurerPlugin
from ._ensure import Ensure
from ._ensurer import Ensurer, EnsurerPlugin, ensure

__all__ = ("Ensurer", "EnsurerPlugin", "ensure",)
__all__ = ("Ensurer", "EnsurerPlugin", "ensure", "Ensure",)
36 changes: 16 additions & 20 deletions vedro/plugins/ensurer/_ensure.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,33 @@
import asyncio
import time
from functools import partial, wraps
from functools import wraps
from typing import Any, Callable, Coroutine, Optional, Tuple, Type, TypeVar, Union, cast, overload

__all__ = ("ensure",)
__all__ = ("Ensure", "AttemptType", "DelayValueType", "DelayCallableType", "DelayType",
"ExceptionType", "SwallowExceptionType", "LoggerType",)

F = TypeVar("F", bound=Callable[..., Any])
AF = TypeVar("AF", bound=Callable[..., Coroutine[Any, Any, Any]])

Attempt = int
DelayValue = Union[float, int]
DelayCallable = Callable[[Attempt], DelayValue]
AttemptType = int
DelayValueType = Union[float, int]
DelayCallableType = Callable[[AttemptType], DelayValueType]
DelayType = Union[DelayValueType, DelayCallableType]

ExceptionType = Type[BaseException]
SwallowException = Union[Tuple[ExceptionType, ...], ExceptionType]
SwallowExceptionType = Union[Tuple[ExceptionType, ...], ExceptionType]

Logger = Callable[[Any, Attempt, Union[BaseException, None]], Any]
LoggerType = Callable[[Any, AttemptType, Union[BaseException, None]], Any]


class Ensure:
def __init__(self, *, attempts: Optional[Attempt] = None,
delay: Optional[Union[DelayValue, DelayCallable]] = None,
swallow: Optional[SwallowException] = None,
logger: Optional[Logger] = None) -> None:
self._attempts = attempts or 3
self._delay = delay or 0.0
self._swallow = (BaseException,) if (swallow is None) else swallow
def __init__(self, *, attempts: AttemptType = 3,
delay: DelayType = 0.0,
swallow: SwallowExceptionType = BaseException,
logger: Optional[LoggerType] = None) -> None:
self._attempts = attempts
self._delay = delay
self._swallow = swallow
self._logger = logger

@overload
Expand Down Expand Up @@ -87,9 +89,3 @@ async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
def __repr__(self) -> str:
return (f"{self.__class__.__name__}"
f"(attempts={self._attempts}, delay={self._delay!r}, swallow={self._swallow!r})")


ensure = partial(Ensure,
logger=lambda _, attempt, exc=None:
print(f"-> attempt {attempt} failed with {exc!r}")
if exc else print(f"-> attempt {attempt} succeeded"))
66 changes: 62 additions & 4 deletions vedro/plugins/ensurer/_ensurer.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,76 @@
from typing import Type
from typing import Optional, Type, Union

from vedro.core import Dispatcher, Plugin, PluginConfig
from vedro.events import StepFailedEvent, StepPassedEvent, StepRunEvent
from ._ensure import AttemptType, DelayType, Ensure, LoggerType, SwallowExceptionType
from ._runtime_config import RuntimeConfig
from ._runtime_config import runtime_config as _runtime_config

__all__ = ("Ensurer", "EnsurerPlugin",)
__all__ = ("Ensurer", "EnsurerPlugin", "ensure",)


def ensure(*, attempts: Optional[AttemptType] = None,
delay: Optional[DelayType] = None,
swallow: Optional[SwallowExceptionType] = None,
logger: Optional[LoggerType] = None) -> Ensure:
return Ensure(attempts=attempts or _runtime_config.get_attempts(),
delay=delay or _runtime_config.get_delay(),
swallow=swallow or _runtime_config.get_swallow(),
logger=logger or _runtime_config.get_logger())


class EnsurerPlugin(Plugin):
def __init__(self, config: Type["Ensurer"]) -> None:
def __init__(self, config: Type["Ensurer"], *,
runtime_config: RuntimeConfig = _runtime_config) -> None:
super().__init__(config)
self._runtime_config = runtime_config
self._runtime_config.set_attempts(config.default_attempts)
self._runtime_config.set_delay(config.default_delay)
self._runtime_config.set_swallow(config.default_swallow)
self._runtime_config.set_logger(None)

self._show_attempts = config.show_attempts
if self._show_attempts:
self._runtime_config.set_logger(self._logger)
self._last_log = []

def subscribe(self, dispatcher: Dispatcher) -> None:
pass
dispatcher.listen(StepRunEvent, self.on_step_run) \
.listen(StepPassedEvent, self.on_step_end) \
.listen(StepFailedEvent, self.on_step_end)

def on_step_run(self, event: StepRunEvent) -> None:
self._last_log = []

def on_step_end(self, event: Union[StepPassedEvent, StepFailedEvent]) -> None:
if not self._show_attempts:
return

if not self._last_log:
return

for fn, attempt, exc in self._last_log:
orig_step = event.step_result.step._orig_step
if fn != getattr(orig_step, "__wrapped__", orig_step):
continue

if exc:
event.step_result.add_extra_details(f"[{attempt}] attempt failed with {exc!r}")
else:
event.step_result.add_extra_details(f"[{attempt}] attempt succeeded")

def _logger(self, fn, attempt: int, exc: Union[BaseException, None]) -> None:
self._last_log.append((fn, attempt, exc))


class Ensurer(PluginConfig):
plugin = EnsurerPlugin
description = "<description>"

show_attempts = True

default_attempts = 3

default_delay = 0.0

default_swallow = BaseException
40 changes: 40 additions & 0 deletions vedro/plugins/ensurer/_runtime_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from niltype import Nil, Nilable

from ._ensure import AttemptType, DelayType, LoggerType, SwallowExceptionType

__all__ = ("RuntimeConfig", "runtime_config",)


class RuntimeConfig:
def __init__(self) -> None:
self._attempts: Nilable[AttemptType] = Nil
self._delay: Nilable[DelayType] = Nil
self._swallow: Nilable[SwallowExceptionType] = Nil
self._logger: Nilable[LoggerType] = Nil

def get_attempts(self) -> Nilable[AttemptType]:
return self._attempts

def set_attempts(self, attempts: AttemptType) -> None:
self._attempts = attempts

def get_delay(self) -> Nilable[DelayType]:
return self._delay

def set_delay(self, delay: DelayType) -> None:
self._delay = delay

def get_swallow(self) -> Nilable[SwallowExceptionType]:
return self._swallow

def set_swallow(self, swallow: SwallowExceptionType) -> None:
self._swallow = swallow

def get_logger(self) -> Nilable[LoggerType]:
return self._logger

def set_logger(self, logger: LoggerType) -> None:
self._logger = logger


runtime_config = RuntimeConfig()

0 comments on commit d3def1a

Please sign in to comment.