Skip to content

Commit

Permalink
Merge branch 'main' into development
Browse files Browse the repository at this point in the history
  • Loading branch information
nmassey001 authored Feb 17, 2025
2 parents 0395457 + 430b053 commit fa61e10
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 20 deletions.
2 changes: 1 addition & 1 deletion nlds/routers/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ async def put(
status_code=status.HTTP_400_BAD_REQUEST, detail=response_error.json()
)

if user not in ("jleland", "nrmassey", "nlds"):
if user not in ("nlds"):
response_error = ResponseError(
loc=["files", "archive"],
msg=("archive action is admin-only"),
Expand Down
11 changes: 10 additions & 1 deletion nlds_processors/archive/archive_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,17 @@ def callback(self, ch, method, properties, body, connection):
scraping, then runs the appropriate transfer function.
"""
if not self._callback_common(ch, method, properties, body, connection):
# fail all files if callback common fails
rk_transfer_failed = ".".join(
[self.rk_parts[0], self.rk_parts[1], RK.FAILED]
)
for file in self.filelist:
file.failure_reason = 'Failed in archive transfer init'

self.send_pathlist(
self.filelist, rk_transfer_failed, self.body_json, state=State.FAILED
)
return

# get tape_url for those routes that need it
if self.rk_parts[2] in [RK.START, RK.PREPARE, RK.PREPARE_CHECK]:
try:
Expand Down
16 changes: 8 additions & 8 deletions nlds_processors/archive/s3_to_tarfile_tape.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,26 +377,26 @@ def _verify_tape_server(self):
status, _ = self.tape_client.ping()
if status.status != 0:
msg = (
f"Failed status message: {status.message}. ",
f"Could not ping cta server at {self.tape_server_url}.",
f"Failed status message: {status.message}. "
f"Could not ping cta server at {self.tape_server_url}."
)
raise S3StreamError(msg)

# Stat the base directory and check it's a directory.
status, resp = self.tape_client.stat(self.tape_base_dir)
if status.status != 0:
msg = (
f"Failed status message: {status.message}. ",
f"Base dir {self.tape_base_dir} could not be statted",
f"Failed status message: {status.message}. "
f"Base dir {self.tape_base_dir} could not be statted"
)
raise S3StreamError(msg)
# Check whether the flag indicates it's a directory
elif not bool(resp.flags & StatInfoFlags.IS_DIR):
msg = (
f"Failed status message: {status.message}. ",
f"Full status object: {status}. ",
f"Stat result for base dir {self.tape_base_dir} ",
f"indicates it is not a directory.",
f"Failed status message: {status.message}. "
f"Full status object: {status}. "
f"Stat result for base dir {self.tape_base_dir} "
f"indicates it is not a directory."
)
raise S3StreamError(msg)

Expand Down
4 changes: 2 additions & 2 deletions nlds_processors/archive/send_archive_next.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ def send_archive_next():
MSG.SECRET_KEY: None,
MSG.TAPE_URL: None,
MSG.TENANCY: None,
MSG.USER: "admin-placeholder",
MSG.GROUP: "admin-placeholder"
}
# Load any cronjob config, if present
cronjob_config = DEFAULT_CONFIG
Expand All @@ -40,8 +42,6 @@ def send_archive_next():
MSG.DETAILS: {
MSG.TRANSACT_ID: str(uuid4()),
MSG.SUB_ID: str(uuid4()),
MSG.USER: "admin-placeholder",
MSG.GROUP: "admin-placeholder",
MSG.TARGET: None,
MSG.API_ACTION: "archive-put",
MSG.JOB_LABEL: "archive-next",
Expand Down
1 change: 1 addition & 0 deletions nlds_processors/catalog/catalog_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -901,6 +901,7 @@ def _catalog_archive_put(self, body: Dict, rk_origin: str) -> None:
# Forward successful file details to archive for tape write
rk_complete = ".".join([rk_origin, RK.CATALOG_ARCHIVE_NEXT, RK.COMPLETE])

# the user and group need to be set here
body[MSG.DETAILS][MSG.USER] = next_holding.user
body[MSG.DETAILS][MSG.GROUP] = next_holding.group
body[MSG.META][MSG.HOLDING_ID] = next_holding.id
Expand Down
6 changes: 5 additions & 1 deletion nlds_processors/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ def _scan(
body_json: Dict[str, Any],
) -> None:
# First change user and group so file permissions can be checked
self.set_ids(body_json)
try:
self.set_ids(body_json)
except KeyError:
self.log("Problem running set_ids, exiting _scan.", RK.LOG_ERROR)
return

# Append routing info and then run the index
body_json = self.append_route_info(body_json)
Expand Down
22 changes: 18 additions & 4 deletions nlds_processors/transfer/base_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,25 +83,29 @@ def _callback_common(self, cm, method, properties, body, connection):
self.transaction_id = self.body_json[MSG.DETAILS][MSG.TRANSACT_ID]
except KeyError:
self.log("Transaction id unobtainable, exiting callback.", RK.LOG_ERROR)
return
return False

try:
self.filelist = self.parse_filelist(self.body_json)
except TypeError as e:
self.log("Filelist not parseable, exiting callback", RK.LOG_ERROR)
return
return False

try:
(self.access_key, self.secret_key, self.tenancy) = (
self.get_objectstore_config(self.body_json)
)
except TransferError:
self.log("Objectstore config unobtainable, exiting callback.", RK.LOG_ERROR)
return
return False

# Set uid and gid from message contents
self.log("Setting uid and gids now.", RK.LOG_INFO)
self.set_ids(self.body_json)
try:
self.set_ids(self.body_json)
except KeyError as e:
self.log("Problem running set_ids, exiting callback", RK.LOG_ERROR)
return False

# Append route info to message to track the route of the message
self.body_json = self.append_route_info(self.body_json)
Expand All @@ -110,6 +114,16 @@ def _callback_common(self, cm, method, properties, body, connection):
def callback(self, ch, method, properties, body, connection):

if not self._callback_common(ch, method, properties, body, connection):
# fail all files if callback common fails
rk_transfer_failed = ".".join(
[self.rk_parts[0], self.rk_parts[1], RK.FAILED]
)
for file in self.filelist:
file.failure_reason = 'Failed in transfer init'

self.send_pathlist(
self.filelist, rk_transfer_failed, self.body_json, state=State.FAILED
)
return

# API-methods that have an INITIATE phase will split the files across
Expand Down
9 changes: 6 additions & 3 deletions nlds_processors/transfer/get_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import minio
from minio.error import S3Error
from retry import retry
from urllib3.exceptions import HTTPError

from nlds_processors.transfer.base_transfer import BaseTransferConsumer
from nlds.rabbit.consumer import State
Expand Down Expand Up @@ -203,10 +204,12 @@ def _transfer_files(
# build the routing keys
rk_complete = ".".join([rk_origin, RK.TRANSFER_GET, RK.COMPLETE])
rk_failed = ".".join([rk_origin, RK.TRANSFER_GET, RK.FAILED])
# set the ids for the
# set the ids for the files
if self.chown_fl:
self.set_ids(body_json)

try:
self.set_ids(body_json)
except KeyError as e:
self.log("Problem running set_ids in _transfer_files", RK.LOG_ERROR)
# Create client
self.client = minio.Minio(
tenancy,
Expand Down

0 comments on commit fa61e10

Please sign in to comment.