diff --git a/nlds/main.py b/nlds/main.py index 18823a04..76892e6a 100644 --- a/nlds/main.py +++ b/nlds/main.py @@ -12,7 +12,7 @@ from .nlds_setup import API_VERSION -from .routers import list, files, probe, status, find, meta, system, init +from .routers import list, files, probe, status, find, meta, system, init, quota nlds = FastAPI() @@ -56,4 +56,9 @@ init.router, tags = ["init", ], prefix = PREFIX + "/init" +) +nlds.include_router( + quota.router, + tags = ["quota", ], + prefix = PREFIX + "/quota" ) \ No newline at end of file diff --git a/nlds/rabbit/publisher.py b/nlds/rabbit/publisher.py index 7f39e29d..6f7ad09c 100644 --- a/nlds/rabbit/publisher.py +++ b/nlds/rabbit/publisher.py @@ -50,6 +50,7 @@ class RabbitMQPublisher(): RK_STAT = "stat" RK_FIND = "find" RK_META = "meta" + RK_QUOTA = "quota" # Exchange routing key parts – root RK_ROOT = "nlds-api" diff --git a/nlds/routers/quota.py b/nlds/routers/quota.py new file mode 100644 index 00000000..3c74b387 --- /dev/null +++ b/nlds/routers/quota.py @@ -0,0 +1,62 @@ +""" + +""" + +from fastapi import Depends, APIRouter, status +from fastapi.exceptions import HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel +import json + +from typing import Optional, List, Dict + +from ..rabbit.publisher import RabbitMQPublisher as RMQP +from ..errors import ResponseError +from ..authenticators.authenticate_methods import authenticate_token, \ + authenticate_group, \ + authenticate_user + +router = APIRouter() + +class QuotaResponse(BaseModel): + quota: int + +############################ GET METHOD ############################ +@router.get("/", + status_code = status.HTTP_202_ACCEPTED, + responses = { + status.HTTPS_202_ACCEPTED: {"model": QuotaResponse}, + status.HTTP_400_BAD_REQUEST: {"model": ResponseError}, + status.HTTP_401_UNAUTHORIZED: {"model": ResponseError}, + status.HTTP_403_FORBIDDEN: {"model": ResponseError}, + status.HTTP_404_NOT_FOUND: {"model": ResponseError}, + status.HTTP_504_GATEWAY: {"model": ResponseError}, + } + ) +async def get(token: str = Depends(authenticate_token), + user: str = Depends(authenticate_user), + group: str = Depends(authenticate_group), + label: Optional[str] = None, + holding_id: Optional[int] = None, + tag: Optional[str] = None + ): + # create the message dictionary + + routing_key = f"{RMQP.RK_ROOT}.{RMQP.RK_ROUTE}.{RMQP.RK_QUOTA}" + api_action = f"{RMQP.RK_QUOTA}" + msg_dict = { + RMQP.MSG_DETAILS: { + RMQP.MSG_USER: user, + RMQP.MSG_GROUP: group, + }, + RMQP.MSG_DATA: {}, + RMQP.MSG_TYPE: RMQP.MSG_TYPE_STANDARD + } + # add the metadata + meta_dict = {} + if (label): + meta_dict[RMQP.MSG_LABEL] = label + if (holding_id): + meta_dict[RMQP.MSG_HOLDING_ID] = holding_id + if (tag): + # convert the string into a dictionary \ No newline at end of file diff --git a/nlds_processors/catalog/catalog.py b/nlds_processors/catalog/catalog.py index f65a8027..dd44721c 100644 --- a/nlds_processors/catalog/catalog.py +++ b/nlds_processors/catalog/catalog.py @@ -4,6 +4,11 @@ from sqlalchemy import func, Enum from sqlalchemy.exc import IntegrityError, OperationalError, ArgumentError, \ NoResultFound +from retry import retry +import requests +import json +from nlds.server_config import load_config +from nlds.utils.construct_url import construct_url from nlds_processors.catalog.catalog_models import CatalogBase, File, Holding,\ Location, Transaction, Aggregation, Storage, Tag @@ -874,4 +879,78 @@ def get_unarchived_files(self, holding: Holding) -> List[File]: f"Couldn't find unarchived files for holding with " f"id:{holding.id}" ) - return unarchived_files \ No newline at end of file + return unarchived_files + + @retry(requests.ConnectTimeout, tries=5, delay=1, backoff=2) + def get_projects_services(self, oauth_token: str, service_name): + """Make a call to the JASMIN Projects Portal to get the service information.""" + self.config = load_config() + self.name = "jasmin_authenticator" + self.auth_name = "authentication" + self._timeout = 10.0 + + config = self.config[self.auth_name][self.name] + token_headers = { + "Content-Type": "application/x-ww-form-urlencoded", + "cache-control": "no-cache", + "Authorization": f"Bearer {oauth_token}", + } + # Contact the user_services_url to get the information about the services + url = construct_url([config["user_services_urk"]], {"name": {service_name}}) + try: + response = requests.get( + url, + headers=token_headers, + timeout=self._timeout, + ) + except requests.exceptions.ConnectionError: + raise RuntimeError(f"User services url {url} could not be reached.") + except KeyError: + raise RuntimeError(f"Could not find 'user_services_url' key in the {self.name} section of the .server_config file.") + if response.status_code == requests.codes.ok: # status code 200 + try: + response_json = json.loads(response.text) + return response_json + except json.JSONDecodeError: + raise RuntimeError(f"Invalid JSON returned from the user services url: {url}") + else: + raise RuntimeError(f"Error getting data for {service_name}") + + def extract_tape_quota(self, oauth_token: str, service_name): + """Get the service information then process it to extract the quota for the service.""" + try: + result = self.get_projects_services(self, oauth_token, service_name) + except (RuntimeError, ValueError) as e: + raise type(e)(f"Error getting information for {service_name}: {e}") + + # Process the result to get the requirements + for attr in result: + # Check that the category is Group Workspace + if attr["category"] == 1: + # Check that there are requirements, otherwise throw an error + if attr["requirements"]: + requirements = attr["requirements"] + else: + raise ValueError(f"Cannot find any requirements for {service_name}.") + else: + raise ValueError(f"Cannot find a Group Workspace with the name {service_name}. Check the category.") + + # Go through the requirements to find the tape resource requirement + for requirement in requirements: + # Only return provisioned requirements + if requirement["status"] == 50: + # Find the tape resource and get its quota + if requirement["resource"]["short_name"] == "tape": + try: + tape_quota = requirement["amount"] + if tape_quota: + return tape_quota + else: + raise ValueError(f"Issue getting tape quota for {service_name}. Quota is zero.") + except KeyError: + raise KeyError(f"Issue getting tape quota for {service_name}. No 'value' field exists.") + else: + raise ValueError(f"No tape resources could be found for {service_name}") + else: + raise ValueError(f"No provisioned requirements found for {service_name}.Check the status of your requested resources.") + \ No newline at end of file diff --git a/nlds_processors/catalog/catalog_worker.py b/nlds_processors/catalog/catalog_worker.py index 403fcaf7..5eff399b 100644 --- a/nlds_processors/catalog/catalog_worker.py +++ b/nlds_processors/catalog/catalog_worker.py @@ -1850,7 +1850,23 @@ def _catalog_meta(self, body: Dict, properties: Header) -> None: msg_dict=body, exchange={'name': ''}, correlation_id=properties.correlation_id - ) + ) + + def _catalog_quota(self, body: Dict, properties: Header) -> None: + """Return the users quota for the given service.""" + message_vars = self._parse_user_vars(body) + if message_vars is None: + # Check if any problems have occured in the parsing of the message + # body and exit if necessary + self.log("Could not parse one or more mandatory variables, exiting" + "callback", self.RK_LOG_ERROR) + return + else: + # Unpack if no problems found in parsing + user, group = message_vars + + try: + group_quota = def attach_database(self, create_db_fl: bool = True): @@ -2024,6 +2040,10 @@ def callback(self, ch: Channel, method: Method, properties: Header, elif (api_method == self.RK_STAT): self._catalog_stat(body, properties) + elif (api_method == self.RK_QUOTA): + # don't need to split any routing key for an RPC method + self._catalog_quota(body, properties) + # If received system test message, reply to it (this is for system status check) elif api_method == "system_stat": if properties.correlation_id is not None and properties.correlation_id != self.channel.consumer_tags[0]: