diff --git a/nlds/nlds_setup.py b/nlds/nlds_setup.py index 5518f250..8c06116d 100644 --- a/nlds/nlds_setup.py +++ b/nlds/nlds_setup.py @@ -11,3 +11,8 @@ API_VERSION = "0.1" CONFIG_FILE_LOCATION = "/etc/nlds/server_config" +USE_DISKTAPE = True +if USE_DISKTAPE: + DISKTAPE_LOC = "~/DISKTAPE" +else: + DISKTAPE_LOC = None \ No newline at end of file diff --git a/nlds/rabbit/message_keys.py b/nlds/rabbit/message_keys.py index 6124a23c..e69be892 100644 --- a/nlds/rabbit/message_keys.py +++ b/nlds/rabbit/message_keys.py @@ -28,7 +28,7 @@ JOB_LABEL = "job_label" DATA = "data" FILELIST = "filelist" -RETRIEVAL_FILELIST = "retrieval_dict" +RETRIEVAL_DICT = "retrieval_dict" TRANSACTIONS = "transactions" LOG_TARGET = "log_target" LOG_MESSAGE = "log_message" diff --git a/nlds/rabbit/publisher.py b/nlds/rabbit/publisher.py index 8d7fd07c..91bfe973 100644 --- a/nlds/rabbit/publisher.py +++ b/nlds/rabbit/publisher.py @@ -448,11 +448,19 @@ def _log(self, log_message: str, log_level: str, target: str, **kwargs) -> None: self.publish_message(routing_key, message) def log( - self, log_message: str, log_level: str, target: str = None, **kwargs + self, + log_message: str, + log_level: str, + target: str = None, + body_json: str = None, + **kwargs, ) -> None: # Attempt to log to publisher's name if not target: target = self.name + # convert string json to nice formatted json and append to message + if body_json: + log_message += f"\n{json.dumps(body_json, indent=4)}\n" self._log(log_message, log_level, target, **kwargs) def create_log_message( diff --git a/nlds/routers/files.py b/nlds/routers/files.py index d79bfa1a..d9e3565f 100644 --- a/nlds/routers/files.py +++ b/nlds/routers/files.py @@ -102,6 +102,9 @@ async def get( target: Optional[str] = None, job_label: Optional[str] = None, tenancy: Optional[str] = None, + label: Optional[str] = None, + holding_id: Optional[int] = None, + tag: Optional[str] = None, access_key: str = "", secret_key: str = "", ): @@ -156,6 +159,22 @@ async def get( if tenancy: response.tenancy = tenancy + # add the metadata + meta_dict = {} + if label: + meta_dict[MSG.LABEL] = label + response.label = label + if holding_id: + meta_dict[MSG.HOLDING_ID] = holding_id + response.holding_id = holding_id + if tag: + tag_dict = tag.strip('').split(':') + meta_dict[MSG.TAG] = {tag_dict[0]:tag_dict[1]} + response.tag = tag_dict + + if len(meta_dict) > 0: + msg_dict[MSG.META] = meta_dict + rabbit_publisher.publish_message(routing_key, msg_dict) return JSONResponse(status_code=status.HTTP_202_ACCEPTED, content=response.json()) diff --git a/nlds_processors/archiver/archive_base.py b/nlds_processors/archiver/archive_base.py index 457aed51..bf4b3ec6 100644 --- a/nlds_processors/archiver/archive_base.py +++ b/nlds_processors/archiver/archive_base.py @@ -11,9 +11,7 @@ __contact__ = "neil.massey@stfc.ac.uk" from abc import ABC, abstractmethod -import json -from typing import List, Dict, Tuple -from enum import Enum +from typing import List, Dict, Tuple, Any from nlds_processors.transferers.base_transfer import BaseTransferConsumer from nlds_processors.utils.aggregations import bin_files @@ -36,13 +34,11 @@ class BaseArchiveConsumer(BaseTransferConsumer, ABC): _TAPE_POOL = "tape_pool" _TAPE_URL = "tape_url" _CHUNK_SIZE = "chunk_size" - _QUERY_CHECKSUM = "query_checksum_fl" _PRINT_TRACEBACKS = "print_tracebacks_fl" ARCHIVE_CONSUMER_CONFIG = { _TAPE_POOL: None, _TAPE_URL: None, _CHUNK_SIZE: 5 * (1024**2), # Default to 5 MiB - _QUERY_CHECKSUM: True, _PRINT_TRACEBACKS: False, } DEFAULT_CONSUMER_CONFIG = ( @@ 
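The new GET route folds the optional `label`, `holding_id` and `tag` query parameters into a `meta_dict` attached to the routed message under `MSG.META`; the tag arrives as a single `"key:value"` string. Note that `tag.strip('')` in the route is a no-op and `split(':')` will raise an `IndexError` when the colon is missing. A minimal sketch of a more defensive version of that parsing follows; the helper name and the plain-string keys (standing in for the `MSG.*` constants) are illustrative assumptions, not part of the change.

```python
from typing import Any, Dict, Optional


def build_meta_dict(
    label: Optional[str] = None,
    holding_id: Optional[int] = None,
    tag: Optional[str] = None,
) -> Dict[str, Any]:
    """Sketch: fold the optional GET query parameters into the message's
    metadata section, mirroring the new route in nlds/routers/files.py."""
    meta: Dict[str, Any] = {}
    if label:
        meta["label"] = label
    if holding_id:
        meta["holding_id"] = holding_id
    if tag:
        # the route expects a single "key:value" string; partition on the
        # first ':' so values that themselves contain colons survive intact
        key, sep, value = tag.partition(":")
        if sep:
            meta["tag"] = {key.strip(): value.strip()}
    return meta


# build_meta_dict(label="my-holding", tag="project:cmip6")
# -> {"label": "my-holding", "tag": {"project": "cmip6"}}
```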
-55,7 +51,6 @@ def __init__(self, queue=DEFAULT_QUEUE_NAME): self.tape_pool = self.load_config_value(self._TAPE_POOL) self.tape_url = self.load_config_value(self._TAPE_URL) self.chunk_size = int(self.load_config_value(self._CHUNK_SIZE)) - self.query_checksum_fl = self.load_config_value(self._QUERY_CHECKSUM) self.reset() def callback(self, ch, method, properties, body, connection): @@ -72,7 +67,7 @@ def callback(self, ch, method, properties, body, connection): self.log( "Aggregating filelist into appropriately sized sub-lists for each " "Aggregation", - RK.LOG_INFO + RK.LOG_INFO, ) # Make a new routing key which returns message to this queue rk_transfer_start = ".".join([self.rk_parts[0], self.rk_parts[1], RK.START]) @@ -110,7 +105,6 @@ def callback(self, ch, method, properties, body, connection): else: raise ArchiveError(f"Unknown routing key {self.rk_parts[2]}") - def get_tape_config(self, body_dict) -> Tuple: """Convenience function to extract tape relevant config from the message details section. Currently this is just the tape @@ -135,6 +129,20 @@ def get_tape_config(self, body_dict) -> Tuple: return tape_url + @classmethod + def get_holding_prefix(cls, body: Dict[str, Any], holding_id: int=-1) -> str: + """Get the uneditable holding information from the message body to + reproduce the holding prefix made in the catalog""" + try: + if holding_id == -1: + holding_id = body[MSG.META][MSG.HOLDING_ID] + user = body[MSG.DETAILS][MSG.USER] + group = body[MSG.DETAILS][MSG.GROUP] + except KeyError as e: + raise ArchiveError(f"Could not make holding prefix, original error: {e}") + + return f"nlds.{holding_id}.{user}.{group}" + @abstractmethod def transfer( self, diff --git a/nlds_processors/archiver/archive_get.py b/nlds_processors/archiver/archive_get.py index c6558848..b1d801b0 100644 --- a/nlds_processors/archiver/archive_get.py +++ b/nlds_processors/archiver/archive_get.py @@ -11,91 +11,39 @@ __contact__ = "neil.massey@stfc.ac.uk" from typing import List, Dict, Tuple, Any -import json -from importlib.metadata import version -import tarfile -from tarfile import TarFile -from datetime import timedelta - -from minio import Minio -from minio.helpers import ObjectWriteResult +import os from minio.error import S3Error from retry import retry -from urllib3.exceptions import HTTPError - -# ignore_xrootd is so that we can test the message flow and preparation without -# invoking an xrootd client -ignore_xrootd = False -try: - from XRootD import client as XRDClient - from XRootD.client.flags import ( - DirListFlags, - PrepareFlags, - OpenFlags, - QueryCode, - ) -except ModuleNotFoundError: - ignore_xrootd = True from nlds_processors.archiver.archive_base import ( BaseArchiveConsumer, ArchiveError, ) -from nlds_processors.archiver.adler32file import Adler32File + +from nlds.nlds_setup import USE_DISKTAPE, DISKTAPE_LOC + +if USE_DISKTAPE: + from nlds_processors.archiver.s3_to_tarfile_disk import S3ToTarfileDisk +else: + from nlds_processors.archiver.s3_to_tarfile_tape import S3ToTarfileTape + +from nlds_processors.archiver.s3_to_tarfile_stream import S3StreamError from nlds.rabbit.consumer import State from nlds.details import PathDetails -from nlds.errors import CallbackError - import nlds.rabbit.routing_keys as RK import nlds.rabbit.message_keys as MSG -class TarError(Exception): - """Exception class to distinguish problems with whole tar (i.e. missing - files) from other problems (i.e. 
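`get_holding_prefix` moves from the put-archiver into `BaseArchiveConsumer` and gains a `holding_id` override, so the get-archiver can pass the id carried in the retrieval dict instead of reading it from `MSG.META`. A standalone sketch of the same derivation, with plain-string keys standing in for the `MSG.*` constants and a generic exception in place of `ArchiveError`:

```python
from typing import Any, Dict


def holding_prefix(body: Dict[str, Any], holding_id: int = -1) -> str:
    """Rebuild the catalog's holding prefix from a message body, as
    BaseArchiveConsumer.get_holding_prefix now does."""
    try:
        if holding_id == -1:
            holding_id = body["meta"]["holding_id"]
        user = body["details"]["user"]
        group = body["details"]["group"]
    except KeyError as e:
        raise ValueError(f"Could not make holding prefix, original error: {e}")
    return f"nlds.{holding_id}.{user}.{group}"


# holding_prefix({"details": {"user": "a_user", "group": "a_group"}}, holding_id=42)
# -> "nlds.42.a_user.a_group"
```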
broken buckets) - """ - - pass - - -class TarMemberError(Exception): - """Exception class to distinguish problems with individual tar members (i.e. - broken bucket, ) from larger tar-related problems - """ - - pass - - -class XRootDError(Exception): - """Exception class to distinguish the specific problem of a prepared file - not being able to be read by xrootd - """ - - pass - - -class FileAlreadyRetrieved(Exception): - """Exception class to distinguish the specific situation where a file has - already been retrieved and exists on the object store - """ - - pass - - class GetArchiveConsumer(BaseArchiveConsumer): DEFAULT_QUEUE_NAME = "archive_get_q" DEFAULT_ROUTING_KEY = f"{RK.ROOT}." f"{RK.TRANSFER_PUT}." f"{RK.WILD}" DEFAULT_STATE = State.ARCHIVE_GETTING - DEFAULT_CONSUMER_CONFIG = ( - BaseArchiveConsumer.DEFAULT_CONSUMER_CONFIG - ) - def __init__(self, queue=DEFAULT_QUEUE_NAME): super().__init__(queue=queue) - + @retry(S3Error, tries=5, delay=1, logger=None) def transfer( self, transaction_id: str, @@ -111,599 +59,682 @@ def transfer( rk_complete = ".".join([rk_origin, RK.ARCHIVE_GET, RK.COMPLETE]) rk_failed = ".".join([rk_origin, RK.ARCHIVE_GET, RK.FAILED]) - # let's reject all requests - for path_details in filelist: - path_details.failure_reason = "Testing reject" - self.failedlist.append(path_details) - - if len(self.failedlist) > 0: - # Send message back to worker so catalog can be scrubbed of failed puts - self.send_pathlist( - self.failedlist, - rk_failed, - body_json, - state=State.FAILED, - ) - return - - # print(rk_origin) - # retrieval_json = body_json[MSG.DATA][MSG.RETRIEVAL_FILELIST] - # print(retrieval_json) - # raise NotImplementedError - - # Can call this as the url has been verified previously - tape_server, tape_base_dir = self.split_tape_url(tape_url) - + # create the S3 to tape or disk streamer try: - raw_rd = dict(body_json[MSG.DATA][MSG.RETRIEVAL_FILELIST]) - retrieval_dict = { - tarname: [PathDetails.from_dict(pd_dict) for pd_dict in pd_dicts] - for tarname, pd_dicts in raw_rd.items() - } - except TypeError as e: - self.log( - "Failed to reformat retrieval filelist into PathDetails " - "objects. Retrievallist in message does not appear to be " - "in the correct format.", - RK.LOG_ERROR, - ) - raise e - - # Declare useful variables - bucket_name = None - rk_complete = ".".join([rk_origin, RK.ARCHIVE_GET, RK.COMPLETE]) - rk_retry = ".".join([rk_origin, RK.ARCHIVE_GET, RK.START]) - rk_failed = ".".join([rk_origin, RK.ARCHIVE_GET, RK.FAILED]) - - try: - prepare_id = body_json[MSG.DATA][MSG.PREPARE_ID] - except KeyError: - self.log( - "Could not get prepare_id from message info, continuing " "without", - RK.LOG_INFO, - ) - prepare_id = None - - # Create minio client - s3_client = Minio( - tenancy, - access_key=access_key, - secret_key=secret_key, - secure=self.require_secure_fl, - ) - - # Create the FileSystem client at this point to verify the tape_base_dir - fs_client = XRDClient.FileSystem(f"root://{tape_server}") - # Attempt to verify that the base-directory exists - self.verify_tape_server(fs_client, tape_server, tape_base_dir) - - # Ensure minimum part_size is met for put_object to function - chunk_size = max(5 * 1024 * 1024, self.chunk_size) - - ################# - # Pre-read loop: - # Verify the filelist contents and create the necessary lists for the - # tape preparation request - - # TODO: refactor this? 
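Like `archive_put.py`, `archive_get.py` now decides at import time whether to stream to a directory standing in for tape (`USE_DISKTAPE`/`DISKTAPE_LOC` from `nlds_setup`) or to a real tape endpoint, and `transfer()` then builds the matching streamer. A condensed sketch of that selection; the `make_streamer` helper is not in the change, it only gathers the branch that both consumers repeat, using the constructor keywords shown in the diff:

```python
import os

from nlds.nlds_setup import USE_DISKTAPE, DISKTAPE_LOC

if USE_DISKTAPE:
    from nlds_processors.archiver.s3_to_tarfile_disk import S3ToTarfileDisk
else:
    from nlds_processors.archiver.s3_to_tarfile_tape import S3ToTarfileTape


def make_streamer(tenancy, access_key, secret_key, tape_url, logger):
    """Build the disk- or tape-backed S3 streamer with the same keyword
    arguments used in archive_put.transfer() and archive_get.transfer()."""
    if USE_DISKTAPE:
        return S3ToTarfileDisk(
            s3_tenancy=tenancy,
            s3_access_key=access_key,
            s3_secret_key=secret_key,
            disk_location=os.path.expanduser(DISKTAPE_LOC),
            logger=logger,
        )
    return S3ToTarfileTape(
        s3_tenancy=tenancy,
        s3_access_key=access_key,
        s3_secret_key=secret_key,
        tape_url=tape_url,
        logger=logger,
    )
```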
- tar_list = [] - tar_originals_map = {} - for path_details in filelist: - # First check whether path_details has failed too many times - if path_details.retries.count > self.max_retries: - self.failedlist.append(path_details) - # TODO: do these actually get skipped? - continue - - try: - holding_prefix, filelist_hash = self.parse_tape_details(path_details) - except ArchiveError as e: - self.process_retry(str(e), path_details) - continue - - tar_filename = f"{filelist_hash}.tar" - try: - # Check that filelist hash is in the full retrieval list - assert tar_filename in retrieval_dict - except AssertionError as e: - reason = f"Tar file name not found in full retrieval list" - self.process_retry(reason, path_details) - continue - - holding_tape_path = ( - f"root://{tape_server}/{tape_base_dir}/" f"{holding_prefix}" - ) - full_tape_path = f"{holding_tape_path}/{tar_filename}" - - # Check bucket folder exists on tape - status, _ = fs_client.dirlist( - f"{tape_base_dir}/{holding_prefix}", DirListFlags.STAT - ) - if status.status != 0: - # If bucket tape-folder can't be found then pass for retry - reason = ( - f"Tape holding folder ({tape_base_dir}/{holding_prefix}) " - f"could not be found, cannot retrieve from archive" + if USE_DISKTAPE: + disk_loc = os.path.expanduser(DISKTAPE_LOC) + self.log( + f"Starting disk transfer between {disk_loc} and object store " + f"{tenancy}", + RK.LOG_INFO, ) - self.process_retry(reason, path_details) - continue - - # The tar_filenames must be encoded into a list of byte strings for - # prepare to work, as of pyxrootd v5.5.3. We group them together to - # ensure only a single transaction is passed to the tape server. - prepare_item = f"{tape_base_dir}/{holding_prefix}/{tar_filename}" - tar_list.append(prepare_item) - - # Create the tar_originals_map, which maps from tar file to the - # path_details in the original filelist (i.e. the files in the - # original request) for easy retry/failure if something goes wrong. - if tar_filename not in tar_originals_map: - tar_originals_map[tar_filename] = [ - path_details, - ] - else: - tar_originals_map[tar_filename].append(path_details) - - # If tar_list is empty at this point everything is either in need of - # retrying or failing so we can just start a TLR with an exception - if len(tar_list) == 0: - raise CallbackError( - "All paths failed verification, nothing to " "prepare or get." - ) - - ########### - # Prepare: - # Make a preparation request and/or check on the status of an existing - # preparation request - - # Deduplicate the list of tar files - tar_list = list(set(tar_list)) - # Convert to bytes (if necessary) - prepare_list = self.prepare_preparelist(tar_list) - - if prepare_id is None: - # Prepare all the tar files at once. This can take a while so requeue - # the message afterwards to free up the consumer and periodicaly check - # the status of the prepare command. - # TODO: Utilise the prepare result for eviction later? - status, prepare_result = fs_client.prepare(prepare_list, PrepareFlags.STAGE) - if status.status != 0: - # If tarfiles can't be staged then pass for transaction-level - # retry. TODO: Probably need to figure out which files have failed - # this step as it won't necessarily be all of them. 
- raise CallbackError(f"Tar files ({tar_list}) could not be " "prepared.") - - prepare_id = prepare_result.decode()[:-1] - body_json[MSG.DATA][MSG.PREPARE_ID] = prepare_id - - query_resp = self.query_prepare_request(prepare_id, tar_list, fs_client) - # Check whether the file is still offline, if so then requeue filelist - - for response in query_resp["responses"]: - tar_path = response["path"] - # First verify file integrity. - if not response["on_tape"] or not response["path_exists"]: - tarname = tar_path.split("/")[-1] - # Big problem if the file is not on tape as it will never be - # prepared properly. Need to fail it and remove it from filelist - # i.e split off the files from the workflow - failed_pds = retrieval_dict[tarname] - self.failedlist.extend(failed_pds) - del retrieval_dict[tarname] - - # Check if the preparation has finished for this file - if response["online"]: - self.log(f"Prepare has completed for {tar_path}", RK.LOG_INFO) - continue - elif not response["requested"]: - # If the file is not online and not requested anymore, - # another request needs to be made. - # NOTE: we can either throw an exception here and let this only - # happen a set number of times before failure, or we can just - # requeue without a preapre_id and let the whole process happen - # again. I don't really know the specifics of why something - # would refuse to be prepared, so I don't know what the best - # strategy would be... - raise CallbackError( - f"Prepare request could not be fulfilled for request " - f"{prepare_id}, sending for retry." + streamer = S3ToTarfileDisk( + s3_tenancy=tenancy, + s3_access_key=access_key, + s3_secret_key=secret_key, + disk_location=disk_loc, + logger=self.log, ) else: self.log( - f"Prepare request with id {prepare_id} has not completed " - f"for {tar_path}, requeuing message with a delay of " - f"{self.prepare_requeue_delay}", + f"Starting tape transfer between {tape_url} and object store " + f"{tenancy}", RK.LOG_INFO, ) - self.send_pathlist( - filelist, - rk_retry, - body_json, - state=State.CATALOG_ARCHIVE_AGGREGATING, + streamer = S3ToTarfileTape( + s3_tenancy=tenancy, + s3_access_key=access_key, + s3_secret_key=secret_key, + tape_url=tape_url, + logger=self.log, ) - return - - ############# - # Main loop: - # Loop through retrieval dict and put the contents of each tar file into - # its appropriate bucket on the objectstore. - for tarname, tar_filelist in retrieval_dict.items(): - original_filelist = tar_originals_map[tarname] - - # Get the holding_prefix from the first path_details in the - # original filelist. This is guaranteed not to error as it would - # have above otherwise? 
- holding_prefix, filelist_hash = self.parse_tape_details(original_filelist[0]) - - tar_filename = f"{filelist_hash}.tar" - if tar_filename != tarname: - raise TarError( - "Mismatch between tarname parsed from " - "retrieval_dict and from path_details" + except S3StreamError as e: + # if a S3StreamError occurs then all files have failed + for path_details in filelist: + path_details.failure_reason = e.message + self.failedlist.append(path_details) + checksum = None + else: + # For archive_get, the message is structured as a dictionary stored in + # ['data']['retrieval_dict'] and a filelist stored in ['data']['filelist'] + # there is also a filelist in ['data']['retrieval_dict']['filelist'] which + # contains the files to be retrieved from the tarfile / aggregate: + retrieval_dict = body_json[MSG.DATA][MSG.RETRIEVAL_DICT] + # looping over the aggregates + for tarfile, item in retrieval_dict.items(): + # get the holding id and build the holding_prefix + holding_id = item[MSG.HOLDING_ID] + holding_prefix = self.get_holding_prefix( + body_json, holding_id=holding_id ) + # get the list of files to retrieve from the tarfile / aggregate + aggregate_filelist = item[MSG.FILELIST] + # empty streamer.filelist for new aggregate + streamer.filelist.clear() + try: + self.completelist, self.failedlist = streamer.get( + holding_prefix, tarfile, aggregate_filelist, self.chunk_size + ) + except S3StreamError as e: + # if a S3StreamError occurs then all files have failed + for path_details in filelist: + path_details.failure_reason = e.message + self.failedlist.append(path_details) + checksum = None + + #raise NotImplementedError + for f in self.filelist: + f.failure_reason = "Test" + self.failedlist.append(f) - holding_tape_path = ( - f"root://{tape_server}/{tape_base_dir}/" f"{holding_prefix}" - ) - full_tape_path = f"{holding_tape_path}/{tar_filename}" - + if len(self.completelist) > 0: + # Send whatever remains after all items have been got self.log( - f"Attempting to stream contents of tar file " - f"{full_tape_path} directly from tape archive to object " - "storage", + "Archive get complete, passing lists back to worker for transfer.", RK.LOG_INFO, ) - - # Stream directly from the tar file to object store, one tar-member - # at a time - try: - path_details = None - with XRDClient.File() as f: - # Open the tar file with READ - status, _ = f.open(full_tape_path, OpenFlags.READ) - if status.status != 0: - raise XRootDError( - f"Failed to open file {full_tape_path}" - f" for reading. Status: {status}" - ) - - # Wrap the xrootd File handler so minio can use it - fw = Adler32File(f) - - self.log(f"Opening tar file {tar_filename}", RK.LOG_INFO) - with tarfile.open( - mode="r:", copybufsize=chunk_size, fileobj=fw - ) as tar: - # Check the tar file contains the originally requested - # files. NOTE: Commented out due to open questions about - # the workflow. - # self.validate_tar(tar, original_filelist) - - # for tarinfo in members: - for path_details in tar_filelist: - try: - _ = self.stream_tarmember( - path_details, tar, s3_client, chunk_size=chunk_size - ) - except (HTTPError, S3Error) as e: - # Handle objectstore exception. 
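The rewritten `transfer()` consumes `body_json[MSG.DATA][MSG.RETRIEVAL_DICT]`, keyed by aggregate (tar file) name; each entry carries the holding id used to rebuild the holding prefix, the aggregate checksum added by the catalog, and the sub-filelist to extract from that tar, and the streamer's `filelist` is cleared before each aggregate. A hedged sketch of the expected shape — the literal key strings and example values are assumptions standing in for the `MSG.*` constants and real catalog data:

```python
# One entry per aggregate (tar file) that contains requested files.
retrieval_dict = {
    "5f2b9a.tar": {
        "holding_id": 42,          # used to rebuild the "nlds.<id>.<user>.<group>" prefix
        "checksum": 2153732612,    # aggregate checksum recorded by the catalog
        "filelist": [              # PathDetails dicts to extract from this tar
            {"original_path": "/gws/data/file_0001.nc"},
        ],
    },
}

for tarfile_name, item in retrieval_dict.items():
    prefix = f"nlds.{item['holding_id']}.a_user.a_group"
    print(f"get {len(item['filelist'])} file(s) from {prefix}/{tarfile_name}")
```

Each aggregate is then handed to `streamer.get(holding_prefix, tarfile, aggregate_filelist, chunk_size)`, which returns the complete and failed lists for that tar.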
Check whether - # it's from the original list or not, retry/fail - # as appropriate - reason = ( - f"Stream-time exception occurred: " - f"({type(e).__name__}: {e})" - ) - self.log(reason, RK.LOG_DEBUG) - if path_details in original_filelist: - # Retry if one of the original files failed - self.process_retry(reason, path_details) - else: - # Fail and move on with our lives - self.failedlist.append(path_details) - except TarMemberError as e: - # This exception is only raised when retrying - # will not help, so fail the file (regardless of - # origin) - self.log( - "TarMemberError encountered at stream " - f"time {e}, failing whole tar file " - f"({tar_filename})", - RK.LOG_INFO, - ) - self.failedlist.append(path_details) - except FileAlreadyRetrieved as e: - # Handle specific case of a file already having - # been retrieved. - self.log( - f"Tar member {path_details.path} has " - "already been retrieved, passing to " - "completelist", - RK.LOG_INFO, - ) - self.completelist.append(path_details) - except Exception as e: - self.log( - "Unexpected exception occurred during " - f"stream time {e}", - RK.LOG_ERROR, - ) - raise e - else: - # Log successful - self.log( - f"Successfully retrieved {path_details.path}" - f" from the archive and streamed to object " - "store", - RK.LOG_INFO, - ) - self.completelist.append(path_details) - except (TarError, ArchiveError) as e: - # Handle whole-tar error, i.e. fail whole list - self.log( - f"Tar-level error raised: {e}, failing whole tar and " - "continuing to next.", - RK.LOG_ERROR, - ) - # Give each a failure reason for creating FailedFiles - for pd in tar_filelist: - pd.retries.increment(reason=str(e)) - self.failedlist.extend(tar_filelist) - except XRootDError as e: - # Handle xrootd error, where file failed to be read. Retry the - # whole tar. - reason = "Failed to open file with XRootD" - self.log(f"{e}. Passing for retry.", RK.LOG_ERROR) - for pd in tar_filelist: - self.process_retry(reason, pd) - except Exception as e: - self.log( - f"Unexpected exception occurred during stream time {e}", - RK.LOG_ERROR, - ) - raise e - else: - # Log successful retrieval - self.log(f"Successfully streamed tar file {tarname}", RK.LOG_INFO) - - # Evict the files at the end to ensure the EOS cache doesn't fill up. - # First need to check whether the files are in another prepare request - self.log( - "Querying prepare request to check whether it should be " - f"evicted from the disk-cache", - RK.LOG_INFO, - ) - query_resp = self.query_prepare_request(prepare_id, tar_list, fs_client) - evict_list = [ - response["path"] - for response in query_resp["responses"] - if not response["requested"] - ] - evict_list = self.prepare_preparelist(evict_list) - - status, _ = fs_client.prepare(evict_list, PrepareFlags.EVICT) - if status.status != 0: - self.log(f"Could not evict tar files from tape cache.", RK.LOG_WARNING) - - self.log( - "Archive read complete, passing lists back to worker for " - "re-routing and cataloguing.", - RK.LOG_INFO, - ) - self.log( - f"List lengths: complete:{len(self.completelist)}, " - f"retry:{len(self.retrylist)}, failed:{len(self.failedlist)}", - RK.LOG_DEBUG, - ) - - # Reorganise the lists and send the necessary message info for each mode - if len(self.completelist) > 0: - # Going to transfer, so only include original files in the sent - # pathlist. 
- originals_to_complete = [pd for pd in self.completelist if pd in filelist] self.send_pathlist( - originals_to_complete, - rk_complete, - body_json, - mode=FilelistType.archived, - ) - if len(self.retrylist) > 0: - # This will return to here, so we need to make sure the retrieval - # dict that's passed back is set correctly. Make a list of the - # originals that need retrying - originals_to_retry = [pd for pd in self.retrylist if pd in filelist] - - # Combine everything that has either completed or failed, i.e. which - # shouldn't be retried. - retry_retrieval_dict = {} - not_to_retry = self.completelist + self.failedlist - # Horrible nested bastard to rebuild the minimum version of the - # retrieval_dict. TODO: Re-look at this if we end up using it, - # could probably be more efficient - for tarname, tar_filelist in retrieval_dict: - for pd in originals_to_retry: - if pd in tar_filelist: - trimmed_tarlist = [ - pd for pd in tar_filelist if pd not in not_to_retry - ] - retry_retrieval_dict[tarname] = trimmed_tarlist - - body_json[MSG.DATA][MSG.RETRIEVAL_FILELIST] = retry_retrieval_dict - self.send_pathlist( - originals_to_retry, rk_retry, body_json, mode=FilelistType.retry + self.completelist, rk_complete, body_json, state=State.ARCHIVE_GETTING ) + if len(self.failedlist) > 0: - # This will eventually go to CATALOG_ARCHIVE_ROLLBACK, so can send - # all path_details in this. + # Send message back to worker so catalog can be scrubbed of failed puts self.send_pathlist( self.failedlist, rk_failed, body_json, - state=State.CATALOG_ARCHIVE_ROLLBACK, - ) - - def parse_tape_details(self, path_details): - """Get the tape information from a given path_details object - (holding_prefix and filelist_hash) for constructing the full tape path - """ - # Then parse tape path information in preparedness for file - # preparation - tape_path = path_details.tape_path - try: - # Split out the root and path, passed from the Location - tape_location_root, original_path = tape_path.split(":") - # Split further to get the holding_prefix and the tar filename - holding_prefix, filelist_hash = tape_location_root.split("_") - return holding_prefix, filelist_hash - except ValueError as e: - reason = f"Could not unpack mandatory info from path_details. {e}" - self.log(reason, RK.LOG_ERROR) - raise ArchiveError(reason) - - def prepare_preparelist(self, tar_list: List[str]) -> List[str | bytes]: - # Check the version of xrootd and convert the list to bytes if necessary - if version("XRootD") < "5.5.5": - tar_list = [i.decode("utf_8") for i in tar_list] - - return tar_list - - def query_prepare_request( - self, prepare_id: str, tar_list: List[str], fs_client: XRDClient.FileSystem - ) -> Dict[str, str]: - # Generate the arg string for the prepare query with new line characters - query_args = "\n".join([prepare_id, *tar_list]) - status, query_resp = fs_client.query(QueryCode.PREPARE, query_args) - if status.status != 0: - raise CallbackError( - f"Could not check status of prepare request " f"{prepare_id}." - ) - - # Convert into a dictionary - return json.loads(query_resp.decode()) - - def validate_tar(self, tar: TarFile, original_filelist: List[PathDetails]) -> None: - # Convert to sets for quick calculation of missing subset - members_set = {ti.name for ti in tar.getmembers()} - orig_fl_set = {pd.original_path for pd in original_filelist} - missing_files = orig_fl_set - members_set - if missing_files: - # Something has gone wrong if the file is not actually in the tar. 
- reason = ( - f"Some originally requested files ({missing_files}) do not " - f"appear to be in the tarfile at {tar.name}." + state=State.FAILED, ) - self.log(reason, RK.LOG_ERROR) - # NOTE: Does this close the file properly? - # NOTE: Do we want to try and read the files that are there? - # Probably... - raise TarError(reason) - - def get_bucket_info(self, path_details: PathDetails) -> Tuple[str]: - """Get the bucket_name and path from the object_name of a given - path_details object. - """ - if path_details.bucket_name is not None: - bucket_name = path_details.bucket_name - object_name = path_details.object_name - # Otherwise, log error and queue for retry - else: - reason = "Unable to get bucket_name from message info" - self.process_retry(reason, path_details) - raise TarMemberError(reason) - return bucket_name, object_name - - def validate_bucket(self, path_details: PathDetails, s3_client: Minio): - """Extract bucket info from path details object and then check it's - valid, i.e. that it exists and whether the object specified is already - in it. Will raise an Exception as necessary. - - :raises: FileAlreadyRetrieved, TarMemberError, HTTPError, S3Error - """ - bucket_name, object_name = self.get_bucket_info(path_details) + return - try: - # Check that bucket exists, and create if not - if not s3_client.bucket_exists(bucket_name): - self.log( - f"Creating bucket ({bucket_name}) for this tarfile " - f"member ({path_details})", - RK.LOG_INFO, - ) - s3_client.make_bucket(bucket_name) - else: - self.log( - f"Bucket for this transaction ({bucket_name}) already " f"exists", - RK.LOG_INFO, - ) - objects_iter = s3_client.list_objects( - bucket_name, prefix=f"/{object_name}" - ) - objects = [obj.object_name for obj in objects_iter] - # Look for object in bucket, continue if not present - assert object_name not in objects - except (HTTPError, S3Error) as e: - # If bucket can't be created then pass for retry and continue - self.log( - f"Bucket {bucket_name} could not be validated due to error " - f"connecting with tenancy. ({e})" - ) - raise e - except AssertionError as e: - # If it exists in the bucket then our job is done, we can just - # continue in the loop - self.log( - f"Object {object_name} already exists in {bucket_name}. " - f"Skipping to next archive retrieval.", - RK.LOG_WARNING, - ) - raise FileAlreadyRetrieved(f"File {path_details} already archived.") - return bucket_name, object_name - - @retry((S3Error, HTTPError), tries=5, delay=1, backoff=2) - def stream_tarmember( - self, path_details: PathDetails, tar: TarFile, s3_client: Minio, chunk_size=None - ) -> ObjectWriteResult: - """The inner loop of actions to be performed on each memeber of the tar - file""" - try: - tarinfo = tar.getmember(path_details.original_path) - except KeyError: - reason = ( - f"Could not find tar member for path details object " - f"{path_details}. Cannot continue." 
- ) - raise TarMemberError(reason) - - # Get bucket info and check that it exists/doesn't already contain the - # file - bucket_name, object_name = self.validate_bucket(path_details, s3_client) - - self.log( - f"Starting stream of {tarinfo.name} to object store bucket " - f"{bucket_name}.", - RK.LOG_INFO, - ) - - # Extract the file as a file object, this makes a thin wrapper around - # the filewrapper we're already using and maintains chunked reading - f = tar.extractfile(tarinfo) - write_result = s3_client.put_object( - bucket_name, - object_name, - f, - -1, - part_size=chunk_size, - ) - self.log(f"Finsihed stream of {tarinfo.name} to object store", RK.LOG_INFO) - - return write_result - - def process_retry(self, reason: str, path_details: PathDetails) -> None: - """Convenience function for logging and setting in motion a retry for a - given path_details object. - """ - self.log(f"{reason}. Adding {path_details.path} to retry list.", RK.LOG_ERROR) - path_details.retries.increment(reason=reason) - self.retrylist.append(path_details) + # def transfer_old( + # self, + # transaction_id: str, + # tenancy: str, + # access_key: str, + # secret_key: str, + # tape_url: str, + # filelist: List[PathDetails], + # rk_origin: str, + # body_json: Dict[str, Any], + # ): + # # Make the routing keys + # rk_complete = ".".join([rk_origin, RK.ARCHIVE_GET, RK.COMPLETE]) + # rk_failed = ".".join([rk_origin, RK.ARCHIVE_GET, RK.FAILED]) + + # # Can call this as the url has been verified previously + # tape_server, tape_base_dir = self.split_tape_url(tape_url) + + # try: + # raw_rd = dict(body_json[MSG.DATA][MSG.RETRIEVAL_FILELIST]) + # retrieval_dict = { + # tarname: [PathDetails.from_dict(pd_dict) for pd_dict in pd_dicts] + # for tarname, pd_dicts in raw_rd.items() + # } + # except TypeError as e: + # self.log( + # "Failed to reformat retrieval filelist into PathDetails " + # "objects. Retrievallist in message does not appear to be " + # "in the correct format.", + # RK.LOG_ERROR, + # ) + # raise e + + # # Declare useful variables + # bucket_name = None + # rk_complete = ".".join([rk_origin, RK.ARCHIVE_GET, RK.COMPLETE]) + # rk_retry = ".".join([rk_origin, RK.ARCHIVE_GET, RK.START]) + # rk_failed = ".".join([rk_origin, RK.ARCHIVE_GET, RK.FAILED]) + + # try: + # prepare_id = body_json[MSG.DATA][MSG.PREPARE_ID] + # except KeyError: + # self.log( + # "Could not get prepare_id from message info, continuing " "without", + # RK.LOG_INFO, + # ) + # prepare_id = None + + # # Create minio client + # s3_client = Minio( + # tenancy, + # access_key=access_key, + # secret_key=secret_key, + # secure=self.require_secure_fl, + # ) + + # # Create the FileSystem client at this point to verify the tape_base_dir + # fs_client = XRDClient.FileSystem(f"root://{tape_server}") + # # Attempt to verify that the base-directory exists + # self.verify_tape_server(fs_client, tape_server, tape_base_dir) + + # # Ensure minimum part_size is met for put_object to function + # chunk_size = max(5 * 1024 * 1024, self.chunk_size) + + # ################# + # # Pre-read loop: + # # Verify the filelist contents and create the necessary lists for the + # # tape preparation request + + # # TODO: refactor this? + # tar_list = [] + # tar_originals_map = {} + # for path_details in filelist: + # # First check whether path_details has failed too many times + # if path_details.retries.count > self.max_retries: + # self.failedlist.append(path_details) + # # TODO: do these actually get skipped? 
+ # continue + + # try: + # holding_prefix, filelist_hash = self.parse_tape_details(path_details) + # except ArchiveError as e: + # self.process_retry(str(e), path_details) + # continue + + # tar_filename = f"{filelist_hash}.tar" + # try: + # # Check that filelist hash is in the full retrieval list + # assert tar_filename in retrieval_dict + # except AssertionError as e: + # reason = f"Tar file name not found in full retrieval list" + # self.process_retry(reason, path_details) + # continue + + # holding_tape_path = ( + # f"root://{tape_server}/{tape_base_dir}/" f"{holding_prefix}" + # ) + # full_tape_path = f"{holding_tape_path}/{tar_filename}" + + # # Check bucket folder exists on tape + # status, _ = fs_client.dirlist( + # f"{tape_base_dir}/{holding_prefix}", DirListFlags.STAT + # ) + # if status.status != 0: + # # If bucket tape-folder can't be found then pass for retry + # reason = ( + # f"Tape holding folder ({tape_base_dir}/{holding_prefix}) " + # f"could not be found, cannot retrieve from archive" + # ) + # self.process_retry(reason, path_details) + # continue + + # # The tar_filenames must be encoded into a list of byte strings for + # # prepare to work, as of pyxrootd v5.5.3. We group them together to + # # ensure only a single transaction is passed to the tape server. + # prepare_item = f"{tape_base_dir}/{holding_prefix}/{tar_filename}" + # tar_list.append(prepare_item) + + # # Create the tar_originals_map, which maps from tar file to the + # # path_details in the original filelist (i.e. the files in the + # # original request) for easy retry/failure if something goes wrong. + # if tar_filename not in tar_originals_map: + # tar_originals_map[tar_filename] = [ + # path_details, + # ] + # else: + # tar_originals_map[tar_filename].append(path_details) + + # # If tar_list is empty at this point everything is either in need of + # # retrying or failing so we can just start a TLR with an exception + # if len(tar_list) == 0: + # raise CallbackError( + # "All paths failed verification, nothing to " "prepare or get." + # ) + + # ########### + # # Prepare: + # # Make a preparation request and/or check on the status of an existing + # # preparation request + + # # Deduplicate the list of tar files + # tar_list = list(set(tar_list)) + # # Convert to bytes (if necessary) + # prepare_list = self.prepare_preparelist(tar_list) + + # if prepare_id is None: + # # Prepare all the tar files at once. This can take a while so requeue + # # the message afterwards to free up the consumer and periodicaly check + # # the status of the prepare command. + # # TODO: Utilise the prepare result for eviction later? + # status, prepare_result = fs_client.prepare(prepare_list, PrepareFlags.STAGE) + # if status.status != 0: + # # If tarfiles can't be staged then pass for transaction-level + # # retry. TODO: Probably need to figure out which files have failed + # # this step as it won't necessarily be all of them. + # raise CallbackError(f"Tar files ({tar_list}) could not be " "prepared.") + + # prepare_id = prepare_result.decode()[:-1] + # body_json[MSG.DATA][MSG.PREPARE_ID] = prepare_id + + # query_resp = self.query_prepare_request(prepare_id, tar_list, fs_client) + # # Check whether the file is still offline, if so then requeue filelist + + # for response in query_resp["responses"]: + # tar_path = response["path"] + # # First verify file integrity. 
+ # if not response["on_tape"] or not response["path_exists"]: + # tarname = tar_path.split("/")[-1] + # # Big problem if the file is not on tape as it will never be + # # prepared properly. Need to fail it and remove it from filelist + # # i.e split off the files from the workflow + # failed_pds = retrieval_dict[tarname] + # self.failedlist.extend(failed_pds) + # del retrieval_dict[tarname] + + # # Check if the preparation has finished for this file + # if response["online"]: + # self.log(f"Prepare has completed for {tar_path}", RK.LOG_INFO) + # continue + # elif not response["requested"]: + # # If the file is not online and not requested anymore, + # # another request needs to be made. + # # NOTE: we can either throw an exception here and let this only + # # happen a set number of times before failure, or we can just + # # requeue without a preapre_id and let the whole process happen + # # again. I don't really know the specifics of why something + # # would refuse to be prepared, so I don't know what the best + # # strategy would be... + # raise CallbackError( + # f"Prepare request could not be fulfilled for request " + # f"{prepare_id}, sending for retry." + # ) + # else: + # self.log( + # f"Prepare request with id {prepare_id} has not completed " + # f"for {tar_path}, requeuing message with a delay of " + # f"{self.prepare_requeue_delay}", + # RK.LOG_INFO, + # ) + # self.send_pathlist( + # filelist, + # rk_retry, + # body_json, + # state=State.CATALOG_ARCHIVE_AGGREGATING, + # ) + # return + + # ############# + # # Main loop: + # # Loop through retrieval dict and put the contents of each tar file into + # # its appropriate bucket on the objectstore. + # for tarname, tar_filelist in retrieval_dict.items(): + # original_filelist = tar_originals_map[tarname] + + # # Get the holding_prefix from the first path_details in the + # # original filelist. This is guaranteed not to error as it would + # # have above otherwise? + # holding_prefix, filelist_hash = self.parse_tape_details(original_filelist[0]) + + # tar_filename = f"{filelist_hash}.tar" + # if tar_filename != tarname: + # raise TarError( + # "Mismatch between tarname parsed from " + # "retrieval_dict and from path_details" + # ) + + # holding_tape_path = ( + # f"root://{tape_server}/{tape_base_dir}/" f"{holding_prefix}" + # ) + # full_tape_path = f"{holding_tape_path}/{tar_filename}" + + # self.log( + # f"Attempting to stream contents of tar file " + # f"{full_tape_path} directly from tape archive to object " + # "storage", + # RK.LOG_INFO, + # ) + + # # Stream directly from the tar file to object store, one tar-member + # # at a time + # try: + # path_details = None + # with XRDClient.File() as f: + # # Open the tar file with READ + # status, _ = f.open(full_tape_path, OpenFlags.READ) + # if status.status != 0: + # raise XRootDError( + # f"Failed to open file {full_tape_path}" + # f" for reading. Status: {status}" + # ) + + # # Wrap the xrootd File handler so minio can use it + # fw = Adler32File(f) + + # self.log(f"Opening tar file {tar_filename}", RK.LOG_INFO) + # with tarfile.open( + # mode="r:", copybufsize=chunk_size, fileobj=fw + # ) as tar: + # # Check the tar file contains the originally requested + # # files. NOTE: Commented out due to open questions about + # # the workflow. 
+ # # self.validate_tar(tar, original_filelist) + + # # for tarinfo in members: + # for path_details in tar_filelist: + # try: + # _ = self.stream_tarmember( + # path_details, tar, s3_client, chunk_size=chunk_size + # ) + # except (HTTPError, S3Error) as e: + # # Handle objectstore exception. Check whether + # # it's from the original list or not, retry/fail + # # as appropriate + # reason = ( + # f"Stream-time exception occurred: " + # f"({type(e).__name__}: {e})" + # ) + # self.log(reason, RK.LOG_DEBUG) + # if path_details in original_filelist: + # # Retry if one of the original files failed + # self.process_retry(reason, path_details) + # else: + # # Fail and move on with our lives + # self.failedlist.append(path_details) + # except TarMemberError as e: + # # This exception is only raised when retrying + # # will not help, so fail the file (regardless of + # # origin) + # self.log( + # "TarMemberError encountered at stream " + # f"time {e}, failing whole tar file " + # f"({tar_filename})", + # RK.LOG_INFO, + # ) + # self.failedlist.append(path_details) + # except FileAlreadyRetrieved as e: + # # Handle specific case of a file already having + # # been retrieved. + # self.log( + # f"Tar member {path_details.path} has " + # "already been retrieved, passing to " + # "completelist", + # RK.LOG_INFO, + # ) + # self.completelist.append(path_details) + # except Exception as e: + # self.log( + # "Unexpected exception occurred during " + # f"stream time {e}", + # RK.LOG_ERROR, + # ) + # raise e + # else: + # # Log successful + # self.log( + # f"Successfully retrieved {path_details.path}" + # f" from the archive and streamed to object " + # "store", + # RK.LOG_INFO, + # ) + # self.completelist.append(path_details) + # except (TarError, ArchiveError) as e: + # # Handle whole-tar error, i.e. fail whole list + # self.log( + # f"Tar-level error raised: {e}, failing whole tar and " + # "continuing to next.", + # RK.LOG_ERROR, + # ) + # # Give each a failure reason for creating FailedFiles + # for pd in tar_filelist: + # pd.retries.increment(reason=str(e)) + # self.failedlist.extend(tar_filelist) + # except XRootDError as e: + # # Handle xrootd error, where file failed to be read. Retry the + # # whole tar. + # reason = "Failed to open file with XRootD" + # self.log(f"{e}. Passing for retry.", RK.LOG_ERROR) + # for pd in tar_filelist: + # self.process_retry(reason, pd) + # except Exception as e: + # self.log( + # f"Unexpected exception occurred during stream time {e}", + # RK.LOG_ERROR, + # ) + # raise e + # else: + # # Log successful retrieval + # self.log(f"Successfully streamed tar file {tarname}", RK.LOG_INFO) + + # # Evict the files at the end to ensure the EOS cache doesn't fill up. 
+ # # First need to check whether the files are in another prepare request + # self.log( + # "Querying prepare request to check whether it should be " + # f"evicted from the disk-cache", + # RK.LOG_INFO, + # ) + # query_resp = self.query_prepare_request(prepare_id, tar_list, fs_client) + # evict_list = [ + # response["path"] + # for response in query_resp["responses"] + # if not response["requested"] + # ] + # evict_list = self.prepare_preparelist(evict_list) + + # status, _ = fs_client.prepare(evict_list, PrepareFlags.EVICT) + # if status.status != 0: + # self.log(f"Could not evict tar files from tape cache.", RK.LOG_WARNING) + + # self.log( + # "Archive read complete, passing lists back to worker for " + # "re-routing and cataloguing.", + # RK.LOG_INFO, + # ) + # self.log( + # f"List lengths: complete:{len(self.completelist)}, " + # f"retry:{len(self.retrylist)}, failed:{len(self.failedlist)}", + # RK.LOG_DEBUG, + # ) + + # # Reorganise the lists and send the necessary message info for each mode + # if len(self.completelist) > 0: + # # Going to transfer, so only include original files in the sent + # # pathlist. + # originals_to_complete = [pd for pd in self.completelist if pd in filelist] + # self.send_pathlist( + # originals_to_complete, + # rk_complete, + # body_json, + # mode=FilelistType.archived, + # ) + # if len(self.retrylist) > 0: + # # This will return to here, so we need to make sure the retrieval + # # dict that's passed back is set correctly. Make a list of the + # # originals that need retrying + # originals_to_retry = [pd for pd in self.retrylist if pd in filelist] + + # # Combine everything that has either completed or failed, i.e. which + # # shouldn't be retried. + # retry_retrieval_dict = {} + # not_to_retry = self.completelist + self.failedlist + # # Horrible nested bastard to rebuild the minimum version of the + # # retrieval_dict. TODO: Re-look at this if we end up using it, + # # could probably be more efficient + # for tarname, tar_filelist in retrieval_dict: + # for pd in originals_to_retry: + # if pd in tar_filelist: + # trimmed_tarlist = [ + # pd for pd in tar_filelist if pd not in not_to_retry + # ] + # retry_retrieval_dict[tarname] = trimmed_tarlist + + # body_json[MSG.DATA][MSG.RETRIEVAL_FILELIST] = retry_retrieval_dict + # self.send_pathlist( + # originals_to_retry, rk_retry, body_json, mode=FilelistType.retry + # ) + # if len(self.failedlist) > 0: + # # This will eventually go to CATALOG_ARCHIVE_ROLLBACK, so can send + # # all path_details in this. + # self.send_pathlist( + # self.failedlist, + # rk_failed, + # body_json, + # state=State.CATALOG_ARCHIVE_ROLLBACK, + # ) + + # def parse_tape_details(self, path_details): + # """Get the tape information from a given path_details object + # (holding_prefix and filelist_hash) for constructing the full tape path + # """ + # # Then parse tape path information in preparedness for file + # # preparation + # tape_path = path_details.tape_path + # try: + # # Split out the root and path, passed from the Location + # tape_location_root, original_path = tape_path.split(":") + # # Split further to get the holding_prefix and the tar filename + # holding_prefix, filelist_hash = tape_location_root.split("_") + # return holding_prefix, filelist_hash + # except ValueError as e: + # reason = f"Could not unpack mandatory info from path_details. 
{e}" + # self.log(reason, RK.LOG_ERROR) + # raise ArchiveError(reason) + + # def prepare_preparelist(self, tar_list: List[str]) -> List[str | bytes]: + # # Check the version of xrootd and convert the list to bytes if necessary + # if version("XRootD") < "5.5.5": + # tar_list = [i.decode("utf_8") for i in tar_list] + + # return tar_list + + # def query_prepare_request( + # self, prepare_id: str, tar_list: List[str], fs_client: XRDClient.FileSystem + # ) -> Dict[str, str]: + # # Generate the arg string for the prepare query with new line characters + # query_args = "\n".join([prepare_id, *tar_list]) + # status, query_resp = fs_client.query(QueryCode.PREPARE, query_args) + # if status.status != 0: + # raise CallbackError( + # f"Could not check status of prepare request " f"{prepare_id}." + # ) + + # # Convert into a dictionary + # return json.loads(query_resp.decode()) + + # def validate_tar(self, tar: TarFile, original_filelist: List[PathDetails]) -> None: + # # Convert to sets for quick calculation of missing subset + # members_set = {ti.name for ti in tar.getmembers()} + # orig_fl_set = {pd.original_path for pd in original_filelist} + # missing_files = orig_fl_set - members_set + # if missing_files: + # # Something has gone wrong if the file is not actually in the tar. + # reason = ( + # f"Some originally requested files ({missing_files}) do not " + # f"appear to be in the tarfile at {tar.name}." + # ) + # self.log(reason, RK.LOG_ERROR) + # # NOTE: Does this close the file properly? + # # NOTE: Do we want to try and read the files that are there? + # # Probably... + # raise TarError(reason) + + # def get_bucket_info(self, path_details: PathDetails) -> Tuple[str]: + # """Get the bucket_name and path from the object_name of a given + # path_details object. + # """ + # if path_details.bucket_name is not None: + # bucket_name = path_details.bucket_name + # object_name = path_details.object_name + # # Otherwise, log error and queue for retry + # else: + # reason = "Unable to get bucket_name from message info" + # self.process_retry(reason, path_details) + # raise TarMemberError(reason) + # return bucket_name, object_name + + # def validate_bucket(self, path_details: PathDetails, s3_client: Minio): + # """Extract bucket info from path details object and then check it's + # valid, i.e. that it exists and whether the object specified is already + # in it. Will raise an Exception as necessary. + + # :raises: FileAlreadyRetrieved, TarMemberError, HTTPError, S3Error + # """ + # bucket_name, object_name = self.get_bucket_info(path_details) + + # try: + # # Check that bucket exists, and create if not + # if not s3_client.bucket_exists(bucket_name): + # self.log( + # f"Creating bucket ({bucket_name}) for this tarfile " + # f"member ({path_details})", + # RK.LOG_INFO, + # ) + # s3_client.make_bucket(bucket_name) + # else: + # self.log( + # f"Bucket for this transaction ({bucket_name}) already " f"exists", + # RK.LOG_INFO, + # ) + # objects_iter = s3_client.list_objects( + # bucket_name, prefix=f"/{object_name}" + # ) + # objects = [obj.object_name for obj in objects_iter] + # # Look for object in bucket, continue if not present + # assert object_name not in objects + # except (HTTPError, S3Error) as e: + # # If bucket can't be created then pass for retry and continue + # self.log( + # f"Bucket {bucket_name} could not be validated due to error " + # f"connecting with tenancy. 
({e})" + # ) + # raise e + # except AssertionError as e: + # # If it exists in the bucket then our job is done, we can just + # # continue in the loop + # self.log( + # f"Object {object_name} already exists in {bucket_name}. " + # f"Skipping to next archive retrieval.", + # RK.LOG_WARNING, + # ) + # raise FileAlreadyRetrieved(f"File {path_details} already archived.") + # return bucket_name, object_name + + # @retry((S3Error, HTTPError), tries=5, delay=1, backoff=2) + # def stream_tarmember( + # self, path_details: PathDetails, tar: TarFile, s3_client: Minio, chunk_size=None + # ) -> ObjectWriteResult: + # """The inner loop of actions to be performed on each memeber of the tar + # file""" + # try: + # tarinfo = tar.getmember(path_details.original_path) + # except KeyError: + # reason = ( + # f"Could not find tar member for path details object " + # f"{path_details}. Cannot continue." + # ) + # raise TarMemberError(reason) + + # # Get bucket info and check that it exists/doesn't already contain the + # # file + # bucket_name, object_name = self.validate_bucket(path_details, s3_client) + + # self.log( + # f"Starting stream of {tarinfo.name} to object store bucket " + # f"{bucket_name}.", + # RK.LOG_INFO, + # ) + + # # Extract the file as a file object, this makes a thin wrapper around + # # the filewrapper we're already using and maintains chunked reading + # f = tar.extractfile(tarinfo) + # write_result = s3_client.put_object( + # bucket_name, + # object_name, + # f, + # -1, + # part_size=chunk_size, + # ) + # self.log(f"Finsihed stream of {tarinfo.name} to object store", RK.LOG_INFO) + + # return write_result + + # def process_retry(self, reason: str, path_details: PathDetails) -> None: + # """Convenience function for logging and setting in motion a retry for a + # given path_details object. + # """ + # self.log(f"{reason}. 
Adding {path_details.path} to retry list.", RK.LOG_ERROR) + # path_details.retries.increment(reason=reason) + # self.retrylist.append(path_details) def main(): diff --git a/nlds_processors/archiver/archive_put.py b/nlds_processors/archiver/archive_put.py index eaff8939..a956bc58 100644 --- a/nlds_processors/archiver/archive_put.py +++ b/nlds_processors/archiver/archive_put.py @@ -20,11 +20,12 @@ ArchiveError, ) -USE_DISKTAPE = True +from nlds.nlds_setup import USE_DISKTAPE, DISKTAPE_LOC if USE_DISKTAPE: from nlds_processors.archiver.s3_to_tarfile_disk import S3ToTarfileDisk else: from nlds_processors.archiver.s3_to_tarfile_tape import S3ToTarfileTape + from nlds_processors.archiver.s3_to_tarfile_stream import S3StreamError from nlds.rabbit.consumer import State @@ -60,9 +61,9 @@ def transfer( # Create the S3 to tape or disk streamer try: if USE_DISKTAPE: - disk_loc = os.path.expanduser("~/DISKTAPE") + disk_loc = os.path.expanduser(DISKTAPE_LOC) self.log( - f"Starting disk transfer between {disk_loc} and object store " + f"Starting disk transfer between {disk_loc} and object store " f"{tenancy}", RK.LOG_INFO, ) @@ -75,7 +76,7 @@ def transfer( ) else: self.log( - f"Starting tape transfer between {tape_url} and object store " + f"Starting tape transfer between {tape_url} and object store " f"{tenancy}", RK.LOG_INFO, ) @@ -115,7 +116,7 @@ def transfer( # Send whatever remains after all items have been put if len(self.completelist) > 0: self.log( - "Archive complete, passing lists back to worker for cataloguing.", + "Archive put complete, passing lists back to worker for cataloguing.", RK.LOG_INFO, ) self.send_pathlist( @@ -131,21 +132,6 @@ def transfer( state=State.FAILED, ) - @classmethod - def get_holding_prefix(cls, body: Dict[str, Any]) -> str: - """Get the uneditable holding information from the message body to - reproduce the holding prefix made in the catalog""" - try: - holding_id = body[MSG.META][MSG.HOLDING_ID] - user = body[MSG.DETAILS][MSG.USER] - group = body[MSG.DETAILS][MSG.GROUP] - except KeyError as e: - raise ArchiveError( - f"Could not make holding prefix, original error: {e}" - ) - - return f"nlds.{holding_id}.{user}.{group}" - def main(): consumer = PutArchiveConsumer() diff --git a/nlds_processors/archiver/s3_to_tarfile_disk.py b/nlds_processors/archiver/s3_to_tarfile_disk.py index 323e72d3..ee28f195 100644 --- a/nlds_processors/archiver/s3_to_tarfile_disk.py +++ b/nlds_processors/archiver/s3_to_tarfile_disk.py @@ -105,9 +105,9 @@ def put( raise S3StreamError(msg) except S3StreamError as e: msg = ( - f"Exception occurred during write of tarfile " - f"{self.tarfile_diskpath}. This file will now be deleted from the" - f"DISKTAPE. Original exception: {e}" + f"Exception occurred during write of tarfile {self.tarfile_diskpath}. " + f"This file will now be deleted from the DISKTAPE. 
" + f"Original exception: {e}" ) self.log(msg, RK.LOG_ERROR) self._remove_tarfile_from_disktape() @@ -136,6 +136,37 @@ def put( ) return completelist, failedlist, self.tarfile_diskpath, checksum + def get( + self, + holding_prefix: str, + tarfile: str, + filelist: List[PathDetails], + chunk_size: int, + ) -> tuple[List[PathDetails], List[PathDetails], str, int]: + """Stream from a tarfile on disk to Object Store""" + if self.filelist != []: + raise ValueError(f"self.filelist is not Empty: {self.filelist[0]}") + self.filelist = filelist + self.holding_prefix = holding_prefix + try: + # open the tarfile to read from + with open(tarfile, "rb") as file: + file_object = Adler32File(file, debug_fl=False) + completelist, failedlist, checksum = self._stream_to_s3object( + file_object, self.filelist, chunk_size + ) + except FileNotFoundError: + msg = f"Couldn't open tarfile ({self.tarfile_diskpath})." + self.log(msg, RK.LOG_ERROR) + raise S3StreamError(msg) + except S3StreamError as e: + msg = f"Exception occurred during read of tarfile {self.tarfile_diskpath}." + self.log(msg, RK.LOG_ERROR) + raise S3StreamError(msg) + + #raise NotImplementedError + return [], [] + @property def holding_diskpath(self): """Get the holding diskpath (i.e. the enclosing directory) on the DISKTAPE""" diff --git a/nlds_processors/archiver/s3_to_tarfile_stream.py b/nlds_processors/archiver/s3_to_tarfile_stream.py index 71233a4e..de9ae323 100644 --- a/nlds_processors/archiver/s3_to_tarfile_stream.py +++ b/nlds_processors/archiver/s3_to_tarfile_stream.py @@ -94,7 +94,7 @@ def _check_files_exist(self): ) failed_list.append(path_details) continue - + try: # Check that the object is in the bucket and the names match obj_stat_result = self.s3_client.stat_object(check_bucket, check_object) @@ -185,3 +185,26 @@ def _stream_to_fileobject( # If it can't be closed then dw pass return completelist, failedlist, file_object.checksum + + def _stream_to_s3object( + self, file_object, filelist: List[PathDetails], chunk_size: int + ): + # Stream from the a tar file to the S3 Object Store that is created via the + # file_object - this is usually an Adler32File + with tarfile.open(mode="r", fileobj=file_object, copybufsize=chunk_size) as tar: + # local versions of the completelist and failedlist + completelist = [] + failedlist = [] + + for path_details in filelist: + pd = PathDetails.from_dict(path_details) + self.log( + f"Streaming file {pd.path} from tape archive to object store", + RK.LOG_DEBUG, + ) + bucket_name = pd.bucket_name + object_name = pd.object_name + # tar_info = tarfile.TarInfo(name=object_name) + # tar_info.size = int(pd.size) + + return [], [], 0 \ No newline at end of file diff --git a/nlds_processors/catalog/catalog_worker.py b/nlds_processors/catalog/catalog_worker.py index 5b8d1ae6..eea67529 100644 --- a/nlds_processors/catalog/catalog_worker.py +++ b/nlds_processors/catalog/catalog_worker.py @@ -209,11 +209,13 @@ def _parse_transaction_id(self, body: Dict, mandatory: bool = False) -> str: def _parse_tenancy(self, body: Dict) -> str: # Get the tenancy from message, if none found then use the configured default - tenancy = self.default_tenancy - try: + if ( + MSG.TENANCY in body[MSG.DETAILS] + and body[MSG.DETAILS][MSG.TENANCY] is not None + ): tenancy = body[MSG.DETAILS][MSG.TENANCY] - except KeyError: - pass + else: + tenancy = self.default_tenancy return tenancy def _parse_metadata_vars(self, body: Dict) -> Tuple: @@ -235,11 +237,13 @@ def _parse_groupall(self, body: Dict) -> str: def _parse_tape_url(self, body: 
diff --git a/nlds_processors/catalog/catalog_worker.py b/nlds_processors/catalog/catalog_worker.py
index 5b8d1ae6..eea67529 100644
--- a/nlds_processors/catalog/catalog_worker.py
+++ b/nlds_processors/catalog/catalog_worker.py
@@ -209,11 +209,13 @@ def _parse_transaction_id(self, body: Dict, mandatory: bool = False) -> str:
 
     def _parse_tenancy(self, body: Dict) -> str:
         # Get the tenancy from message, if none found then use the configured default
-        tenancy = self.default_tenancy
-        try:
+        if (
+            MSG.TENANCY in body[MSG.DETAILS]
+            and body[MSG.DETAILS][MSG.TENANCY] is not None
+        ):
             tenancy = body[MSG.DETAILS][MSG.TENANCY]
-        except KeyError:
-            pass
+        else:
+            tenancy = self.default_tenancy
         return tenancy
 
     def _parse_metadata_vars(self, body: Dict) -> Tuple:
@@ -235,11 +237,13 @@ def _parse_groupall(self, body: Dict) -> str:
 
     def _parse_tape_url(self, body: Dict) -> str:
         # Get the tape_url from message, if none found then use the configured default
-        tape_url = self.default_tape_url
-        try:
+        if (
+            MSG.TAPE_URL in body[MSG.DETAILS]
+            and body[MSG.DETAILS][MSG.TAPE_URL] is not None
+        ):
             tape_url = body[MSG.DETAILS][MSG.TAPE_URL]
-        except KeyError:
-            pass
+        else:
+            tape_url = self.default_tape_url
         return tape_url
 
     def _parse_aggregation_id(self, body: Dict) -> str:
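A side note on the two parsers just above: _parse_tenancy and _parse_tape_url now duplicate the same membership-and-not-None check. If more defaults end up being handled this way, the pattern could be pulled into a small helper; the function below is only an illustrative sketch (the helper name is invented and it is not part of this patch).

from typing import Any, Dict

import nlds.rabbit.message_keys as MSG


def parse_detail_or_default(body: Dict[str, Any], key: str, default: Any) -> Any:
    """Sketch only: return body[MSG.DETAILS][key] unless the key is missing or
    its value is None, in which case fall back to the supplied default."""
    value = body.get(MSG.DETAILS, {}).get(key)
    return default if value is None else value

With such a helper, _parse_tenancy would reduce to a single parse_detail_or_default(body, MSG.TENANCY, self.default_tenancy) call, and _parse_tape_url likewise with MSG.TAPE_URL and self.default_tape_url.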
@@ -531,23 +535,23 @@ def _catalog_update(self, body: Dict, rk_origin: str) -> None:
                 access_time = datetime.now()
             else:
                 access_time = pl.access_time
-            # check if location exists - this can happen on archive-restore, but it
+            # check if location exists - this can happen on archive-restore, but it
             # should be empty
             st = Storage.from_str(pl.storage_type)
             location = self.catalog.get_location(file, st)
             if location:
                 # check empty
-                if location.url_scheme !="" or location.url_netloc != "":
+                if location.url_scheme != "" or location.url_netloc != "":
                     raise CatalogError(
                         f"{pl.storage_type} for file {pd.original_path} will be "
                         f"overwritten, the Storage Location should be empty"
                     )
                 # otherwise update if exists and not empty
-                location.url_scheme=pl.url_scheme
-                location.url_netloc=pl.url_netloc
-                location.root=pl.root
-                location.path=pl.path
-                location.access_time=access_time
+                location.url_scheme = pl.url_scheme
+                location.url_netloc = pl.url_netloc
+                location.root = pl.root
+                location.path = pl.path
+                location.access_time = access_time
             else:
                 # create if it doesn't exist
                 location = self.catalog.create_location(
@@ -623,6 +627,9 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None:
         # start the database transactions, reset lists and
         self.catalog.start_session()
 
+        # record which tenancy the object store locations will use
+        self.log(f"Tenancy: {tenancy}", RK.LOG_DEBUG)
+
         for filepath in filelist:
             filepath_details = PathDetails.from_dict(filepath)
             try:
@@ -649,6 +656,8 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None:
         for file in files:
             pd = PathDetails.from_filemodel(file)
             if pd.locations.count == 0:
+                # empty storage location denotes that it is still in its initial
+                # transfer to OBJECT STORAGE
                 pd.failure_reason = (
                     f"No Storage Location found for file with original path: "
                     f"{pd.original_path}. Has it completed transfer?"
@@ -665,9 +674,9 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None:
                         "Storage."
                     )
                     self.log(pd.failure_reason, RK.LOG_ERROR)
+                    self.failedlist.append(pd)
                 else:
                     self.completelist.append(pd)
-                    self.failedlist.append(pd)
 
             elif pd.locations.has_storage_type(MSG.TAPE):
                 # get the aggregation
@@ -695,9 +704,14 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None:
                     self.tapedict[agg.tarname][MSG.FILELIST].append(pd)
                 else:
                     self.tapedict[agg.tarname] = {
+                        # Holding id required for updating holding on return
                         MSG.HOLDING_ID: tr.holding_id,
+                        MSG.CHECKSUM: agg.checksum,
                         MSG.FILELIST: [pd],
                     }
+                # create the OBJECT STORAGE Path Location for the message (not
+                # the database)
+                pd.set_object_store(tenancy=tenancy, bucket=tr.transaction_id)
                 self.tapelist.append(pd)
 
             else:
@@ -735,9 +749,10 @@ def _catalog_get(self, body: Dict, rk_origin: str) -> None:
         # NEED RETRIEVING FROM TAPE
         if len(self.tapedict) > 0 and len(self.tapelist) > 0:
             rk_restore = ".".join([rk_origin, RK.ROUTE, RK.ARCHIVE_RESTORE])
-            # Include the original files requested in the message body so they can be
-            # moved to disk after retrieval
-            body[MSG.DATA][MSG.RETRIEVAL_FILELIST] = self.tapedict
+            # Include the original files requested in the message body (tapelist)
+            # so they can be moved to disk after retrieval, as well as the aggregate
+            # dictionary (tapedict)
+            body[MSG.DATA][MSG.RETRIEVAL_DICT] = self.tapedict
             self.log(
                 f"Rerouting PathList from CATALOG_GET to ARCHIVE_GET for archive "
                 f"retrieval ({self.tapelist})",
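To make the renamed key easier to follow: what _catalog_get now stores under body[MSG.DATA][MSG.RETRIEVAL_DICT] is a mapping from aggregation tar name to the holding id, the aggregation checksum and the files to be pulled out of that tar. The snippet below is purely illustrative; the tar name and values are invented placeholders, and the real file entries are serialised PathDetails rather than an empty list.

import nlds.rabbit.message_keys as MSG

# Illustrative layout only - the values are made up
retrieval_dict = {
    "example_aggregation.tar": {
        MSG.HOLDING_ID: 1234,     # needed to update the holding on return
        MSG.CHECKSUM: 305419896,  # aggregation checksum recorded at archive put
        MSG.FILELIST: [],         # PathDetails for the files wanted from this tar
    },
}

Including the checksum presumably lets the archive-get side verify each recalled tar against the value recorded when the aggregation was written.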
@@ -842,8 +857,8 @@ def _catalog_archive_put(self, body: Dict, rk_origin: str) -> None:
         self.catalog.end_session()
 
     def _catalog_archive_update(
-            self, body: Dict, rk_origin: str, storage_type: Storage
-        ) -> None:
+        self, body: Dict, rk_origin: str, storage_type: Storage
+    ) -> None:
         """Update the aggregation record following successful archive write to fill
         in the missing checksum information.
         """
@@ -1269,9 +1284,7 @@ def _catalog_find(self, body: Dict, properties: Header) -> None:
                 )
             for f in files:
                 # get the transaction and the holding:
-                t = self.catalog.get_transaction(
-                    id=f.transaction_id
-                )
+                t = self.catalog.get_transaction(id=f.transaction_id)
                 h = self.catalog.get_holding(
                     user, group, groupall=groupall, holding_id=t.holding_id
                 )[0]
@@ -1469,9 +1482,9 @@ def callback(
         # can get very long.
         if not api_method == RK.STAT:
             self.log(
-                f"Received {json.dumps(body, indent=4)} from "
-                f"{self.queues[0].name} ({method.routing_key})",
+                f"Received from {self.queues[0].name} ({method.routing_key})",
                 RK.LOG_DEBUG,
+                body_json=body
             )
 
         self.log(
diff --git a/nlds_processors/index.py b/nlds_processors/index.py
index dd940363..bffdd96d 100644
--- a/nlds_processors/index.py
+++ b/nlds_processors/index.py
@@ -113,9 +113,9 @@ def callback(self, ch, method, properties, body, connection):
         body_json = json.loads(body)
 
         self.log(
-            f"Received {json.dumps(body_json, indent=4)} from "
-            f"{self.queues[0].name} ({method.routing_key})",
+            f"Received from {self.queues[0].name} ({method.routing_key})",
             RK.LOG_DEBUG,
+            body_json=body_json
         )
 
         # Check for system status
diff --git a/nlds_processors/monitor/monitor_worker.py b/nlds_processors/monitor/monitor_worker.py
index 10bc15e8..5e973997 100644
--- a/nlds_processors/monitor/monitor_worker.py
+++ b/nlds_processors/monitor/monitor_worker.py
@@ -492,7 +492,7 @@ def _monitor_get(self, body: Dict[str, str], properties: Header) -> None:
                     t_rec["sub_records"].append(s_rec)
 
         self.monitor.end_session()
-
+
         ret_list = []
         for id_ in trecs_dict:
             if len(trecs_dict[id_]["sub_records"]) > 0:
@@ -522,9 +522,9 @@ def callback(
         body = json.loads(body)
 
         self.log(
-            f"Received {json.dumps(body, indent=4)} from "
-            f"{self.queues[0].name} ({method.routing_key})",
+            f"Received from {self.queues[0].name} ({method.routing_key})",
             RK.LOG_INFO,
+            body_json=body,
         )
 
         if self._is_system_status_check(body_json=body, properties=properties):
diff --git a/nlds_processors/nlds_worker.py b/nlds_processors/nlds_worker.py
index d0a04499..6e8d2db5 100644
--- a/nlds_processors/nlds_worker.py
+++ b/nlds_processors/nlds_worker.py
@@ -307,9 +307,9 @@ def callback(
         body_json = json.loads(body)
 
         self.log(
-            f"Received {json.dumps(body_json, indent=4)} \nwith "
-            f"routing_key: {method.routing_key}",
+            f"Received with routing_key: {method.routing_key}",
             RK.LOG_INFO,
+            body_json=body_json
         )
 
         # If received system test message, reply to it (this is for system status check)
diff --git a/nlds_processors/transferers/base_transfer.py b/nlds_processors/transferers/base_transfer.py
index 464e5b45..a2f04924 100644
--- a/nlds_processors/transferers/base_transfer.py
+++ b/nlds_processors/transferers/base_transfer.py
@@ -62,9 +62,9 @@ def _callback_common(self, cm, method, properties, body, connection):
             return False
 
         self.log(
-            f"Received {json.dumps(self.body_json, indent=4)} from "
-            f"{self.queues[0].name} ({method.routing_key})",
+            f"Received from {self.queues[0].name} ({method.routing_key})",
             RK.LOG_DEBUG,
+            body_json=self.body_json
         )
 
         # Verify routing key is appropriate