Skip to content

Commit

Permalink
GET transfer to DISKTAPE:
Browse files Browse the repository at this point in the history
  • Loading branch information
nmassey001 committed Oct 17, 2024
1 parent b644d58 commit 1751b2e
Show file tree
Hide file tree
Showing 14 changed files with 827 additions and 703 deletions.
5 changes: 5 additions & 0 deletions nlds/nlds_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,8 @@

# Version string for the API.
API_VERSION = "0.1"
# Filesystem path of the server configuration file.
CONFIG_FILE_LOCATION = "/etc/nlds/server_config"

# Switch for the DISKTAPE location below; when False no location is set.
USE_DISKTAPE = True
# NOTE(review): the leading "~" is stored literally here; presumably the code
# that consumes DISKTAPE_LOC expands it (e.g. os.path.expanduser) - confirm.
DISKTAPE_LOC = "~/DISKTAPE" if USE_DISKTAPE else None
2 changes: 1 addition & 1 deletion nlds/rabbit/message_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
JOB_LABEL = "job_label"
DATA = "data"
FILELIST = "filelist"
RETRIEVAL_FILELIST = "retrieval_dict"
RETRIEVAL_DICT = "retrieval_dict"
TRANSACTIONS = "transactions"
LOG_TARGET = "log_target"
LOG_MESSAGE = "log_message"
Expand Down
10 changes: 9 additions & 1 deletion nlds/rabbit/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,11 +448,19 @@ def _log(self, log_message: str, log_level: str, target: str, **kwargs) -> None:
self.publish_message(routing_key, message)

def log(
    self,
    log_message: str,
    log_level: str,
    target: str = None,
    body_json: str = None,
    **kwargs,
) -> None:
    """Send a log message through ``self._log``.

    :param log_message: text of the message to log.
    :param log_level: log level, passed through to ``self._log`` unchanged.
    :param target: log target; when falsy, ``self.name`` is used instead.
    :param body_json: optional message body appended to ``log_message`` as
        indented JSON. A string is decoded first so that its content, not
        its quoted/escaped representation, is pretty-printed; a string that
        is not valid JSON is appended as a quoted string. Already-decoded
        objects (e.g. a dict) are dumped as they are.
    :param kwargs: passed through to ``self._log`` unchanged.
    """
    # Attempt to log to publisher's name
    if not target:
        target = self.name
    # convert string json to nice formatted json and append to message
    if body_json:
        if isinstance(body_json, str):
            try:
                # json.dumps on a raw str would only quote and escape it,
                # so decode it to an object before re-serialising
                body_json = json.loads(body_json)
            except ValueError:
                # not valid JSON: keep the text and log it as a quoted string
                pass
        log_message += f"\n{json.dumps(body_json, indent=4)}\n"
    self._log(log_message, log_level, target, **kwargs)

