Removed the assert anti-pattern
nmassey001 committed Oct 10, 2024
1 parent b3c5577 commit 32b63d6
Showing 18 changed files with 234 additions and 153 deletions.
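
The change repeated throughout the commit is the same: bare assert statements, which are stripped when Python runs with -O and fail only with a terse AssertionError, are replaced by explicit checks that raise descriptive exceptions such as PathDetailsError, ValueError or MessageError. A minimal sketch of the pattern, using simplified, hypothetical names rather than the real PathLocations model:

# Illustrative sketch only (hypothetical names, not the actual nlds classes).
class PathDetailsError(Exception):
    """Raised when a path-details invariant is violated."""

def add_storage_type(existing_types: list, storage_type: str) -> None:
    # Before: assert storage_type not in existing_types
    # After: an explicit check that survives python -O and explains the failure.
    if storage_type in existing_types:
        raise PathDetailsError(
            f"PathLocations already contains a PathLocation of the type "
            f"{storage_type}"
        )
    existing_types.append(storage_type)

The same replacement appears below with ValueError and MessageError where no domain-specific exception is available.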
48 changes: 16 additions & 32 deletions nlds/details.py
@@ -1,4 +1,3 @@

# encoding: utf-8
"""
details.py
@@ -17,7 +16,6 @@
import stat
import os
from os import stat_result
from urllib.parse import urlunsplit

from pydantic import BaseModel

@@ -94,7 +92,11 @@ class PathLocations(BaseModel):
locations: Optional[List[LocationType]] = []

def add(self, location: PathLocation) -> None:
assert location.storage_type not in self.locations
if location.storage_type in self.locations:
raise PathDetailsError(
f"PathLocations already contains a PathLocation of the type "
f"{location.storage_type}"
)
self.count += 1
self.locations.append(location)

@@ -107,10 +109,10 @@ def to_json(self) -> Dict:
for l in self.locations:
out_dict[l.storage_type] = l.to_dict()
return {MSG.STORAGE_LOCATIONS: out_dict}

def has_storage_type(self, storage_type):
"""Determine whether the path locations contains a specific storage_type
storage_type = MSG.OBJECT_STORAGE | MSG.TAPE"""
storage_type = MSG.OBJECT_STORAGE | MSG.TAPE"""
for l in self.locations:
if l.storage_type == storage_type:
return True
@@ -190,7 +192,7 @@ def from_filemodel(cls, file: Enum):
pd.user = file.user
pd.group = file.group
pd.permissions = file.file_permissions

# copy the storage locations
pd.locations = PathLocations()
for fl in file.locations:
@@ -216,8 +218,9 @@ def stat(self, stat_result: stat_result = None):
if not stat_result:
stat_result = self.path.lstat()

# Include this assertion so mypy knows it's definitely a stat_result
assert stat_result is not None
# Include this check so mypy knows it's definitely a stat_result
if stat_result is None:
raise ValueError(f"stat_result is None for path {self.path}")

self.mode = stat_result.st_mode # only for internal use

@@ -275,7 +278,7 @@ def _get_location(self, location_type: str):
if pl.storage_type == location_type:
return pl
return None

def set_object_store(self, tenancy: str, bucket: str) -> None:
"""Set the OBJECT_STORAGE details for the file.
This allows the object name to then be derived programmatically using a
@@ -325,26 +328,7 @@ def object_name(self) -> str | None:
else:
object_name = f"{pl.path}"
return object_name

@property
def url(self) -> str | None:
"""Get the 1st object storage location and return the url
url = f"{}
"""
pl = self._get_location(MSG.OBJECT_STORAGE)
if pl is None:
return None
else:
return urlunsplit(
(
pl.url_scheme,
pl.url_netloc,
f"nlds.{pl.root}/{self.path}",
"",
"",
)
)


def set_tape(self, server: str, tapepath: str, tarfile: str) -> None:
"""Set the TAPE details for the file.
This allows the tape name to then be derived programmatically using a
@@ -362,11 +346,11 @@ def set_tape(self, server: str, tapepath: str, tarfile: str) -> None:
url_scheme="root",
url_netloc=server,
root=tapepath,
path=tarfile
path=tarfile,
)
self.locations.add(pl)
return pl

def get_tape(self) -> PathLocation | None:
"""Get the PathLocation for the tape file."""
# note - this only returns the first object - this is fine for now, but might
@@ -375,7 +359,7 @@ def get_tape(self) -> PathLocation | None:
if pl.storage_type == MSG.TAPE:
return pl
return None

@property
def tape_name(self) -> str | None:
pl = self._get_location(MSG.TAPE)
18 changes: 3 additions & 15 deletions nlds/rabbit/consumer.py
@@ -15,11 +7,7 @@
import traceback
from typing import Dict, List, Any
import pathlib as pth
from datetime import datetime, timedelta
import uuid
import json
from json.decoder import JSONDecodeError
from urllib3.exceptions import HTTPError
import signal
import threading as thr

@@ -30,14 +26,14 @@
from pika.amqp_object import Method
from pika.spec import Channel
from pydantic import BaseModel
from minio.error import S3Error

import nlds.rabbit.routing_keys as RK
import nlds.rabbit.message_keys as MSG
from nlds.rabbit.state import State
from nlds.rabbit.publisher import RabbitMQPublisher as RMQP
import nlds.server_config as CFG
from nlds.details import PathDetails
from nlds.errors import MessageError

logger = logging.getLogger("nlds.root")

@@ -101,15 +97,6 @@ def __setup_queues(self, queue: str = None):

except ValueError as e:
raise Exception(e)
print("Using default queue config - only fit for testing purposes.")
self.name = self.DEFAULT_QUEUE_NAME
self.queues = [
RabbitQueue.from_defaults(
self.DEFAULT_QUEUE_NAME,
self.DEFAULT_EXCHANGE_NAME,
self.DEFAULT_ROUTING_KEY,
)
]

def __init__(self, queue: str = None, setup_logging_fl=False):
super().__init__(name=queue, setup_logging_fl=False)
@@ -211,7 +198,8 @@ def load_config_value(self, config_option: str, path_listify_fl: bool = False):
# Make sure returned value is a list and not a string
# Note: it can't be any other iterable because it's loaded
# from a json
assert isinstance(return_val_list, list)
if not isinstance(return_val_list, list):
raise MessageError("Return value is not a valid list")
return_val = [pth.Path(item) for item in return_val_list]
except KeyError:
self.log(
7 changes: 4 additions & 3 deletions nlds/utils/permissions.py
@@ -37,11 +37,12 @@ def check_permissions(
stat_result = os.lstat(path)
elif stat_result is None and path is None:
raise ValueError(
"Neither path nor a valid stat result of a path were "
"given so cannot continue. One is required."
"Neither path nor a valid stat result of a path were given so cannot "
"continue. One is required."
)

assert stat_result is not None
if stat_result is None:
raise ValueError(f"stat_result is None for path: {path}")

# Get file permissions mask from stat result
mode = stat_result.st_mode & 0o777
27 changes: 23 additions & 4 deletions nlds_processors/archiver/archive_get.py
@@ -107,10 +107,29 @@ def transfer(
rk_origin: str,
body_json: Dict[str, Any],
):
print(rk_origin)
retrieval_json = body_json[MSG.DATA][MSG.RETRIEVAL_FILELIST]
print(retrieval_json)
raise NotImplementedError
# Make the routing keys
rk_complete = ".".join([rk_origin, RK.ARCHIVE_GET, RK.COMPLETE])
rk_failed = ".".join([rk_origin, RK.ARCHIVE_GET, RK.FAILED])

# let's reject all requests
for path_details in filelist:
path_details.failure_reason = "Testing reject"
self.failedlist.append(path_details)

if len(self.failedlist) > 0:
# Send message back to worker so catalog can be scrubbed of failed puts
self.send_pathlist(
self.failedlist,
rk_failed,
body_json,
state=State.FAILED,
)
return

# print(rk_origin)
# retrieval_json = body_json[MSG.DATA][MSG.RETRIEVAL_FILELIST]
# print(retrieval_json)
# raise NotImplementedError

# Can call this as the url has been verified previously
tape_server, tape_base_dir = self.split_tape_url(tape_url)
22 changes: 13 additions & 9 deletions nlds_processors/archiver/s3_to_tarfile_disk.py
@@ -60,7 +60,8 @@ def put(
self, holding_prefix: str, filelist: List[PathDetails], chunk_size: int
) -> tuple[List[PathDetails], List[PathDetails], str, int]:
"""Stream from Object Store to a tarfile on disk"""
assert self.filelist == []
if self.filelist != []:
raise ValueError(f"self.filelist is not empty: {self.filelist[0]}")
self.filelist = filelist
self.holding_prefix = holding_prefix
# self._generate_filelist_hash and self._check_files_exist use the member
@@ -138,16 +139,21 @@ def put(
@property
def holding_diskpath(self):
"""Get the holding diskpath (i.e. the enclosing directory) on the DISKTAPE"""
assert self.disk_loc
assert self.holding_prefix
if not self.disk_loc:
raise ValueError("self.disk_loc is None")
if not self.holding_prefix:
raise ValueError("self.holding_prefix is None")
return f"{self.disk_loc}/{self.holding_prefix}"

@property
def tarfile_diskpath(self):
"""Get the holding diskpath (i.e. the enclosing directory) on the DISKTAPE"""
assert self.disk_loc
assert self.holding_prefix
assert self.filelist_hash
if not self.disk_loc:
raise ValueError("self.disk_loc is None")
if not self.holding_prefix:
raise ValueError("self.holding_prefix is None")
if not self.filelist_hash:
raise ValueError("self.filelist_hash is None")
return f"{self.disk_loc}/{self.holding_prefix}/{self.filelist_hash}.tar"

def _validate_tarfile_checksum(self, tarfile_checksum: str):
Expand All @@ -157,9 +163,7 @@ def _validate_tarfile_checksum(self, tarfile_checksum: str):
with open(self.tarfile_diskpath, "rb") as fh:
while data := fh.read():
asum = adler32(data, asum)
try:
assert asum == tarfile_checksum
except AssertionError as e:
if asum != tarfile_checksum:
reason = (
f"Checksum {asum} differs from that calculated during streaming "
f"upload {tarfile_checksum}."
58 changes: 32 additions & 26 deletions nlds_processors/archiver/s3_to_tarfile_stream.py
@@ -54,7 +54,8 @@ def _generate_filelist_hash(self):
# Generate a name for the tarfile by hashing the combined filelist.
# Length of the hash will be 16.
# NOTE: this breaks if a problem file is removed from an aggregation
assert self.filelist != []
if self.filelist == []:
raise ValueError("self.filelist is empty")
filenames = [f.original_path for f in self.filelist]
filelist_hash = shake_256("".join(filenames).encode()).hexdigest(8)
return filelist_hash
@@ -75,57 +76,62 @@ def _check_files_exist(self):
"Could not unpack bucket and object info from path_details"
)
failed_list.append(path_details)
continue

try:
# Check the bucket exists
assert self.s3_client.bucket_exists(check_bucket)
except AssertionError as e:
path_details.failure_reason = (
f"Could not verify that bucket {check_bucket} exists before "
f"writing to tape."
)
failed_list.append(path_details)
if not self.s3_client.bucket_exists(check_bucket):
path_details.failure_reason = (
f"Could not verify that bucket {check_bucket} exists before "
f"writing to tape."
)
failed_list.append(path_details)
continue
except (S3Error, HTTPError) as e:
path_details.failure_reason = (
f"Could not verify that bucket {check_bucket} exists before "
f"writing to tape. Original exception: {e}"
)
failed_list.append(path_details)

continue

try:
# Check that the object is in the bucket and the names match
obj_stat_result = self.s3_client.stat_object(check_bucket, check_object)
assert check_object == obj_stat_result.object_name
except AssertionError:
path_details.failure_reason = (
f"Could not verify file {check_bucket}:{check_object} before "
f"writing to tape. File name differs between original name and "
f"object name."
)
failed_list.append(path_details)
if check_object != obj_stat_result.object_name:
path_details.failure_reason = (
f"Could not verify file {check_bucket}:{check_object} before "
f"writing to tape. File name differs between original name and "
f"object name."
)
failed_list.append(path_details)
continue
except (S3Error, HTTPError) as e:
path_details.failure_reason = (
f"Could not verify file {check_bucket}:{check_object} exists "
f"before writing to tape. Original exception {e}."
)
failed_list.append(path_details)

continue

try:
# Check that the object is in the bucket and the names match
obj_stat_result = self.s3_client.stat_object(check_bucket, check_object)
assert path_details.size == obj_stat_result.size
except AssertionError:
path_details.failure_reason = (
f"Could not verify file {check_bucket}:{check_object} before "
f"writing to tape. File size differs between original size and "
f"object size."
)
failed_list.append(path_details)
if path_details.size != obj_stat_result.size:
path_details.failure_reason = (
f"Could not verify file {check_bucket}:{check_object} before "
f"writing to tape. File size differs between original size and "
f"object size."
)
failed_list.append(path_details)
continue
except (S3Error, HTTPError) as e:
path_details.failure_reason = (
f"Could not verify that file {check_bucket}:{check_object} exists "
f"before writing to tape. Original exception {e}."
)
failed_list.append(path_details)
continue
return [], failed_list

def _stream_to_fileobject(
