diff --git a/nlds/details.py b/nlds/details.py
index 3a1ae3db..5fba1799 100644
--- a/nlds/details.py
+++ b/nlds/details.py
@@ -1,4 +1,3 @@
-
 # encoding: utf-8
 """
 details.py
@@ -17,7 +16,6 @@ import stat
 import os
 from os import stat_result
-from urllib.parse import urlunsplit
 
 from pydantic import BaseModel
 
@@ -94,7 +92,11 @@ class PathLocations(BaseModel):
     locations: Optional[List[LocationType]] = []
 
     def add(self, location: PathLocation) -> None:
-        assert location.storage_type not in self.locations
+        if location.storage_type in self.locations:
+            raise PathDetailsError(
+                f"PathLocations already contains a PathLocation of the type "
+                f"{location.storage_type}"
+            )
         self.count += 1
         self.locations.append(location)
 
@@ -107,10 +109,10 @@ def to_json(self) -> Dict:
         for l in self.locations:
             out_dict[l.storage_type] = l.to_dict()
         return {MSG.STORAGE_LOCATIONS: out_dict}
-    
+
     def has_storage_type(self, storage_type):
         """Determine whether the path locations contains a specific storage_type
-        storage_type = MSG.OBJECT_STORAGE | MSG.TAPE"""
+        storage_type = MSG.OBJECT_STORAGE | MSG.TAPE"""
         for l in self.locations:
             if l.storage_type == storage_type:
                 return True
@@ -190,7 +192,7 @@ def from_filemodel(cls, file: Enum):
         pd.user = file.user
         pd.group = file.group
         pd.permissions = file.file_permissions
-        
+
         # copy the storage locations
         pd.locations = PathLocations()
         for fl in file.locations:
@@ -216,8 +218,9 @@ def stat(self, stat_result: stat_result = None):
         if not stat_result:
             stat_result = self.path.lstat()
 
-        # Include this assertion so mypy knows it's definitely a stat_result
-        assert stat_result is not None
+        # Include this check so mypy knows it's definitely a stat_result
+        if stat_result is None:
+            raise ValueError(f"stat_result is None for path {self.path}")
         self.mode = stat_result.st_mode
 
         # only for internal use
@@ -275,7 +278,7 @@ def _get_location(self, location_type: str):
             if pl.storage_type == location_type:
                 return pl
         return None
-    
+
     def set_object_store(self, tenancy: str, bucket: str) -> None:
         """Set the OBJECT_STORAGE details for the file.
         This allows the object name to then be derived programmatically using a
@@ -325,26 +328,7 @@ def object_name(self) -> str | None:
         else:
             object_name = f"{pl.path}"
         return object_name
-
-    @property
-    def url(self) -> str | None:
-        """Get the 1st object storage location and return the url
-        url = f"{}
-        """
-        pl = self._get_location(MSG.OBJECT_STORAGE)
-        if pl is None:
-            return None
-        else:
-            return urlunsplit(
-                (
-                    pl.url_scheme,
-                    pl.url_netloc,
-                    f"nlds.{pl.root}/{self.path}",
-                    "",
-                    "",
-                )
-            )
-    
+
     def set_tape(self, server: str, tapepath: str, tarfile: str) -> None:
         """Set the TAPE details for the file.
         This allows the tape name to then be derived programmatically using a
@@ -362,11 +346,11 @@ def set_tape(self, server: str, tapepath: str, tarfile: str) -> None:
             url_scheme="root",
             url_netloc=server,
             root=tapepath,
-            path=tarfile
+            path=tarfile,
         )
         self.locations.add(pl)
         return pl
-    
+
     def get_tape(self) -> PathLocation | None:
         """Get the PathLocation for the tape file."""
         # note - this only returns the first object - this is fine for now, but might
@@ -375,7 +359,7 @@ def get_tape(self) -> PathLocation | None:
             if pl.storage_type == MSG.TAPE:
                 return pl
         return None
-    
+
     @property
     def tape_name(self) -> str | None:
         pl = self._get_location(MSG.TAPE)
diff --git a/nlds/rabbit/consumer.py b/nlds/rabbit/consumer.py
index 261eae05..dcf5f37c 100644
--- a/nlds/rabbit/consumer.py
+++ b/nlds/rabbit/consumer.py
@@ -15,11 +15,7 @@
 import traceback
 from typing import Dict, List, Any
 import pathlib as pth
-from datetime import datetime, timedelta
 import uuid
-import json
-from json.decoder import JSONDecodeError
-from urllib3.exceptions import HTTPError
 import signal
 import threading as thr
 
