Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-44763: Fix typing of retry_async_transaction decorator #176

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions src/vocutouts/uws/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

from __future__ import annotations

from collections.abc import Awaitable, Callable
from collections.abc import Callable, Coroutine
from datetime import datetime, timedelta
from functools import wraps
from typing import Any, TypeVar, cast
from typing import ParamSpec, TypeVar

from safir.arq import JobMetadata, JobResult
from safir.database import datetime_from_db, datetime_to_db
Expand All @@ -31,8 +31,8 @@
from .schema.job_parameter import JobParameter as SQLJobParameter
from .schema.job_result import JobResult as SQLJobResult

F = TypeVar("F", bound=Callable[..., Any])
G = TypeVar("G", bound=Callable[..., Awaitable[Any]])
T = TypeVar("T")
P = ParamSpec("P")

__all__ = ["JobStore"]

Expand Down Expand Up @@ -85,7 +85,9 @@ def _convert_job(job: SQLJob) -> UWSJob:
)


def retry_async_transaction(g: G) -> G:
def retry_async_transaction(
f: Callable[P, Coroutine[None, None, T]],
) -> Callable[P, Coroutine[None, None, T]]:
"""Retry once if a transaction failed.

Notes
Expand All @@ -104,16 +106,16 @@ def retry_async_transaction(g: G) -> G:
multiple times.
"""

@wraps(g)
async def wrapper(*args: Any, **kwargs: Any) -> Any:
@wraps(f)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
for _ in range(1, 5):
try:
return await g(*args, **kwargs)
return await f(*args, **kwargs)
except (DBAPIError, OperationalError):
continue
return await g(*args, **kwargs)
return await f(*args, **kwargs)

return cast(G, wrapper)
return wrapper


class JobStore:
Expand Down