From 768f9931c90e42a8c1b34f30cbc507aa9522e0df Mon Sep 17 00:00:00 2001 From: Russ Allbery Date: Fri, 7 Feb 2025 16:44:24 -0800 Subject: [PATCH] Create Gafaelfawr tokens in parallel When creating a flock, create the Gafaelfawr tokens in parallel (up to the limit of the httpx connection pool, which is 100) instead of serially. No business is using the shared global httpx client and connection pool, so remove it from the constructors of all of the business classes and stop passing it down. The Nublado business uses a separate connection pool per monkey maintained in the Nublado client, and the TAP business uses pyvo, which manages its own connections. --- changelog.d/20250207_162856_rra_DM_48838.md | 3 +++ src/mobu/factory.py | 1 - src/mobu/services/business/base.py | 7 ------- src/mobu/services/business/gitlfs.py | 3 --- src/mobu/services/business/notebookrunner.py | 5 ----- src/mobu/services/business/nublado.py | 5 ----- src/mobu/services/business/nubladopythonloop.py | 5 ----- src/mobu/services/business/tap.py | 5 ----- src/mobu/services/business/tapqueryrunner.py | 5 ----- src/mobu/services/business/tapquerysetrunner.py | 5 ----- src/mobu/services/flock.py | 6 +++--- src/mobu/services/monkey.py | 6 ------ tests/business/tapquerysetrunner_test.py | 3 --- 13 files changed, 6 insertions(+), 53 deletions(-) create mode 100644 changelog.d/20250207_162856_rra_DM_48838.md diff --git a/changelog.d/20250207_162856_rra_DM_48838.md b/changelog.d/20250207_162856_rra_DM_48838.md new file mode 100644 index 00000000..d181fdeb --- /dev/null +++ b/changelog.d/20250207_162856_rra_DM_48838.md @@ -0,0 +1,3 @@ +### Bug fixes + +- When starting a flock, create user tokens simultaneously (up to the limit of the httpx connection pool size of 100) rather than serially. diff --git a/src/mobu/factory.py b/src/mobu/factory.py index ef4c9edd..f7ae9477 100644 --- a/src/mobu/factory.py +++ b/src/mobu/factory.py @@ -51,7 +51,6 @@ def __init__(self, http_client: AsyncClient, events: Events) -> None: logger=self.logger, events=self.events, ) - self.events = events async def aclose(self) -> None: """Clean up a process context. diff --git a/src/mobu/services/business/base.py b/src/mobu/services/business/base.py index 14cc69dd..b3e10596 100644 --- a/src/mobu/services/business/base.py +++ b/src/mobu/services/business/base.py @@ -10,7 +10,6 @@ from enum import Enum from typing import Generic, TypedDict, TypeVar -from httpx import AsyncClient from safir.datetime import current_datetime from structlog.stdlib import BoundLogger @@ -62,8 +61,6 @@ class Business(Generic[T], metaclass=ABCMeta): Configuration options for the business. user User with their authentication token to use to run the business. - http_client - Shared HTTP client. events Event publishers. logger @@ -77,8 +74,6 @@ class Business(Generic[T], metaclass=ABCMeta): Configuration options for the business. user User with their authentication token to use to run the business. - http_client - Shared HTTP client. events Event publishers. logger @@ -101,14 +96,12 @@ def __init__( *, options: T, user: AuthenticatedUser, - http_client: AsyncClient, events: Events, logger: BoundLogger, flock: str | None, ) -> None: self.options = options self.user = user - self.http_client = http_client self.events = events self.logger = logger self.success_count = 0 diff --git a/src/mobu/services/business/gitlfs.py b/src/mobu/services/business/gitlfs.py index 976b6d80..d603e334 100644 --- a/src/mobu/services/business/gitlfs.py +++ b/src/mobu/services/business/gitlfs.py @@ -7,7 +7,6 @@ from pathlib import Path from urllib.parse import urlparse -from httpx import AsyncClient from safir.sentry import duration from structlog.stdlib import BoundLogger @@ -30,7 +29,6 @@ def __init__( *, options: GitLFSBusinessOptions, user: AuthenticatedUser, - http_client: AsyncClient, events: Events, logger: BoundLogger, flock: str | None, @@ -38,7 +36,6 @@ def __init__( super().__init__( options=options, user=user, - http_client=http_client, events=events, logger=logger, flock=flock, diff --git a/src/mobu/services/business/notebookrunner.py b/src/mobu/services/business/notebookrunner.py index 775f1243..019abf04 100644 --- a/src/mobu/services/business/notebookrunner.py +++ b/src/mobu/services/business/notebookrunner.py @@ -19,7 +19,6 @@ import sentry_sdk import yaml -from httpx import AsyncClient from rubin.nublado.client import JupyterLabSession from rubin.nublado.client.exceptions import CodeExecutionError from rubin.nublado.client.models import CodeContext @@ -71,8 +70,6 @@ class NotebookRunner(NubladoBusiness): Configuration options for the business. user User with their authentication token to use to run the business. - http_client - Shared HTTP client for general web access. events Event publishers. logger @@ -86,7 +83,6 @@ def __init__( *, options: NotebookRunnerOptions | ListNotebookRunnerOptions, user: AuthenticatedUser, - http_client: AsyncClient, events: Events, logger: BoundLogger, flock: str | None, @@ -94,7 +90,6 @@ def __init__( super().__init__( options=options, user=user, - http_client=http_client, events=events, logger=logger, flock=flock, diff --git a/src/mobu/services/business/nublado.py b/src/mobu/services/business/nublado.py index c202be1e..72a54cd5 100644 --- a/src/mobu/services/business/nublado.py +++ b/src/mobu/services/business/nublado.py @@ -12,7 +12,6 @@ from typing import Generic, TypeVar import sentry_sdk -from httpx import AsyncClient from rubin.nublado.client import JupyterLabSession, NubladoClient from safir.datetime import current_datetime, format_datetime_for_logging from safir.sentry import duration @@ -101,8 +100,6 @@ class NubladoBusiness(Business, Generic[T], metaclass=ABCMeta): Configuration options for the business. user User with their authentication token to use to run the business. - http_client - Shared HTTP client for general web access. events Event publishers. logger @@ -114,7 +111,6 @@ def __init__( *, options: T, user: AuthenticatedUser, - http_client: AsyncClient, events: Events, logger: BoundLogger, flock: str | None, @@ -122,7 +118,6 @@ def __init__( super().__init__( options=options, user=user, - http_client=http_client, events=events, logger=logger, flock=flock, diff --git a/src/mobu/services/business/nubladopythonloop.py b/src/mobu/services/business/nubladopythonloop.py index a9853f46..32cce85e 100644 --- a/src/mobu/services/business/nubladopythonloop.py +++ b/src/mobu/services/business/nubladopythonloop.py @@ -9,7 +9,6 @@ from datetime import timedelta import sentry_sdk -from httpx import AsyncClient from rubin.nublado.client import JupyterLabSession from rubin.nublado.client.exceptions import CodeExecutionError from safir.sentry import duration @@ -33,8 +32,6 @@ class NubladoPythonLoop(NubladoBusiness): Configuration options for the business. user User with their authentication token to use to run the business. - http_client - Shared HTTP client for general web access. logger Logger to use to report the results of business. flock @@ -46,7 +43,6 @@ def __init__( *, options: NubladoPythonLoopOptions, user: AuthenticatedUser, - http_client: AsyncClient, events: Events, logger: BoundLogger, flock: str | None, @@ -54,7 +50,6 @@ def __init__( super().__init__( options=options, user=user, - http_client=http_client, events=events, logger=logger, flock=flock, diff --git a/src/mobu/services/business/tap.py b/src/mobu/services/business/tap.py index a5b36e6f..7bfb7713 100644 --- a/src/mobu/services/business/tap.py +++ b/src/mobu/services/business/tap.py @@ -9,7 +9,6 @@ import pyvo import requests -from httpx import AsyncClient from safir.sentry import duration from sentry_sdk import set_context from structlog.stdlib import BoundLogger @@ -41,8 +40,6 @@ class TAPBusiness(Business, Generic[T], metaclass=ABCMeta): Configuration options for the business. user User with their authentication token to use to run the business. - http_client - Shared HTTP client for general web access. events Event publishers. logger @@ -54,7 +51,6 @@ def __init__( *, options: T, user: AuthenticatedUser, - http_client: AsyncClient, events: Events, logger: BoundLogger, flock: str | None, @@ -62,7 +58,6 @@ def __init__( super().__init__( options=options, user=user, - http_client=http_client, events=events, logger=logger, flock=flock, diff --git a/src/mobu/services/business/tapqueryrunner.py b/src/mobu/services/business/tapqueryrunner.py index bae29218..7cb0dce7 100644 --- a/src/mobu/services/business/tapqueryrunner.py +++ b/src/mobu/services/business/tapqueryrunner.py @@ -4,7 +4,6 @@ from random import SystemRandom -from httpx import AsyncClient from structlog.stdlib import BoundLogger from ...events import Events @@ -24,8 +23,6 @@ class TAPQueryRunner(TAPBusiness): Configuration options for the business. user User with their authentication token to use to run the business. - http_client - Shared HTTP client for general web access. events Event publishers. logger @@ -39,7 +36,6 @@ def __init__( *, options: TAPQueryRunnerOptions, user: AuthenticatedUser, - http_client: AsyncClient, events: Events, logger: BoundLogger, flock: str | None, @@ -47,7 +43,6 @@ def __init__( super().__init__( options=options, user=user, - http_client=http_client, events=events, logger=logger, flock=flock, diff --git a/src/mobu/services/business/tapquerysetrunner.py b/src/mobu/services/business/tapquerysetrunner.py index 77a093b1..8d8337b1 100644 --- a/src/mobu/services/business/tapquerysetrunner.py +++ b/src/mobu/services/business/tapquerysetrunner.py @@ -9,7 +9,6 @@ import jinja2 import shortuuid import yaml -from httpx import AsyncClient from structlog.stdlib import BoundLogger from ...events import Events @@ -29,8 +28,6 @@ class TAPQuerySetRunner(TAPBusiness): Configuration options for the business. user User with their authentication token to use to run the business. - http_client - Shared HTTP client for general web access. events Event publishers. logger @@ -44,7 +41,6 @@ def __init__( *, options: TAPQuerySetRunnerOptions, user: AuthenticatedUser, - http_client: AsyncClient, events: Events, logger: BoundLogger, flock: str | None, @@ -52,7 +48,6 @@ def __init__( super().__init__( options=options, user=user, - http_client=http_client, events=events, logger=logger, flock=flock, diff --git a/src/mobu/services/flock.py b/src/mobu/services/flock.py index 12fda207..d671d540 100644 --- a/src/mobu/services/flock.py +++ b/src/mobu/services/flock.py @@ -179,10 +179,10 @@ async def _create_users(self) -> list[AuthenticatedUser]: count = self._config.count users = self._users_from_spec(self._config.user_spec, count) scopes = self._config.scopes - return [ - await self._gafaelfawr.create_service_token(u, scopes) - for u in users + coros = [ + self._gafaelfawr.create_service_token(u, scopes) for u in users ] + return await asyncio.gather(*coros) def _users_from_spec(self, spec: UserSpec, count: int) -> list[User]: """Generate count Users from the provided spec.""" diff --git a/src/mobu/services/monkey.py b/src/mobu/services/monkey.py index 70136287..ee264d63 100644 --- a/src/mobu/services/monkey.py +++ b/src/mobu/services/monkey.py @@ -96,7 +96,6 @@ def __init__( self.business = EmptyLoop( options=business_config.options, user=user, - http_client=self._http_client, events=self._events, logger=self._logger, flock=self._flock, @@ -105,7 +104,6 @@ def __init__( self.business = GitLFSBusiness( options=business_config.options, user=user, - http_client=self._http_client, events=self._events, logger=self._logger, flock=self._flock, @@ -114,7 +112,6 @@ def __init__( self.business = NubladoPythonLoop( options=business_config.options, user=user, - http_client=self._http_client, events=self._events, logger=self._logger, flock=self._flock, @@ -123,7 +120,6 @@ def __init__( self.business = NotebookRunner( options=business_config.options, user=user, - http_client=self._http_client, events=self._events, logger=self._logger, flock=self._flock, @@ -132,7 +128,6 @@ def __init__( self.business = TAPQueryRunner( options=business_config.options, user=user, - http_client=self._http_client, events=self._events, logger=self._logger, flock=self._flock, @@ -141,7 +136,6 @@ def __init__( self.business = TAPQuerySetRunner( options=business_config.options, user=user, - http_client=self._http_client, events=self._events, logger=self._logger, flock=self._flock, diff --git a/tests/business/tapquerysetrunner_test.py b/tests/business/tapquerysetrunner_test.py index edb4dfa1..37e9568b 100644 --- a/tests/business/tapquerysetrunner_test.py +++ b/tests/business/tapquerysetrunner_test.py @@ -13,7 +13,6 @@ import yaml from anys import ANY_AWARE_DATETIME_STR, AnyContains, AnySearch, AnyWithEntries from httpx import AsyncClient -from safir.dependencies.http_client import http_client_dependency from safir.metrics import NOT_NONE, MockEventPublisher from safir.testing.sentry import Captured @@ -224,12 +223,10 @@ async def test_random_object(events: Events) -> None: ) logger = structlog.get_logger(__file__) options = TAPQuerySetRunnerOptions(query_set=query_set) - http_client = await http_client_dependency() with patch.object(pyvo.dal, "TAPService"): runner = TAPQuerySetRunner( options=options, user=user, - http_client=http_client, events=events, logger=logger, flock=None,