@@ -30,7 +26,6 @@ from pika.amqp_object import Method
 from pika.spec import Channel
 from pydantic import BaseModel
-from minio.error import S3Error
 
 import nlds.rabbit.routing_keys as RK
 import nlds.rabbit.message_keys as MSG
 
@@ -38,6 +33,7 @@ from nlds.rabbit.publisher import RabbitMQPublisher as RMQP
 import nlds.server_config as CFG
 from nlds.details import PathDetails
+from nlds.errors import MessageError
 
 logger = logging.getLogger("nlds.root")
 
@@ -101,15 +97,6 @@ def __setup_queues(self, queue: str = None):
             except ValueError as e:
                 raise Exception(e)
 
-        print("Using default queue config - only fit for testing purposes.")
-        self.name = self.DEFAULT_QUEUE_NAME
-        self.queues = [
-            RabbitQueue.from_defaults(
-                self.DEFAULT_QUEUE_NAME,
-                self.DEFAULT_EXCHANGE_NAME,
-                self.DEFAULT_ROUTING_KEY,
-            )
-        ]
 
     def __init__(self, queue: str = None, setup_logging_fl=False):
         super().__init__(name=queue, setup_logging_fl=False)
@@ -211,7 +198,8 @@ def load_config_value(self, config_option: str, path_listify_fl: bool = False):
                 # Make sure returned value is a list and not a string
                 # Note: it can't be any other iterable because it's loaded
                 # from a json