def create_log_message(
Expand Down
19 changes: 19 additions & 0 deletions nlds/routers/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ async def get(
target: Optional[str] = None,
job_label: Optional[str] = None,
tenancy: Optional[str] = None,
label: Optional[str] = None,
holding_id: Optional[int] = None,
tag: Optional[str] = None,
access_key: str = "",
secret_key: str = "",
):
Expand Down Expand Up @@ -156,6 +159,22 @@ async def get(
if tenancy:
response.tenancy = tenancy

# add the metadata
meta_dict = {}
if label:
meta_dict[MSG.LABEL] = label
response.label = label
if holding_id:
meta_dict[MSG.HOLDING_ID] = holding_id
response.holding_id = holding_id
if tag:
tag_dict = tag.strip('').split(':')
meta_dict[MSG.TAG] = {tag_dict[0]:tag_dict[1]}
response.tag = tag_dict

if len(meta_dict) > 0:
msg_dict[MSG.META] = meta_dict

rabbit_publisher.publish_message(routing_key, msg_dict)
return JSONResponse(status_code=status.HTTP_202_ACCEPTED, content=response.json())

Expand Down
24 changes: 16 additions & 8 deletions nlds_processors/archiver/archive_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
__contact__ = "[email protected]"

from abc import ABC, abstractmethod
import json
from typing import List, Dict, Tuple
from enum import Enum
from typing import List, Dict, Tuple, Any

from nlds_processors.transferers.base_transfer import BaseTransferConsumer
from nlds_processors.utils.aggregations import bin_files
Expand All @@ -36,13 +34,11 @@ class BaseArchiveConsumer(BaseTransferConsumer, ABC):
_TAPE_POOL = "tape_pool"
_TAPE_URL = "tape_url"
_CHUNK_SIZE = "chunk_size"
_QUERY_CHECKSUM = "query_checksum_fl"
_PRINT_TRACEBACKS = "print_tracebacks_fl"
ARCHIVE_CONSUMER_CONFIG = {
_TAPE_POOL: None,
_TAPE_URL: None,
_CHUNK_SIZE: 5 * (1024**2), # Default to 5 MiB
_QUERY_CHECKSUM: True,
_PRINT_TRACEBACKS: False,
}
DEFAULT_CONSUMER_CONFIG = (
Expand All @@ -55,7 +51,6 @@ def __init__(self, queue=DEFAULT_QUEUE_NAME):
self.tape_pool = self.load_config_value(self._TAPE_POOL)
self.tape_url = self.load_config_value(self._TAPE_URL)
self.chunk_size = int(self.load_config_value(self._CHUNK_SIZE))
self.query_checksum_fl = self.load_config_value(self._QUERY_CHECKSUM)
self.reset()

def callback(self, ch, method, properties, body, connection):
Expand All @@ -72,7 +67,7 @@ def callback(self, ch, method, properties, body, connection):
self.log(
"Aggregating filelist into appropriately sized sub-lists for each "
"Aggregation",
RK.LOG_INFO
RK.LOG_INFO,
)
# Make a new routing key which returns message to this queue
rk_transfer_start = ".".join([self.rk_parts[0], self.rk_parts[1], RK.START])
Expand Down Expand Up @@ -110,7 +105,6 @@ def callback(self, ch, method, properties, body, connection):
else:
raise ArchiveError(f"Unknown routing key {self.rk_parts[2]}")


def get_tape_config(self, body_dict) -> Tuple:
"""Convenience function to extract tape relevant config from the message
details section. Currently this is just the tape
Expand All @@ -135,6 +129,20 @@ def get_tape_config(self, body_dict) -> Tuple:

return tape_url

@classmethod
def get_holding_prefix(cls, body: Dict[str, Any], holding_id: int = -1) -> str:
    """Get the uneditable holding information from the message body to
    reproduce the holding prefix made in the catalog.

    :param body: message body. ``body[MSG.DETAILS]`` must contain
        ``MSG.USER`` and ``MSG.GROUP``; ``body[MSG.META][MSG.HOLDING_ID]``
        is also required when ``holding_id`` is left at its default.
    :param holding_id: holding id to use in the prefix. The default of -1
        means "read the id from the message body".
    :return: the prefix, formatted as ``nlds.<holding_id>.<user>.<group>``.
    :raises ArchiveError: if any of the required keys is missing from
        ``body``; the original ``KeyError`` is attached as the cause.
    """
    try:
        if holding_id == -1:
            holding_id = body[MSG.META][MSG.HOLDING_ID]
        details = body[MSG.DETAILS]
        user = details[MSG.USER]
        group = details[MSG.GROUP]
    except KeyError as e:
        # chain explicitly so the missing key stays visible in the traceback
        raise ArchiveError(
            f"Could not make holding prefix, original error: {e}"
        ) from e

    return f"nlds.{holding_id}.{user}.{group}"

@abstractmethod
def transfer(
self,
Expand Down
Loading

0 comments on commit 1751b2e

Please sign in to comment.