diff --git a/changelog.d/20250217_150229_rra_DM_48977.md b/changelog.d/20250217_150229_rra_DM_48977.md new file mode 100644 index 00000000..002756b4 --- /dev/null +++ b/changelog.d/20250217_150229_rra_DM_48977.md @@ -0,0 +1,3 @@ +### Bug fixes + +- Fix jitter calculations in Nublado businesses. diff --git a/src/mobu/asyncio.py b/src/mobu/asyncio.py index 0780f0f8..fbd627f1 100644 --- a/src/mobu/asyncio.py +++ b/src/mobu/asyncio.py @@ -9,9 +9,7 @@ from contextlib import AbstractAsyncContextManager from datetime import timedelta from types import TracebackType -from typing import Literal, TypeVar - -T = TypeVar("T") +from typing import Literal __all__ = [ "aclosing_iter", @@ -88,7 +86,7 @@ async def loop() -> None: return asyncio.ensure_future(loop()) -async def wait_first(*args: Coroutine[None, None, T]) -> T | None: +async def wait_first[T](*args: Coroutine[None, None, T]) -> T | None: """Return the result of the first awaitable to finish. The other awaitables will be cancelled. The first awaitable determines diff --git a/src/mobu/services/business/base.py b/src/mobu/services/business/base.py index b3e10596..203f24df 100644 --- a/src/mobu/services/business/base.py +++ b/src/mobu/services/business/base.py @@ -8,7 +8,7 @@ from collections.abc import AsyncGenerator, AsyncIterable from datetime import timedelta from enum import Enum -from typing import Generic, TypedDict, TypeVar +from typing import TypedDict from safir.datetime import current_datetime from structlog.stdlib import BoundLogger @@ -19,9 +19,6 @@ from ...models.user import AuthenticatedUser from ...sentry import capturing_start_span, start_transaction -T = TypeVar("T", bound="BusinessOptions") -U = TypeVar("U") - __all__ = ["Business"] @@ -37,7 +34,7 @@ class BusinessCommand(Enum): STOP = "STOP" -class Business(Generic[T], metaclass=ABCMeta): +class Business[T: BusinessOptions](metaclass=ABCMeta): """Base class for monkey business (one type of repeated operation). The basic flow for a monkey business is as follows: @@ -114,21 +111,21 @@ def __init__( # Methods that should be overridden by child classes if needed. - async def startup(self) -> None: + async def startup(self) -> None: # noqa: B027 """Run before the start of the first iteration and then not again.""" @abstractmethod async def execute(self) -> None: """Execute the core of each business loop.""" - async def close(self) -> None: + async def close(self) -> None: # noqa: B027 """Clean up any allocated resources. This should be overridden by child classes to free any resources that were allocated in ``__init__``. """ - async def shutdown(self) -> None: + async def shutdown(self) -> None: # noqa: B027 """Perform any cleanup required after stopping.""" # Public Business API called by the Monkey class. These methods handle the @@ -258,7 +255,7 @@ async def pause(self, interval: timedelta) -> bool: except (TimeoutError, QueueEmpty): return True - async def iter_with_timeout( + async def iter_with_timeout[U]( self, iterable: AsyncIterable[U], timeout: timedelta ) -> AsyncGenerator[U]: """Run an iterator with a timeout. diff --git a/src/mobu/services/business/nublado.py b/src/mobu/services/business/nublado.py index 72a54cd5..bbe06225 100644 --- a/src/mobu/services/business/nublado.py +++ b/src/mobu/services/business/nublado.py @@ -9,7 +9,7 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta from random import SystemRandom -from typing import Generic, TypeVar +from typing import Any import sentry_sdk from rubin.nublado.client import JupyterLabSession, NubladoClient @@ -35,12 +35,11 @@ from ...sentry import capturing_start_span, start_transaction from .base import Business -T = TypeVar("T", bound="NubladoBusinessOptions") - __all__ = ["NubladoBusiness", "ProgressLogMessage"] _ANSI_REGEX = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]") """Regex that matches ANSI escape sequences.""" + _CHDIR_TEMPLATE = 'import os; os.chdir("{wd}")' """Template to construct the code to run to set the working directory.""" @@ -78,7 +77,9 @@ def __str__(self) -> str: return f"{timestamp} - {self.message}" -class NubladoBusiness(Business, Generic[T], metaclass=ABCMeta): +class NubladoBusiness[T: NubladoBusinessOptions]( + Business[T], metaclass=ABCMeta +): """Base class for business that executes Python code in a Nublado notebook. This class modifies the core `~mobu.business.base.Business` loop by @@ -216,9 +217,11 @@ async def shutdown(self) -> None: async def idle(self) -> None: if self.options.jitter: self.logger.info("Idling...") + jitter = self.options.jitter.total_seconds() + delay_seconds = self._random.uniform(0, jitter) + delay = timedelta(seconds=delay_seconds) with capturing_start_span(op="idle"): - extra_delay = self._random.uniform(0, self.options.jitter) - await self.pause(self.options.idle_time + extra_delay) + await self.pause(self.options.idle_time + delay) else: await super().idle() @@ -304,7 +307,9 @@ async def open_session( self, notebook: str | None = None ) -> AsyncGenerator[JupyterLabSession]: self.logger.info("Creating lab session") - opts = {"max_websocket_size": self.options.max_websocket_message_size} + opts: dict[str, Any] = { + "max_websocket_size": self.options.max_websocket_message_size + } create_session_cm = capturing_start_span(op="create_session") create_session_cm.__enter__() async with self._client.open_lab_session(notebook, **opts) as session: diff --git a/src/mobu/services/business/tap.py b/src/mobu/services/business/tap.py index 7bfb7713..5734dae0 100644 --- a/src/mobu/services/business/tap.py +++ b/src/mobu/services/business/tap.py @@ -5,7 +5,6 @@ import asyncio from abc import ABCMeta, abstractmethod from concurrent.futures import ThreadPoolExecutor -from typing import Generic, TypeVar import pyvo import requests @@ -21,12 +20,10 @@ from ...sentry import capturing_start_span, start_transaction from .base import Business -T = TypeVar("T", bound="TAPBusinessOptions") - __all__ = ["TAPBusiness"] -class TAPBusiness(Business, Generic[T], metaclass=ABCMeta): +class TAPBusiness[T: TAPBusinessOptions](Business[T], metaclass=ABCMeta): """Base class for business that executes TAP query. This class modifies the core `~mobu.business.base.Business` loop by