-                assert isinstance(return_val_list, list)
+                if not isinstance(return_val_list, list):
+                    raise MessageError("Return value is not a valid list")
                 return_val = [pth.Path(item) for item in return_val_list]
         except KeyError:
             self.log(
diff --git a/nlds/utils/permissions.py b/nlds/utils/permissions.py
index 46c3a4da..9b7bb324 100644
--- a/nlds/utils/permissions.py
+++ b/nlds/utils/permissions.py
@@ -37,11 +37,12 @@ def check_permissions(
         stat_result = os.lstat(path)
     elif stat_result is None and path is None:
         raise ValueError(
-            "Neither path nor a valid stat result of a path were "
-            "given so cannot continue. One is required."
+            "Neither path nor a valid stat result of a path were given so cannot "
+            "continue. One is required."
         )
 
-    assert stat_result is not None
+    if stat_result is None:
+        raise ValueError(f"stat_result is None for path: {path}")
 
     # Get file permissions mask from stat result
     mode = stat_result.st_mode & 0o777
diff --git a/nlds_processors/archiver/archive_get.py b/nlds_processors/archiver/archive_get.py
index 84523306..c6558848 100644
--- a/nlds_processors/archiver/archive_get.py
+++ b/nlds_processors/archiver/archive_get.py
@@ -107,10 +107,29 @@ def transfer(
         rk_origin: str,
         body_json: Dict[str, Any],
     ):
-        print(rk_origin)
-        retrieval_json = body_json[MSG.DATA][MSG.RETRIEVAL_FILELIST]
-        print(retrieval_json)
-        raise NotImplementedError
+        # Make the routing keys
+        rk_complete = ".".join([rk_origin, RK.ARCHIVE_GET, RK.COMPLETE])
+        rk_failed = ".".join([rk_origin, RK.ARCHIVE_GET, RK.FAILED])
+
+        # let's reject all requests
+        for path_details in filelist:
+            path_details.failure_reason = "Testing reject"
+            self.failedlist.append(path_details)
+
+        if len(self.failedlist) > 0:
+            # Send message back to worker so catalog can be scrubbed of failed puts
+            self.send_pathlist(
+                self.failedlist,
+                rk_failed,
+                body_json,
+                state=State.FAILED,
+            )
+            return
+
+        # print(rk_origin)
+        # retrieval_json = body_json[MSG.DATA][MSG.RETRIEVAL_FILELIST]
+        # print(retrieval_json)
+        # raise NotImplementedError
 
         # Can call this as the url has been verified previously
         tape_server, tape_base_dir = self.split_tape_url(tape_url)
diff --git a/nlds_processors/archiver/s3_to_tarfile_disk.py b/nlds_processors/archiver/s3_to_tarfile_disk.py
index 96e3da4d..323e72d3 100644
--- a/nlds_processors/archiver/s3_to_tarfile_disk.py
+++ b/nlds_processors/archiver/s3_to_tarfile_disk.py
@@ -60,7 +60,8 @@ def put(
         self, holding_prefix: str, filelist: List[PathDetails], chunk_size: int
     ) -> tuple[List[PathDetails], List[PathDetails], str, int]:
         """Stream from Object Store to a tarfile on disk"""
-        assert self.filelist == []
+        if self.filelist != []:
+            raise ValueError(f"self.filelist is not empty: {self.filelist[0]}")
         self.filelist = filelist
         self.holding_prefix = holding_prefix
         # self._generate_filelist_hash and self._check_files_exist use the member
@@ -138,16 +139,21 @@ def put(
     @property
     def holding_diskpath(self):
         """Get the holding diskpath (i.e. the enclosing directory) on the DISKTAPE"""
-        assert self.disk_loc
-        assert self.holding_prefix
+        if not self.disk_loc:
+            raise ValueError("self.disk_loc is None")
+        if not self.holding_prefix:
+            raise ValueError("self.holding_prefix is None")
         return f"{self.disk_loc}/{self.holding_prefix}"
 
     @property
     def tarfile_diskpath(self):
         """Get the holding diskpath (i.e. the enclosing directory) on the DISKTAPE"""
-        assert self.disk_loc
-        assert self.holding_prefix
-        assert self.filelist_hash
+        if not self.disk_loc:
+            raise ValueError("self.disk_loc is None")
+        if not self.holding_prefix:
+            raise ValueError("self.holding_prefix is None")
+        if not self.filelist_hash:
+            raise ValueError("self.filelist_hash is None")
         return f"{self.disk_loc}/{self.holding_prefix}/{self.filelist_hash}.tar"
 
     def _validate_tarfile_checksum(self, tarfile_checksum: str):
@@ -157,9 +163,7 @@ def _validate_tarfile_checksum(self, tarfile_checksum: str):
         with open(self.tarfile_diskpath, "rb") as fh:
             while data := fh.read():
                 asum = adler32(data, asum)
-        try:
-            assert asum == tarfile_checksum
-        except AssertionError as e:
+        if asum != tarfile_checksum:
             reason = (
                 f"Checksum {asum} differs from that calculated during streaming "
                 f"upload {tarfile_checksum}."
diff --git a/nlds_processors/archiver/s3_to_tarfile_stream.py b/nlds_processors/archiver/s3_to_tarfile_stream.py
index d9651f09..71233a4e 100644
--- a/nlds_processors/archiver/s3_to_tarfile_stream.py
+++ b/nlds_processors/archiver/s3_to_tarfile_stream.py
@@ -54,7 +54,8 @@ def _generate_filelist_hash(self):
         # Generate a name for the tarfile by hashing the combined filelist.
         # Length of the hash will be 16.
         # NOTE: this breaks if a problem file is removed from an aggregation
-        assert self.filelist != []
+        if self.filelist == []:
+            raise ValueError("self.filelist is empty")
         filenames = [f.original_path for f in self.filelist]
         filelist_hash = shake_256("".join(filenames).encode()).hexdigest(8)
         return filelist_hash
@@ -75,57 +76,62 @@ def _check_files_exist(self):
                     "Could not unpack bucket and object info from path_details"
                 )
                 failed_list.append(path_details)
+                continue
+
             try:
                 # Check the bucket exists
-                assert self.s3_client.bucket_exists(check_bucket)
-            except AssertionError as e:
-                path_details.failure_reason = (
-                    f"Could not verify that bucket {check_bucket} exists before "
-                    f"writing to tape."
-                )
-                failed_list.append(path_details)
+                if not self.s3_client.bucket_exists(check_bucket):
+                    path_details.failure_reason = (
+                        f"Could not verify that bucket {check_bucket} exists before "
+                        f"writing to tape."
+                    )
+                    failed_list.append(path_details)
+                    continue
             except (S3Error, HTTPError) as e:
                 path_details.failure_reason = (
                     f"Could not verify that bucket {check_bucket} exists before "
                     f"writing to tape. Original exception: {e}"
                 )
                 failed_list.append(path_details)
-
+                continue
+
             try:
                 # Check that the object is in the bucket and the names match
                 obj_stat_result = self.s3_client.stat_object(check_bucket, check_object)
-                assert check_object == obj_stat_result.object_name
-            except AssertionError:
-                path_details.failure_reason = (
-                    f"Could not verify file {check_bucket}:{check_object} before "
-                    f"writing to tape. File name differs between original name and "
-                    f"object name."
-                )
-                failed_list.append(path_details)
+                if check_object != obj_stat_result.object_name:
+                    path_details.failure_reason = (
+                        f"Could not verify file {check_bucket}:{check_object} before "
+                        f"writing to tape. File name differs between original name and "
+                        f"object name."
+                    )
+                    failed_list.append(path_details)
+                    continue
             except (S3Error, HTTPError) as e:
                 path_details.failure_reason = (
                     f"Could not verify file {check_bucket}:{check_object} exists "
                     f"before writing to tape. Original exception {e}."
                 )
                 failed_list.append(path_details)
-
+                continue
+
             try:
                 # Check that the object is in the bucket and the names match
                 obj_stat_result = self.s3_client.stat_object(check_bucket, check_object)
-                assert path_details.size == obj_stat_result.size
-            except AssertionError:
-                path_details.failure_reason = (
-                    f"Could not verify file {check_bucket}:{check_object} before "
-                    f"writing to tape. File size differs between original size and "
-                    f"object size."
-                )
-                failed_list.append(path_details)
+                if path_details.size != obj_stat_result.size:
+                    path_details.failure_reason = (
+                        f"Could not verify file {check_bucket}:{check_object} before "
+                        f"writing to tape. File size differs between original size and "
+                        f"object size."
+                    )
+                    failed_list.append(path_details)
+                    continue
             except (S3Error, HTTPError) as e:
                 path_details.failure_reason = (
                     f"Could not verify that file {check_bucket}:{check_object} exists "
                     f"before writing to tape. Original exception {e}."
                 )
                 failed_list.append(path_details)
+                continue
 
         return [], failed_list
 
     def _stream_to_fileobject(
diff --git a/nlds_processors/archiver/s3_to_tarfile_tape.py b/nlds_processors/archiver/s3_to_tarfile_tape.py
index b5c4c73c..a8aa63d0 100644
--- a/nlds_processors/archiver/s3_to_tarfile_tape.py
+++ b/nlds_processors/archiver/s3_to_tarfile_tape.py
@@ -64,7 +64,8 @@ def put(self, holding_prefix: str, filelist: List[PathDetails]):
         The holding_prefix is calculated from the message body in the archive_put
         or archive_get worker process.
         """
-        assert self.filelist == []
+        if self.filelist != []:
+            raise ValueError("self.filelist is not empty")
         self.filelist = filelist
         self.holding_prefix = holding_prefix
 
@@ -127,26 +128,35 @@ def put(self, holding_prefix: str, filelist: List[PathDetails]):
     def holding_tapepath(self):
         """Get the holding tapepath (i.e. the enclosing directory), to be used with
         XRDClient functions on that directory."""
-        assert self.tape_base_dir
-        assert self.holding_prefix
+        if not self.tape_base_dir:
+            raise ValueError("self.tape_base_dir is None")
+        if not self.holding_prefix:
+            raise ValueError("self.holding_prefix is None")
         return f"{self.tape_base_dir}/{self.holding_prefix}"
 
     @property
     def tarfile_tapepath(self):
         """Get the tapepath of the tar file, to be used with the XRDClient functions."""
-        assert self.tape_base_dir
-        assert self.holding_prefix
-        assert self.filelist_hash
+        if not self.tape_base_dir:
+            raise ValueError("self.tape_base_dir is None")
+        if not self.holding_prefix:
+            raise ValueError("self.holding_prefix is None")
+        if not self.filelist_hash:
+            raise ValueError("self.filelist_hash is None")
         return f"{self.tape_base_dir}/{self.holding_prefix}/{self.filelist_hash}.tar"
 
     @property
     def tarfile_absolute_tapepath(self):
         """Get the absolute tapepath of the tar file, to be used with the XRDClient.
        File functions / constructor, i.e. for the object that is to be streamed to."""
-        assert self.tape_base_dir
-        assert self.holding_prefix
-        assert self.filelist_hash
-        assert self.tape_server_url
+        if not self.tape_base_dir:
+            raise ValueError("self.tape_base_dir is None")
+        if not self.holding_prefix:
+            raise ValueError("self.holding_prefix is None")
+        if not self.filelist_hash:
+            raise ValueError("self.filelist_hash is None")
+        if not self.tape_server_url:
+            raise ValueError("self.tape_server_url is None")
         return (
             f"root://{self.tape_server_url}/{self.tape_base_dir}/"
             f"{self.holding_prefix}/{self.filelist_hash}.tar"
@@ -240,25 +250,25 @@ def _validate_tarfile_checksum(self, tarfile_checksum: str):
         else:
             try:
                 method, value = result.decode().split()
-                assert method == "adler32"
+                if method != "adler32":
+                    raise ValueError("method is not adler32")
                 # Convert checksum from hex to int for comparison
                 checksum = int(value[:8], 16)
-                assert(checksum == tarfile_checksum)
+                if checksum != tarfile_checksum:
+                    # If it fails at this point then attempt to delete. It will be
+                    # scheduled to happen again, so long as the files are added to
+                    # failed_list
+                    reason = (
+                        f"XRootD checksum {checksum} differs from that calculated during "
+                        f"streaming upload {tarfile_checksum}."
+                    )
+                    self.log(reason, RK.LOG_ERROR)
+                    raise S3StreamError(
+                        f"Failure occurred during tape-write " f"({reason})."
+                    )
             except ValueError as e:
                 self.log(
                     f"Exception {e} when attempting to parse tarfile checksum from "
                     f"xrootd",
                     RK.LOG_ERROR,
-                )
-            except AssertionError as e:
-                # If it fails at this point then attempt to delete. It will be
-                # scheduled to happend again, so long as the files are added to
-                # failed_list
-                reason = (
-                    f"XRootD checksum {checksum} differs from that calculated during "
-                    f"streaming upload {tarfile_checksum}."
-                )
-                self.log(reason, RK.LOG_ERROR)
-                raise S3StreamError(
-                    f"Failure occurred during tape-write " f"({reason})."
-                )
+                )
\ No newline at end of file
diff --git a/nlds_processors/catalog/catalog.py b/nlds_processors/catalog/catalog.py
index 6c8c2924..d07ada00 100644
--- a/nlds_processors/catalog/catalog.py
+++ b/nlds_processors/catalog/catalog.py
@@ -68,7 +68,8 @@ def get_holding(
         tag: dict = None,
     ) -> List[Holding]:
         """Get a holding from the database"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             # build holding query bit by bit
             holding_q = self.session.query(Holding).filter(Holding.group == group)
@@ -156,7 +157,8 @@ def get_holding(
 
     def create_holding(self, user: str, group: str, label: str) -> Holding:
         """Create the new Holding with the label, user, group"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             holding = Holding(label=label, user=user, group=group)
             self.session.add(holding)
@@ -175,7 +177,8 @@ def modify_holding(
         del_tags: dict = None,
     ) -> Holding:
         """Find a holding and modify the information in it"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         if not isinstance(holding, Holding):
             raise CatalogError(
                 f"Cannot modify holding, it does not appear to be a valid "
@@ -219,7 +222,8 @@ def get_transaction(
         self, id: int = None, transaction_id: str = None
     ) -> Transaction:
         """Get a transaction from the database"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             if transaction_id:
                 transaction = (
@@ -246,7 +250,8 @@ def get_location_file(self, location: Location) -> File:
         """Get a File but from the other end of the database tree, starting from a
         location.
         """
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             file_ = (
                 self.session.query(File)
@@ -259,7 +264,8 @@ def create_transaction(self, holding: Holding, transaction_id: str) -> Transaction:
         """Create a transaction that belongs to a holding and will contain files"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             transaction = Transaction(
                 holding_id=holding.id,
@@ -279,7 +285,8 @@ def _user_has_get_file_permission(self, user: str, group: str, file: File) -> bo
         """Check whether a user has permission to access a file.
         Later, when we implement the ROLES this function will be a lot more
         complicated!"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         holding = (
             self.session.query(Holding)
             .filter(
@@ -310,7 +317,8 @@ def get_files(
     ) -> list:
         """Get a multitude of file details from the database, given the user,
         group, label, holding_id, path (can be regex) or tag(s)"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         # Nones are set to .* in the regexp matching
         # get the matching holdings first, these match all but the path
         holding = self.get_holding(
@@ -390,7 +398,8 @@ def create_file(
     ) -> File:
         """Create a file that belongs to a transaction and will contain
         locations"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             new_file = File(
                 transaction_id=transaction.id,
@@ -425,7 +434,8 @@ def delete_files(
         matching files will.
         Utilises get_files().
         """
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
 
         files = self.get_files(
             user,
@@ -458,7 +468,8 @@ def get_location(self, file: File, storage_type: Enum) -> Location:
         """Get a storage location for a file, given the file and the storage
         type"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             location = (
                 self.session.query(Location)
@@ -486,7 +497,8 @@ def create_location(
         aggregation: Aggregation = None,
     ) -> Location:
         """Add the storage location for either object storage or tape"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         if aggregation is None:
             aggregation_id = None
         else:
@@ -534,7 +546,8 @@ def delete_location(self, file: File, storage_type: Enum) -> None:
 
     def create_tag(self, holding: Holding, key: str, value: str):
         """Create a tag and add it to a holding"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             tag = Tag(key=key, value=value, holding_id=holding.id)
             self.session.add(tag)
@@ -545,7 +558,8 @@ def create_tag(self, holding: Holding, key: str, value: str):
 
     def get_tag(self, holding: Holding, key: str):
         """Get the tag with a specific key"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             tag = (
                 self.session.query(Tag)
@@ -559,7 +573,8 @@ def modify_tag(self, holding: Holding, key: str, value: str):
         """Modify a tag that has the key, with a new value.
         Tag has to exist, current value will be overwritten."""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             tag = (
                 self.session.query(Tag)
@@ -573,7 +588,8 @@ def delete_tag(self, holding: Holding, key: str):
         """Delete a tag that has the key"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         # use a checkpoint as the tags are being deleted in an external loop and
         # using a checkpoint will ensure that any completed deletes are committed
         checkpoint = self.session.begin_nested()
@@ -593,7 +609,8 @@ def create_aggregation(
         self, tarname: str, checksum: str = None, algorithm: str = None
     ) -> Aggregation:
         """Create an aggregation of files to write to tape as a tar file"""
-        assert self.session is not None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             aggregation = Aggregation(
                 tarname=tarname,
@@ -612,7 +629,8 @@ def get_aggregation(self, aggregation_id: int) -> Aggregation:
         """Simple function for getting of Aggregation from aggregation_id."""
-        assert self.session is not None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             # Get the aggregation for a particular file via it's tape location
             aggregation = (
@@ -642,7 +660,8 @@ def delete_aggregation(self, aggregation: Aggregation) -> None:
 
     def get_next_unarchived_holding(self) -> Holding:
         """The principal function for getting the next unarchived holding to archive
         aggregate."""
-        assert self.session is not None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             # Get all archived holdings
             archived_holdings_q = self.session.query(Holding.id).filter(
@@ -672,7 +691,8 @@ def get_next_unarchived_holding(self) -> Holding:
 
     def get_unarchived_files(self, holding: Holding) -> List[File]:
         """The principal function for getting unarchived files to aggregate and send
         to archive put."""
-        assert self.session is not None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             # Get all files for the given holding. Again we have to ensure that the
             # transfer to object storage has completed and the files are not
diff --git a/nlds_processors/catalog/catalog_models.py b/nlds_processors/catalog/catalog_models.py
index 59436033..0f1b9ace 100644
--- a/nlds_processors/catalog/catalog_models.py
+++ b/nlds_processors/catalog/catalog_models.py
@@ -12,6 +12,9 @@
 
 """Declare the SQLAlchemy ORM models for the NLDS Catalog database"""
 
+import enum
+from urllib.parse import urlunsplit
+
 from sqlalchemy import (
     Integer,
     String,
@@ -20,12 +23,13 @@
     Enum,
     BigInteger,
     UniqueConstraint,
-    Boolean,
+    Boolean
 )
+
 from sqlalchemy import ForeignKey
 from sqlalchemy.orm import declarative_base, relationship
 
-import enum
+
 
 from nlds.details import PathType
 from nlds_processors.catalog.catalog_error import CatalogError
 import nlds.rabbit.message_keys as MSG
@@ -153,6 +157,9 @@ def from_str(cls, storage_type: str):
             return cls(cls.TAPE)
         else:
             raise CatalogError(f"{storage_type}: unknown Storage_Type in Storage")
+
+    def __str__(self):
+        return [MSG.OBJECT_STORAGE, MSG.TAPE][self.value-1]
 
 
 class Location(CatalogBase):
@@ -183,6 +190,25 @@ class Location(CatalogBase):
     # each location
     __table_args__ = (UniqueConstraint("storage_type", "file_id"),)
 
+    @property
+    def url(self) -> str | None:
+        """Get the 1st object storage location and return the url
+        url = f"{}
+        """
+        if self.storage_type == Storage.OBJECT_STORAGE:
+            # only object storage returns a URL
+            return urlunsplit(
+                (
+                    self.url_scheme,
+                    self.url_netloc,
+                    f"nlds.{self.root}/{self.path}",
+                    "",
+                    "",
+                )
+            )
+        else:
+            return ""
+
 
 class Checksum(CatalogBase):
     """Class containing checksum and algorithm used to calculate checksum"""
diff --git a/nlds_processors/catalog/catalog_worker.py b/nlds_processors/catalog/catalog_worker.py
index fe23bd04..5b8d1ae6 100644
--- a/nlds_processors/catalog/catalog_worker.py
+++ b/nlds_processors/catalog/catalog_worker.py
@@ -963,7 +963,7 @@ def _catalog_archive_remove(
             # functions above handled message logging, here we just return
             return
 
-        if holding_id is None and filelist is None:
+        if holding_id is None and filelist_ is None:
            self.log(
                 "No method for identifying a filelist provided, exit callback.",
                 RK.LOG_ERROR,
@@ -1003,7 +1003,7 @@ def _catalog_archive_remove(
                     self.completelist.append(f)
                 else:
                     f.failure_reason = (
-                        f"{storage_type.value} location not empty details"
+                        f"{str(storage_type.value)} location not empty details"
                     )
                     self.failedlist.append(f)
 
@@ -1306,7 +1306,7 @@ def _catalog_find(self, body: Dict, properties: Header) -> None:
                     "root": l.root,
                     "path": l.path,
                     "access_time": format_datetime(l.access_time),
-                    "url": pd.url,
+                    "url": l.url,
                 }
                 locations.append(l_rec)
             # build the file record
diff --git a/nlds_processors/monitor/monitor.py b/nlds_processors/monitor/monitor.py
index 0fa66b0a..48b793a4 100644
--- a/nlds_processors/monitor/monitor.py
+++ b/nlds_processors/monitor/monitor.py
@@ -270,7 +270,8 @@ def create_warning(
         self, transaction_record: TransactionRecord, warning: str
     ) -> Warning:
         """Create a warning and add it to the TransactionRecord"""
-        assert self.session != None
+        if self.session is None:
+            raise RuntimeError("self.session is None")
         try:
             warning = Warning(
                 warning=warning, transaction_record_id=transaction_record.id
diff --git a/nlds_processors/monitor/monitor_worker.py b/nlds_processors/monitor/monitor_worker.py
index 15dd961a..10bc15e8 100644
--- a/nlds_processors/monitor/monitor_worker.py
+++ b/nlds_processors/monitor/monitor_worker.py
@@ -90,8 +90,9 @@ def _parse_user(self, body):
         # get the user id from the details section of the message
         try:
             user = body[MSG.DETAILS][MSG.USER]
-            assert user is not None
-        except (KeyError, AssertionError):
+            if user is None:
+                raise ValueError
+        except (KeyError, ValueError):
             msg = "User not in message, exiting callback."
             self.log(msg, RK.LOG_ERROR)
             raise MonitorError(message=msg)
@@ -101,8 +102,9 @@ def _parse_group(self, body):
         # get the group from the details section of the message
         try:
             group = body[MSG.DETAILS][MSG.GROUP]
-            assert group is not None
-        except (KeyError, AssertionError):
+            if group is None:
+                raise ValueError
+        except (KeyError, ValueError):
             msg = "Group not in message, exiting callback."
             self.log(msg, RK.LOG_ERROR)
             raise MonitorError(message=msg)
diff --git a/nlds_processors/nlds_worker.py b/nlds_processors/nlds_worker.py
index 7ee84df3..d0a04499 100644
--- a/nlds_processors/nlds_worker.py
+++ b/nlds_processors/nlds_worker.py
@@ -196,10 +196,15 @@ def _process_rk_catalog_get_archive_restore(
         new_routing_key = ".".join([RK.ROOT, RK.MONITOR_PUT, RK.START])
         self.publish_and_log_message(new_routing_key, body_json)
 
-        # forward to archive_get
-        new_routing_key = ".".join([RK.ROOT, RK.ARCHIVE_GET, RK.INITIATE])
+        # forward to archive_get - use START rather than INITIATE as we don't want
+        # any splitting to take place, as the messages are already sub-divided on
+        # aggregate and we want to pull the whole aggregate back from tape in a single
+        # call. (we don't want to pull the aggregate back multiple times, which is what
+        # would happen if we split the messages here)
+        queue = RK.ARCHIVE_GET
+        new_routing_key = ".".join([RK.ROOT, queue, RK.START])
         self.log(
-            f"Sending message to {RK.ARCHIVE_GET} queue with routing key "
+            f"Sending message to {queue} queue with routing key "
             f"{new_routing_key}",
             RK.LOG_INFO,
         )
diff --git a/nlds_processors/transferers/base_transfer.py b/nlds_processors/transferers/base_transfer.py
index 8e670076..464e5b45 100644
--- a/nlds_processors/transferers/base_transfer.py
+++ b/nlds_processors/transferers/base_transfer.py
@@ -111,6 +111,16 @@ def callback(self, ch, method, properties, body, connection):
         if not self._callback_common(ch, method, properties, body, connection):
             return
 
+        # API-methods that have an INITIATE phase will split the files across
+        # sub-messages to parallelise upload and download.
+        # These methods are:
+        # transfer-put : to parallelise upload to the object storage
+        # transfer-get : to parallelise download from the object storage
+        # archive-put : to form the aggregates on the tape
+        # Note: archive-get does not have an INITIATE phase. This is because the
+        # messages are already split across aggregates by catalog-get, and we only
+        # want to recall an aggregate once.
+
         if self.rk_parts[2] == RK.INITIATE:
             self.log(
                 "Aggregating list into more appropriately sized sub-lists for "
diff --git a/nlds_processors/transferers/get_transfer.py b/nlds_processors/transferers/get_transfer.py
index 71c83e69..ca4deb4b 100644
--- a/nlds_processors/transferers/get_transfer.py
+++ b/nlds_processors/transferers/get_transfer.py
@@ -61,7 +61,8 @@ def _get_target_path(self, body_json: Dict):
     def _get_and_check_bucket_name_object_name(self, path_details):
         """Get the bucket and object name and perform an existence check on the
         bucket"""
-        assert self.client is not None
+        if self.client is None:
+            raise RuntimeError("self.client is None")
 
         if path_details.bucket_name is not None:
             bucket_name = path_details.bucket_name
@@ -116,7 +117,8 @@ def _get_download_path(self, path_details, target_path):
         return download_path
 
     def _transfer(self, bucket_name, object_name, download_path):
-        assert self.client is not None
+        if self.client is None:
+            raise RuntimeError("self.client is None")
         download_path_str = str(download_path)
         # Attempt the download!
         try:
diff --git a/nlds_processors/transferers/put_transfer.py b/nlds_processors/transferers/put_transfer.py
index 1372a707..52942dc9 100644
--- a/nlds_processors/transferers/put_transfer.py
+++ b/nlds_processors/transferers/put_transfer.py
@@ -37,7 +37,8 @@ def _get_bucket_name(transaction_id: str):
         return bucket_name
 
     def _make_bucket(self, transaction_id: str):
-        assert self.client is not None
+        if self.client is None:
+            raise RuntimeError("self.client is None")
         bucket_name = self._get_bucket_name(transaction_id)
 
         # Check that bucket exists, and create if not
@@ -70,7 +71,8 @@ def _transfer_files(
         rk_failed = ".".join([rk_origin, RK.TRANSFER_PUT, RK.FAILED])
         bucket_name = self._get_bucket_name(transaction_id)
 
-        assert self.client is not None
+        if self.client is None:
+            raise RuntimeError("self.client is None")
 
         for path_details in filelist:
             item_path = path_details.path
diff --git a/nlds_processors/utils/aggregations.py b/nlds_processors/utils/aggregations.py
index d43a723b..459828e7 100644
--- a/nlds_processors/utils/aggregations.py
+++ b/nlds_processors/utils/aggregations.py
@@ -58,7 +58,8 @@ def bin_files(
     # into our total size.
     target_bin_count = int(total_size / target_bin_size)
-    assert(target_bin_count is not None)
+    if target_bin_count is None:
+        raise ValueError("target_bin_count is None")
 
     # Make 2 lists, one being a list of lists dictating the bins, the
     # other being their sizes, so we're not continually recalculating it
diff --git a/nlds_utils/view_holding_full.py b/nlds_utils/view_holding_full.py
old mode 100644
new mode 100755
index 0fd2ce58..9e4c712c
--- a/nlds_utils/view_holding_full.py
+++ b/nlds_utils/view_holding_full.py
@@ -96,7 +96,7 @@ def print_file(file: File):
 
 def print_location(location: Location):
     click.echo(f"{'':<8}+-+ {'id':<16}: {location.id}")
-    click.echo(f"{'':<8}{'':<4}{'type':<16}: {str(location.storage_type)[8:]}")
+    click.echo(f"{'':<8}{'':<4}{'type':<16}: {str(location.storage_type)}")
     click.echo(f"{'':<8}{'':<4}{'url scheme':<16}: {location.url_scheme}")
     click.echo(f"{'':<8}{'':<4}{'url netloc':<16}: {location.url_netloc}")
     click.echo(f"{'':<8}{'':<4}{'root':<16}: {location.root}")