From 32425829fdf5415563c2ef4b8a0cc55e80d7fdd4 Mon Sep 17 00:00:00 2001 From: jopemachine Date: Mon, 3 Feb 2025 08:17:06 +0000 Subject: [PATCH] refactor: Reflect feedbacks --- .../backend/manager/api/container_registry.py | 2 +- src/ai/backend/manager/api/context.py | 2 +- src/ai/backend/manager/api/group.py | 16 +- src/ai/backend/manager/api/services/base.py | 25 --- .../services/container_registries/harbor.py | 210 ------------------ .../manager/{api/services => client}/BUILD | 0 .../{api/services => client}/__init__.py | 0 .../container_registry}/BUILD | 0 .../container_registry}/__init__.py | 0 .../client/container_registry/harbor.py | 174 +++++++++++++++ src/ai/backend/manager/models/gql.py | 2 +- .../models/gql_models/container_registry.py | 18 +- .../manager/models/gql_models/group.py | 4 +- src/ai/backend/manager/server.py | 19 +- src/ai/backend/manager/service/BUILD | 1 + src/ai/backend/manager/service/__init__.py | 0 src/ai/backend/manager/service/base.py | 18 ++ .../manager/service/container_registry/BUILD | 1 + .../service/container_registry/__init__.py | 0 .../container_registry}/base.py | 54 ++++- .../service/container_registry/harbor.py | 110 +++++++++ 21 files changed, 388 insertions(+), 268 deletions(-) delete mode 100644 src/ai/backend/manager/api/services/base.py delete mode 100644 src/ai/backend/manager/api/services/container_registries/harbor.py rename src/ai/backend/manager/{api/services => client}/BUILD (100%) rename src/ai/backend/manager/{api/services => client}/__init__.py (100%) rename src/ai/backend/manager/{api/services/container_registries => client/container_registry}/BUILD (100%) rename src/ai/backend/manager/{api/services/container_registries => client/container_registry}/__init__.py (100%) create mode 100644 src/ai/backend/manager/client/container_registry/harbor.py create mode 100644 src/ai/backend/manager/service/BUILD create mode 100644 src/ai/backend/manager/service/__init__.py create mode 100644 src/ai/backend/manager/service/base.py create mode 100644 src/ai/backend/manager/service/container_registry/BUILD create mode 100644 src/ai/backend/manager/service/container_registry/__init__.py rename src/ai/backend/manager/{api/services/container_registries => service/container_registry}/base.py (59%) create mode 100644 src/ai/backend/manager/service/container_registry/harbor.py diff --git a/src/ai/backend/manager/api/container_registry.py b/src/ai/backend/manager/api/container_registry.py index 488fb69c010..9fe10cbb422 100644 --- a/src/ai/backend/manager/api/container_registry.py +++ b/src/ai/backend/manager/api/container_registry.py @@ -39,7 +39,7 @@ async def patch_container_registry( request: web.Request, params: PatchContainerRegistryRequestModel ) -> PatchContainerRegistryResponseModel: registry_id = uuid.UUID(request.match_info["registry_id"]) - log.info("PATCH_CONTAINER_REGISTRY (cr:{})", registry_id) + log.info("PATCH_CONTAINER_REGISTRY (registry:{})", registry_id) root_ctx: RootContext = request.app["_root.context"] registry_row_updates = params.model_dump(exclude={"allowed_groups"}, exclude_none=True) diff --git a/src/ai/backend/manager/api/context.py b/src/ai/backend/manager/api/context.py index cd9c9259991..dcd2654076b 100644 --- a/src/ai/backend/manager/api/context.py +++ b/src/ai/backend/manager/api/context.py @@ -5,8 +5,8 @@ import attrs from ai.backend.common.metrics.metric import CommonMetricRegistry -from ai.backend.manager.api.services.base import ServicesContext from ai.backend.manager.plugin.network import NetworkPluginContext +from ai.backend.manager.service.base import ServicesContext if TYPE_CHECKING: from ai.backend.common.bgtask import BackgroundTaskManager diff --git a/src/ai/backend/manager/api/group.py b/src/ai/backend/manager/api/group.py index 77ca2d864fb..2c68457933f 100644 --- a/src/ai/backend/manager/api/group.py +++ b/src/ai/backend/manager/api/group.py @@ -31,13 +31,13 @@ }) ) async def update_registry_quota(request: web.Request, params: Any) -> web.Response: - log.info("UPDATE_REGISTRY_QUOTA (gr:{})", params["group_id"]) + log.info("UPDATE_REGISTRY_QUOTA (group:{})", params["group_id"]) root_ctx: RootContext = request.app["_root.context"] group_id = params["group_id"] scope_id = ProjectScope(project_id=group_id, domain_name=None) quota = int(params["quota"]) - await root_ctx.services_ctx.per_project_container_registries_quota.update(scope_id, quota) + await root_ctx.services_ctx.per_project_container_registries_quota.update_quota(scope_id, quota) return web.Response(status=204) @@ -49,12 +49,12 @@ async def update_registry_quota(request: web.Request, params: Any) -> web.Respon }) ) async def delete_registry_quota(request: web.Request, params: Any) -> web.Response: - log.info("DELETE_REGISTRY_QUOTA (gr:{})", params["group_id"]) + log.info("DELETE_REGISTRY_QUOTA (group:{})", params["group_id"]) root_ctx: RootContext = request.app["_root.context"] group_id = params["group_id"] scope_id = ProjectScope(project_id=group_id, domain_name=None) - await root_ctx.services_ctx.per_project_container_registries_quota.delete(scope_id) + await root_ctx.services_ctx.per_project_container_registries_quota.delete_quota(scope_id) return web.Response(status=204) @@ -67,13 +67,13 @@ async def delete_registry_quota(request: web.Request, params: Any) -> web.Respon }) ) async def create_registry_quota(request: web.Request, params: Any) -> web.Response: - log.info("CREATE_REGISTRY_QUOTA (gr:{})", params["group_id"]) + log.info("CREATE_REGISTRY_QUOTA (group:{})", params["group_id"]) root_ctx: RootContext = request.app["_root.context"] group_id = params["group_id"] scope_id = ProjectScope(project_id=group_id, domain_name=None) quota = int(params["quota"]) - await root_ctx.services_ctx.per_project_container_registries_quota.create(scope_id, quota) + await root_ctx.services_ctx.per_project_container_registries_quota.create_quota(scope_id, quota) return web.Response(status=204) @@ -85,12 +85,12 @@ async def create_registry_quota(request: web.Request, params: Any) -> web.Respon }) ) async def read_registry_quota(request: web.Request, params: Any) -> web.Response: - log.info("READ_REGISTRY_QUOTA (gr:{})", params["group_id"]) + log.info("READ_REGISTRY_QUOTA (group:{})", params["group_id"]) root_ctx: RootContext = request.app["_root.context"] group_id = params["group_id"] scope_id = ProjectScope(project_id=group_id, domain_name=None) - quota = await root_ctx.services_ctx.per_project_container_registries_quota.read(scope_id) + quota = await root_ctx.services_ctx.per_project_container_registries_quota.read_quota(scope_id) return web.json_response({"result": quota}) diff --git a/src/ai/backend/manager/api/services/base.py b/src/ai/backend/manager/api/services/base.py deleted file mode 100644 index 4b42e2c0b7a..00000000000 --- a/src/ai/backend/manager/api/services/base.py +++ /dev/null @@ -1,25 +0,0 @@ -from ai.backend.manager.models.utils import ExtendedAsyncSAEngine - -from .container_registries.base import PerProjectRegistryQuotaRepository -from .container_registries.harbor import ( - PerProjectContainerRegistryQuotaProtocol, - PerProjectContainerRegistryQuotaService, -) - - -class ServicesContext: - """ - In the API layer, requests are processed through the ServicesContext and - its subordinate layers, including the DB, Client, and Repository layers. - Each layer separates the responsibilities specific to its respective level. - """ - - db: ExtendedAsyncSAEngine - - def __init__(self, db: ExtendedAsyncSAEngine) -> None: - self.db = db - - @property - def per_project_container_registries_quota(self) -> PerProjectContainerRegistryQuotaProtocol: - repository = PerProjectRegistryQuotaRepository(db=self.db) - return PerProjectContainerRegistryQuotaService(repository=repository) diff --git a/src/ai/backend/manager/api/services/container_registries/harbor.py b/src/ai/backend/manager/api/services/container_registries/harbor.py deleted file mode 100644 index 9f3b6b03008..00000000000 --- a/src/ai/backend/manager/api/services/container_registries/harbor.py +++ /dev/null @@ -1,210 +0,0 @@ -from __future__ import annotations - -import logging -from typing import TYPE_CHECKING, Any, Protocol, TypedDict - -import aiohttp -import yarl - -from ai.backend.common.container_registry import ContainerRegistryType -from ai.backend.logging import BraceStyleAdapter -from ai.backend.manager.api.exceptions import GenericBadRequest, InternalServerError, ObjectNotFound -from ai.backend.manager.api.services.container_registries.base import ( - ContainerRegistryRow, - PerProjectRegistryQuotaRepository, -) -from ai.backend.manager.models.rbac import ProjectScope - -if TYPE_CHECKING: - pass - -log = BraceStyleAdapter(logging.getLogger(__spec__.name)) - - -__all__ = ( - "AbstractPerProjectRegistryQuotaClient", - "PerProjectHarborQuotaClient", - "PerProjectRegistryQuotaRepository", - "PerProjectContainerRegistryQuotaProtocol", - "PerProjectContainerRegistryQuotaService", -) - - -class HarborProjectQuotaInfo(TypedDict): - previous_quota: int - quota_id: int - - -class AbstractPerProjectRegistryQuotaClient(Protocol): - async def create(self, registry_row: ContainerRegistryRow, quota: int) -> None: ... - async def update(self, registry_row: ContainerRegistryRow, quota: int) -> None: ... - async def delete(self, registry_row: ContainerRegistryRow) -> None: ... - async def read(self, registry_row: ContainerRegistryRow) -> int: ... - - -class PerProjectHarborQuotaClient(AbstractPerProjectRegistryQuotaClient): - async def _get_harbor_project_id( - self, - sess: aiohttp.ClientSession, - registry_row: ContainerRegistryRow, - rqst_args: dict[str, Any], - ) -> str: - get_project_id_api = ( - yarl.URL(registry_row.url) / "api" / "v2.0" / "projects" / registry_row.project - ) - - async with sess.get(get_project_id_api, allow_redirects=False, **rqst_args) as resp: - if resp.status != 200: - raise InternalServerError(f"Failed to get harbor project_id! response: {resp}") - - res = await resp.json() - harbor_project_id = res["project_id"] - return str(harbor_project_id) - - async def _get_quota_info( - self, - sess: aiohttp.ClientSession, - registry_row: ContainerRegistryRow, - rqst_args: dict[str, Any], - ) -> HarborProjectQuotaInfo: - harbor_project_id = await self._get_harbor_project_id(sess, registry_row, rqst_args) - get_quota_id_api = (yarl.URL(registry_row.url) / "api" / "v2.0" / "quotas").with_query({ - "reference": "project", - "reference_id": harbor_project_id, - }) - - async with sess.get(get_quota_id_api, allow_redirects=False, **rqst_args) as resp: - if resp.status != 200: - raise InternalServerError(f"Failed to get quota info! response: {resp}") - - res = await resp.json() - if not res: - raise ObjectNotFound(object_name="quota entity") - if len(res) > 1: - raise InternalServerError( - f"Multiple quota entities found. (project_id: {harbor_project_id})" - ) - - previous_quota = res[0]["hard"]["storage"] - quota_id = res[0]["id"] - return HarborProjectQuotaInfo(previous_quota=previous_quota, quota_id=quota_id) - - async def read(self, registry_row: ContainerRegistryRow) -> int: - ssl_verify = True - connector = aiohttp.TCPConnector(ssl=ssl_verify) - async with aiohttp.ClientSession(connector=connector) as sess: - rqst_args: dict[str, Any] = {} - - quota_info = await self._get_quota_info(sess, registry_row, rqst_args) - previous_quota = quota_info["previous_quota"] - if previous_quota == -1: - raise ObjectNotFound(object_name="quota entity") - return previous_quota - - async def create(self, registry_row: ContainerRegistryRow, quota: int) -> None: - ssl_verify = True - connector = aiohttp.TCPConnector(ssl=ssl_verify) - async with aiohttp.ClientSession(connector=connector) as sess: - rqst_args: dict[str, Any] = {} - rqst_args["auth"] = aiohttp.BasicAuth(registry_row.username, registry_row.password) - - quota_info = await self._get_quota_info(sess, registry_row, rqst_args) - previous_quota, quota_id = quota_info["previous_quota"], quota_info["quota_id"] - - if previous_quota > 0: - raise GenericBadRequest("Quota limit already exists!") - - put_quota_api = yarl.URL(registry_row.url) / "api" / "v2.0" / "quotas" / str(quota_id) - payload = {"hard": {"storage": quota}} - - async with sess.put( - put_quota_api, json=payload, allow_redirects=False, **rqst_args - ) as resp: - if resp.status != 200: - log.error(f"Failed to create quota! response: {resp}") - raise InternalServerError(f"Failed to create quota! response: {resp}") - - async def update(self, registry_row: ContainerRegistryRow, quota: int) -> None: - ssl_verify = True - connector = aiohttp.TCPConnector(ssl=ssl_verify) - async with aiohttp.ClientSession(connector=connector) as sess: - rqst_args: dict[str, Any] = {} - rqst_args["auth"] = aiohttp.BasicAuth(registry_row.username, registry_row.password) - - quota_info = await self._get_quota_info(sess, registry_row, rqst_args) - previous_quota, quota_id = quota_info["previous_quota"], quota_info["quota_id"] - - if previous_quota == -1: - raise ObjectNotFound(object_name="quota entity") - - put_quota_api = yarl.URL(registry_row.url) / "api" / "v2.0" / "quotas" / str(quota_id) - payload = {"hard": {"storage": quota}} - - async with sess.put( - put_quota_api, json=payload, allow_redirects=False, **rqst_args - ) as resp: - if resp.status != 200: - log.error(f"Failed to update quota! response: {resp}") - raise InternalServerError(f"Failed to update quota! response: {resp}") - - async def delete(self, registry_row: ContainerRegistryRow) -> None: - ssl_verify = True - connector = aiohttp.TCPConnector(ssl=ssl_verify) - async with aiohttp.ClientSession(connector=connector) as sess: - rqst_args: dict[str, Any] = {} - rqst_args["auth"] = aiohttp.BasicAuth(registry_row.username, registry_row.password) - - quota_info = await self._get_quota_info(sess, registry_row, rqst_args) - previous_quota, quota_id = quota_info["previous_quota"], quota_info["quota_id"] - - if previous_quota == -1: - raise ObjectNotFound(object_name="quota entity") - - put_quota_api = yarl.URL(registry_row.url) / "api" / "v2.0" / "quotas" / str(quota_id) - payload = {"hard": {"storage": -1}} - - async with sess.put( - put_quota_api, json=payload, allow_redirects=False, **rqst_args - ) as resp: - if resp.status != 200: - log.error(f"Failed to delete quota! response: {resp}") - raise InternalServerError(f"Failed to delete quota! response: {resp}") - - -class PerProjectContainerRegistryQuotaProtocol(Protocol): - async def create(self, scope_id: ProjectScope, quota: int) -> None: ... - async def update(self, scope_id: ProjectScope, quota: int) -> None: ... - async def delete(self, scope_id: ProjectScope) -> None: ... - async def read(self, scope_id: ProjectScope) -> int: ... - - -class PerProjectContainerRegistryQuotaService(PerProjectContainerRegistryQuotaProtocol): - repository: PerProjectRegistryQuotaRepository - - def __init__(self, repository: PerProjectRegistryQuotaRepository): - self.repository = repository - - def make_client(self, type_: ContainerRegistryType) -> AbstractPerProjectRegistryQuotaClient: - match type_: - case ContainerRegistryType.HARBOR2: - return PerProjectHarborQuotaClient() - case _: - raise GenericBadRequest( - f"{type_} does not support registry quota per project management." - ) - - async def create(self, scope_id: ProjectScope, quota: int) -> None: - registry_row = await self.repository.fetch_container_registry_row(scope_id) - await self.make_client(registry_row.type).create(registry_row, quota) - - async def update(self, scope_id: ProjectScope, quota: int) -> None: - registry_row = await self.repository.fetch_container_registry_row(scope_id) - await self.make_client(registry_row.type).update(registry_row, quota) - - async def delete(self, scope_id: ProjectScope) -> None: - registry_row = await self.repository.fetch_container_registry_row(scope_id) - await self.make_client(registry_row.type).delete(registry_row) - - async def read(self, scope_id: ProjectScope) -> int: - registry_row = await self.repository.fetch_container_registry_row(scope_id) - return await self.make_client(registry_row.type).read(registry_row) diff --git a/src/ai/backend/manager/api/services/BUILD b/src/ai/backend/manager/client/BUILD similarity index 100% rename from src/ai/backend/manager/api/services/BUILD rename to src/ai/backend/manager/client/BUILD diff --git a/src/ai/backend/manager/api/services/__init__.py b/src/ai/backend/manager/client/__init__.py similarity index 100% rename from src/ai/backend/manager/api/services/__init__.py rename to src/ai/backend/manager/client/__init__.py diff --git a/src/ai/backend/manager/api/services/container_registries/BUILD b/src/ai/backend/manager/client/container_registry/BUILD similarity index 100% rename from src/ai/backend/manager/api/services/container_registries/BUILD rename to src/ai/backend/manager/client/container_registry/BUILD diff --git a/src/ai/backend/manager/api/services/container_registries/__init__.py b/src/ai/backend/manager/client/container_registry/__init__.py similarity index 100% rename from src/ai/backend/manager/api/services/container_registries/__init__.py rename to src/ai/backend/manager/client/container_registry/__init__.py diff --git a/src/ai/backend/manager/client/container_registry/harbor.py b/src/ai/backend/manager/client/container_registry/harbor.py new file mode 100644 index 00000000000..5fc520a9588 --- /dev/null +++ b/src/ai/backend/manager/client/container_registry/harbor.py @@ -0,0 +1,174 @@ +from __future__ import annotations + +import abc +import logging +from typing import TYPE_CHECKING, Any, override + +import aiohttp +import yarl + +from ai.backend.logging import BraceStyleAdapter +from ai.backend.manager.api.exceptions import GenericBadRequest, InternalServerError, ObjectNotFound + +if TYPE_CHECKING: + from ai.backend.manager.service.container_registry.harbor import ( + HarborAuthArgs, + HarborProjectInfo, + HarborProjectQuotaInfo, + ) + +log = BraceStyleAdapter(logging.getLogger(__spec__.name)) + + +def _get_harbor_auth_args(auth_args: HarborAuthArgs) -> dict[str, Any]: + return {"auth": aiohttp.BasicAuth(auth_args["username"], auth_args["password"])} + + +class AbstractPerProjectRegistryQuotaClient(abc.ABC): + async def create_quota( + self, project_info: HarborProjectInfo, quota: int, auth_args: HarborAuthArgs + ) -> None: + raise NotImplementedError + + async def update_quota( + self, project_info: HarborProjectInfo, quota: int, auth_args: HarborAuthArgs + ) -> None: + raise NotImplementedError + + async def delete_quota( + self, project_info: HarborProjectInfo, auth_args: HarborAuthArgs + ) -> None: + raise NotImplementedError + + async def read_quota(self, project_info: HarborProjectInfo) -> int: + raise NotImplementedError + + +class PerProjectHarborQuotaClient(AbstractPerProjectRegistryQuotaClient): + async def _get_harbor_project_id( + self, + sess: aiohttp.ClientSession, + project_info: HarborProjectInfo, + rqst_args: dict[str, Any], + ) -> str: + get_project_id_api = ( + yarl.URL(project_info.url) / "api" / "v2.0" / "projects" / project_info.project + ) + + async with sess.get(get_project_id_api, allow_redirects=False, **rqst_args) as resp: + if resp.status != 200: + raise InternalServerError(f"Failed to get harbor project_id! response: {resp}") + + res = await resp.json() + harbor_project_id = res["project_id"] + return str(harbor_project_id) + + async def _get_quota_info( + self, + sess: aiohttp.ClientSession, + project_info: HarborProjectInfo, + rqst_args: dict[str, Any], + ) -> HarborProjectQuotaInfo: + from ...service.container_registry.harbor import HarborProjectQuotaInfo + + harbor_project_id = await self._get_harbor_project_id(sess, project_info, rqst_args) + get_quota_id_api = (yarl.URL(project_info.url) / "api" / "v2.0" / "quotas").with_query({ + "reference": "project", + "reference_id": harbor_project_id, + }) + + async with sess.get(get_quota_id_api, allow_redirects=False, **rqst_args) as resp: + if resp.status != 200: + raise InternalServerError(f"Failed to get quota info! response: {resp}") + + res = await resp.json() + if not res: + raise ObjectNotFound(object_name="quota entity") + if len(res) > 1: + raise InternalServerError( + f"Multiple quota entities found. (project_id: {harbor_project_id})" + ) + + previous_quota = res[0]["hard"]["storage"] + quota_id = res[0]["id"] + return HarborProjectQuotaInfo(previous_quota=previous_quota, quota_id=quota_id) + + @override + async def read_quota(self, project_info: HarborProjectInfo) -> int: + connector = aiohttp.TCPConnector(ssl=project_info.ssl_verify) + async with aiohttp.ClientSession(connector=connector) as sess: + rqst_args: dict[str, Any] = {} + quota_info = await self._get_quota_info(sess, project_info, rqst_args) + previous_quota = quota_info["previous_quota"] + if previous_quota == -1: + raise ObjectNotFound(object_name="quota entity") + return previous_quota + + @override + async def create_quota( + self, project_info: HarborProjectInfo, quota: int, auth_args: HarborAuthArgs + ) -> None: + connector = aiohttp.TCPConnector(ssl=project_info.ssl_verify) + async with aiohttp.ClientSession(connector=connector) as sess: + rqst_args = _get_harbor_auth_args(auth_args) + quota_info = await self._get_quota_info(sess, project_info, rqst_args) + previous_quota, quota_id = quota_info["previous_quota"], quota_info["quota_id"] + + if previous_quota > 0: + raise GenericBadRequest("Quota limit already exists!") + + put_quota_api = yarl.URL(project_info.url) / "api" / "v2.0" / "quotas" / str(quota_id) + payload = {"hard": {"storage": quota}} + + async with sess.put( + put_quota_api, json=payload, allow_redirects=False, **rqst_args + ) as resp: + if resp.status != 200: + log.error(f"Failed to create quota! response: {resp}") + raise InternalServerError(f"Failed to create quota! response: {resp}") + + @override + async def update_quota( + self, project_info: HarborProjectInfo, quota: int, auth_args: HarborAuthArgs + ) -> None: + connector = aiohttp.TCPConnector(ssl=project_info.ssl_verify) + async with aiohttp.ClientSession(connector=connector) as sess: + rqst_args = _get_harbor_auth_args(auth_args) + quota_info = await self._get_quota_info(sess, project_info, rqst_args) + previous_quota, quota_id = quota_info["previous_quota"], quota_info["quota_id"] + + if previous_quota == -1: + raise ObjectNotFound(object_name="quota entity") + + put_quota_api = yarl.URL(project_info.url) / "api" / "v2.0" / "quotas" / str(quota_id) + payload = {"hard": {"storage": quota}} + + async with sess.put( + put_quota_api, json=payload, allow_redirects=False, **rqst_args + ) as resp: + if resp.status != 200: + log.error(f"Failed to update quota! response: {resp}") + raise InternalServerError(f"Failed to update quota! response: {resp}") + + @override + async def delete_quota( + self, project_info: HarborProjectInfo, auth_args: HarborAuthArgs + ) -> None: + connector = aiohttp.TCPConnector(ssl=project_info.ssl_verify) + async with aiohttp.ClientSession(connector=connector) as sess: + rqst_args = _get_harbor_auth_args(auth_args) + quota_info = await self._get_quota_info(sess, project_info, rqst_args) + previous_quota, quota_id = quota_info["previous_quota"], quota_info["quota_id"] + + if previous_quota == -1: + raise ObjectNotFound(object_name="quota entity") + + put_quota_api = yarl.URL(project_info.url) / "api" / "v2.0" / "quotas" / str(quota_id) + payload = {"hard": {"storage": -1}} + + async with sess.put( + put_quota_api, json=payload, allow_redirects=False, **rqst_args + ) as resp: + if resp.status != 200: + log.error(f"Failed to delete quota! response: {resp}") + raise InternalServerError(f"Failed to delete quota! response: {resp}") diff --git a/src/ai/backend/manager/models/gql.py b/src/ai/backend/manager/models/gql.py index 4513f8fa9d3..033967202bc 100644 --- a/src/ai/backend/manager/models/gql.py +++ b/src/ai/backend/manager/models/gql.py @@ -11,8 +11,8 @@ from graphql import OperationType, Undefined from graphql.type import GraphQLField -from ai.backend.manager.api.services.base import ServicesContext from ai.backend.manager.plugin.network import NetworkPluginContext +from ai.backend.manager.service.base import ServicesContext set_input_object_type_default_value(Undefined) diff --git a/src/ai/backend/manager/models/gql_models/container_registry.py b/src/ai/backend/manager/models/gql_models/container_registry.py index 20184d8ce80..c5aa718fc0c 100644 --- a/src/ai/backend/manager/models/gql_models/container_registry.py +++ b/src/ai/backend/manager/models/gql_models/container_registry.py @@ -45,8 +45,10 @@ async def mutate( try: match scope_id: case ProjectScope(_): - await graph_ctx.services_ctx.per_project_container_registries_quota.create( - scope_id, int(quota) + await ( + graph_ctx.services_ctx.per_project_container_registries_quota.create_quota( + scope_id, int(quota) + ) ) case _: raise NotImplementedError("Only project scope is supported for now.") @@ -83,8 +85,10 @@ async def mutate( try: match scope_id: case ProjectScope(_): - await graph_ctx.services_ctx.per_project_container_registries_quota.update( - scope_id, int(quota) + await ( + graph_ctx.services_ctx.per_project_container_registries_quota.update_quota( + scope_id, int(quota) + ) ) case _: raise NotImplementedError("Only project scope is supported for now.") @@ -119,8 +123,10 @@ async def mutate( try: match scope_id: case ProjectScope(_): - await graph_ctx.services_ctx.per_project_container_registries_quota.delete( - scope_id + await ( + graph_ctx.services_ctx.per_project_container_registries_quota.delete_quota( + scope_id + ) ) case _: raise NotImplementedError("Only project scope is supported for now.") diff --git a/src/ai/backend/manager/models/gql_models/group.py b/src/ai/backend/manager/models/gql_models/group.py index 48592e5ce28..780b85968cd 100644 --- a/src/ai/backend/manager/models/gql_models/group.py +++ b/src/ai/backend/manager/models/gql_models/group.py @@ -217,7 +217,9 @@ async def resolve_user_nodes( async def resolve_registry_quota(self, info: graphene.ResolveInfo) -> int: graph_ctx: GraphQueryContext = info.context scope_id = ProjectScope(project_id=self.id, domain_name=None) - return await graph_ctx.services_ctx.per_project_container_registries_quota.read(scope_id) + return await graph_ctx.services_ctx.per_project_container_registries_quota.read_quota( + scope_id + ) @classmethod async def get_node(cls, info: graphene.ResolveInfo, id) -> Self: diff --git a/src/ai/backend/manager/server.py b/src/ai/backend/manager/server.py index 8b01a21c5eb..1a39fead1d1 100644 --- a/src/ai/backend/manager/server.py +++ b/src/ai/backend/manager/server.py @@ -62,8 +62,12 @@ from ai.backend.common.types import AgentSelectionStrategy, HostPortPair from ai.backend.common.utils import env_info from ai.backend.logging import BraceStyleAdapter, Logger, LogLevel -from ai.backend.manager.api.services.base import ServicesContext from ai.backend.manager.plugin.network import NetworkPluginContext +from ai.backend.manager.service.base import ServicesContext +from ai.backend.manager.service.container_registry.base import PerProjectRegistryQuotaRepository +from ai.backend.manager.service.container_registry.harbor import ( + PerProjectContainerRegistryQuotaService, +) from . import __version__ from .agent_cache import AgentRPCCache @@ -696,7 +700,15 @@ async def _force_terminate_hanging_sessions( @actxmgr async def services_ctx(root_ctx: RootContext) -> AsyncIterator[None]: - root_ctx.services_ctx = ServicesContext(root_ctx.db) + db = root_ctx.db + + per_project_container_registries_quota = PerProjectContainerRegistryQuotaService( + repository=PerProjectRegistryQuotaRepository(db) + ) + + root_ctx.services_ctx = ServicesContext( + per_project_container_registries_quota, + ) yield None @@ -919,9 +931,6 @@ async def _call_cleanup_context_shutdown_handlers(app: web.Application) -> None: if pidx == 0: log.info("Loading module: {0}", pkg_name[1:]) subapp_mod = importlib.import_module(pkg_name, "ai.backend.manager.api") - - if pkg_name == ".service": - continue init_subapp(pkg_name, app, getattr(subapp_mod, "create_app")) vendor_path = importlib.resources.files("ai.backend.manager.vendor") diff --git a/src/ai/backend/manager/service/BUILD b/src/ai/backend/manager/service/BUILD new file mode 100644 index 00000000000..73574424040 --- /dev/null +++ b/src/ai/backend/manager/service/BUILD @@ -0,0 +1 @@ +python_sources(name="src") diff --git a/src/ai/backend/manager/service/__init__.py b/src/ai/backend/manager/service/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/ai/backend/manager/service/base.py b/src/ai/backend/manager/service/base.py new file mode 100644 index 00000000000..bd4a22aa21a --- /dev/null +++ b/src/ai/backend/manager/service/base.py @@ -0,0 +1,18 @@ +from .container_registry.harbor import ( + PerProjectContainerRegistryQuota, +) + + +class ServicesContext: + """ + In the API layer, requests are processed through the ServicesContext and + its subordinate layers, including the DB, Client, and Repository layers. + Each layer separates the responsibilities specific to its respective level. + """ + + per_project_container_registries_quota: PerProjectContainerRegistryQuota + + def __init__( + self, per_project_container_registries_quota: PerProjectContainerRegistryQuota + ) -> None: + self.per_project_container_registries_quota = per_project_container_registries_quota diff --git a/src/ai/backend/manager/service/container_registry/BUILD b/src/ai/backend/manager/service/container_registry/BUILD new file mode 100644 index 00000000000..73574424040 --- /dev/null +++ b/src/ai/backend/manager/service/container_registry/BUILD @@ -0,0 +1 @@ +python_sources(name="src") diff --git a/src/ai/backend/manager/service/container_registry/__init__.py b/src/ai/backend/manager/service/container_registry/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/ai/backend/manager/api/services/container_registries/base.py b/src/ai/backend/manager/service/container_registry/base.py similarity index 59% rename from src/ai/backend/manager/api/services/container_registries/base.py rename to src/ai/backend/manager/service/container_registry/base.py index ce0abd4dde3..f9567e2cdf8 100644 --- a/src/ai/backend/manager/api/services/container_registries/base.py +++ b/src/ai/backend/manager/service/container_registry/base.py @@ -1,11 +1,15 @@ from __future__ import annotations +import abc import logging -from typing import TYPE_CHECKING +import uuid +from dataclasses import dataclass +from typing import Any, override import sqlalchemy as sa from sqlalchemy.orm import load_only +from ai.backend.common.container_registry import ContainerRegistryType from ai.backend.logging import BraceStyleAdapter from ai.backend.manager.api.exceptions import ( ContainerRegistryNotFound, @@ -15,15 +19,31 @@ from ai.backend.manager.models.rbac import ProjectScope from ai.backend.manager.models.utils import ExtendedAsyncSAEngine -if TYPE_CHECKING: - pass - log = BraceStyleAdapter(logging.getLogger(__spec__.name)) -class PerProjectRegistryQuotaRepository: - """ """ +@dataclass +class ContainerRegistryRowInfo: + id: uuid.UUID + url: str + registry_name: str + type: ContainerRegistryType + project: str + username: str + password: str + ssl_verify: bool + is_global: bool + extra: dict[str, Any] + + +class AbstractPerProjectRegistryQuotaRepository(abc.ABC): + async def fetch_container_registry_row( + self, scope_id: ProjectScope + ) -> ContainerRegistryRowInfo: + raise NotImplementedError + +class PerProjectRegistryQuotaRepository(AbstractPerProjectRegistryQuotaRepository): def __init__(self, db: ExtendedAsyncSAEngine): self.db = db @@ -36,7 +56,10 @@ def _is_valid_group_row(cls, group_row: GroupRow) -> bool: and "project" in group_row.container_registry ) - async def fetch_container_registry_row(self, scope_id: ProjectScope) -> ContainerRegistryRow: + @override + async def fetch_container_registry_row( + self, scope_id: ProjectScope + ) -> ContainerRegistryRowInfo: async with self.db.begin_readonly_session() as db_sess: project_id = scope_id.project_id group_query = ( @@ -49,7 +72,7 @@ async def fetch_container_registry_row(self, scope_id: ProjectScope) -> Containe if not PerProjectRegistryQuotaRepository._is_valid_group_row(group_row): raise ContainerRegistryNotFound( - f"Container registry info does not exist or is invalid in the group. (gr: {project_id})" + f"Container registry info does not exist or is invalid in the group. (group: {project_id})" ) registry_name, project = ( @@ -67,7 +90,18 @@ async def fetch_container_registry_row(self, scope_id: ProjectScope) -> Containe if not registry: raise ContainerRegistryNotFound( - f"Specified container registry row not found. (cr: {registry_name}, gr: {project})" + f"Container registry row not found. (registry: {registry_name}, group: {project})" ) - return registry + return ContainerRegistryRowInfo( + id=registry.id, + url=registry.url, + registry_name=registry.registry_name, + type=registry.type, + project=registry.project, + username=registry.username, + password=registry.password, + ssl_verify=registry.ssl_verify, + is_global=registry.is_global, + extra=registry.extra, + ) diff --git a/src/ai/backend/manager/service/container_registry/harbor.py b/src/ai/backend/manager/service/container_registry/harbor.py new file mode 100644 index 00000000000..f74ec4d9c49 --- /dev/null +++ b/src/ai/backend/manager/service/container_registry/harbor.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +import abc +import logging +from dataclasses import dataclass +from typing import TypedDict, override + +from ai.backend.common.container_registry import ContainerRegistryType +from ai.backend.logging import BraceStyleAdapter +from ai.backend.manager.api.exceptions import GenericBadRequest +from ai.backend.manager.client.container_registry.harbor import ( + AbstractPerProjectRegistryQuotaClient, + PerProjectHarborQuotaClient, +) +from ai.backend.manager.models.rbac import ProjectScope +from ai.backend.manager.service.container_registry.base import ( + ContainerRegistryRowInfo, + PerProjectRegistryQuotaRepository, +) + +log = BraceStyleAdapter(logging.getLogger(__spec__.name)) + + +@dataclass +class HarborProjectInfo: + url: str + project: str + ssl_verify: bool + + +class HarborAuthArgs(TypedDict): + username: str + password: str + + +class HarborProjectQuotaInfo(TypedDict): + previous_quota: int + quota_id: int + + +class PerProjectContainerRegistryQuota(abc.ABC): + async def create_quota(self, scope_id: ProjectScope, quota: int) -> None: + raise NotImplementedError + + async def update_quota(self, scope_id: ProjectScope, quota: int) -> None: + raise NotImplementedError + + async def delete_quota(self, scope_id: ProjectScope) -> None: + raise NotImplementedError + + async def read_quota(self, scope_id: ProjectScope) -> int: + raise NotImplementedError + + +class PerProjectContainerRegistryQuotaService(PerProjectContainerRegistryQuota): + repository: PerProjectRegistryQuotaRepository + + def __init__(self, repository: PerProjectRegistryQuotaRepository): + self.repository = repository + + def _registry_row_to_harbor_project_info( + self, registry_info: ContainerRegistryRowInfo + ) -> HarborProjectInfo: + return HarborProjectInfo( + url=registry_info.url, + project=registry_info.project, + ssl_verify=registry_info.ssl_verify, + ) + + def _make_client(self, type_: ContainerRegistryType) -> AbstractPerProjectRegistryQuotaClient: + match type_: + case ContainerRegistryType.HARBOR2: + return PerProjectHarborQuotaClient() + case _: + raise GenericBadRequest( + f"{type_} does not support registry quota per project management." + ) + + @override + async def create_quota(self, scope_id: ProjectScope, quota: int) -> None: + registry_info = await self.repository.fetch_container_registry_row(scope_id) + project_info = self._registry_row_to_harbor_project_info(registry_info) + credential = HarborAuthArgs( + username=registry_info.username, password=registry_info.password + ) + await self._make_client(registry_info.type).create_quota(project_info, quota, credential) + + @override + async def update_quota(self, scope_id: ProjectScope, quota: int) -> None: + registry_info = await self.repository.fetch_container_registry_row(scope_id) + project_info = self._registry_row_to_harbor_project_info(registry_info) + credential = HarborAuthArgs( + username=registry_info.username, password=registry_info.password + ) + await self._make_client(registry_info.type).update_quota(project_info, quota, credential) + + @override + async def delete_quota(self, scope_id: ProjectScope) -> None: + registry_info = await self.repository.fetch_container_registry_row(scope_id) + project_info = self._registry_row_to_harbor_project_info(registry_info) + credential = HarborAuthArgs( + username=registry_info.username, password=registry_info.password + ) + await self._make_client(registry_info.type).delete_quota(project_info, credential) + + @override + async def read_quota(self, scope_id: ProjectScope) -> int: + registry_info = await self.repository.fetch_container_registry_row(scope_id) + project_info = self._registry_row_to_harbor_project_info(registry_info) + return await self._make_client(registry_info.type).read_quota(project_info)