From af19cb82732954fdd8d1016d84ecf11447c7510a Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Mon, 3 Feb 2025 15:53:49 +0000 Subject: [PATCH 1/8] Added extra exception handling on bucket_exists --- nlds_processors/transfer/get_transfer.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/nlds_processors/transfer/get_transfer.py b/nlds_processors/transfer/get_transfer.py index eafd5f6..db69fea 100644 --- a/nlds_processors/transfer/get_transfer.py +++ b/nlds_processors/transfer/get_transfer.py @@ -16,6 +16,7 @@ import minio from minio.error import S3Error from retry import retry +from urllib3.exceptions import HTTPError from nlds_processors.transfer.base_transfer import BaseTransferConsumer from nlds.rabbit.consumer import State @@ -76,14 +77,29 @@ def _get_and_check_bucket_name_object_name(self, path_details): ) raise TransferError(message=reason) - if bucket_name and not self.client.bucket_exists(bucket_name): - # If bucket doesn't exist then pass for failure - reason = f"Bucket {bucket_name} does not exist" + # try to get the bucket - may throw exception if user does not have access + # permissions + try: + bucket_exists = self.client.bucket_exists(bucket_name) + except (S3Error, HTTPError) as e: + reason = ( + f"Could not verify that bucket {bucket_name} exists during get " + f"transfer. Original exception: {e}" + ) self.log( - f"{reason}. Adding {path_details.object_name} to failed list.", + f"{reason}. Adding {path_details.object_name} to failed list. ", RK.LOG_ERROR, ) raise TransferError(message=reason) + else: + if bucket_name and not bucket_exists: + # If bucket doesn't exist then pass for failure + reason = f"Bucket {bucket_name} does not exist" + self.log( + f"{reason}. Adding {path_details.object_name} to failed list.", + RK.LOG_ERROR, + ) + raise TransferError(message=reason) return bucket_name, object_name From d71472b90c1da9176dfc8a040c3811cbc5b38184 Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Tue, 4 Feb 2025 11:40:29 +0000 Subject: [PATCH 2/8] Send user name during send_archive_next from server config file, and don't overwrite in the catalog --- nlds_processors/archive/send_archive_next.py | 4 ++-- nlds_processors/catalog/catalog_worker.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nlds_processors/archive/send_archive_next.py b/nlds_processors/archive/send_archive_next.py index b181b1d..cd25915 100644 --- a/nlds_processors/archive/send_archive_next.py +++ b/nlds_processors/archive/send_archive_next.py @@ -30,6 +30,8 @@ def send_archive_next(): MSG.SECRET_KEY: None, MSG.TAPE_URL: None, MSG.TENANCY: None, + MSG.USER: "admin-placeholder", + MSG.GROUP: "admin-placeholder" } # Load any cronjob config, if present cronjob_config = DEFAULT_CONFIG @@ -40,8 +42,6 @@ def send_archive_next(): MSG.DETAILS: { MSG.TRANSACT_ID: str(uuid4()), MSG.SUB_ID: str(uuid4()), - MSG.USER: "admin-placeholder", - MSG.GROUP: "admin-placeholder", MSG.TARGET: None, MSG.API_ACTION: "archive-put", MSG.JOB_LABEL: "archive-next", diff --git a/nlds_processors/catalog/catalog_worker.py b/nlds_processors/catalog/catalog_worker.py index b8c0771..3aa194b 100644 --- a/nlds_processors/catalog/catalog_worker.py +++ b/nlds_processors/catalog/catalog_worker.py @@ -898,8 +898,8 @@ def _catalog_archive_put(self, body: Dict, rk_origin: str) -> None: # Forward successful file details to archive for tape write rk_complete = ".".join([rk_origin, RK.CATALOG_ARCHIVE_NEXT, RK.COMPLETE]) - body[MSG.DETAILS][MSG.USER] = next_holding.user - body[MSG.DETAILS][MSG.GROUP] = next_holding.group + # the user and group should be set as the NLDS user by the send_archive_next + # cronjob publisher body[MSG.META][MSG.HOLDING_ID] = next_holding.id if len(self.completelist) > 0: self.log( From d98f51c75f6b6b1600bbb57fa339e781822b39b1 Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Tue, 4 Feb 2025 13:47:17 +0000 Subject: [PATCH 3/8] Removed neil and jack from list of admins --- nlds/routers/files.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nlds/routers/files.py b/nlds/routers/files.py index d9e3565..adde44a 100644 --- a/nlds/routers/files.py +++ b/nlds/routers/files.py @@ -412,7 +412,7 @@ async def put( status_code=status.HTTP_400_BAD_REQUEST, detail=response_error.json() ) - if user not in ("jleland", "nrmassey", "nlds"): + if user not in ("nlds"): response_error = ResponseError( loc=["files", "archive"], msg=("archive action is admin-only"), From af453f727341863d2f21b9d7e50a3b876a598136 Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Tue, 4 Feb 2025 13:48:15 +0000 Subject: [PATCH 4/8] removed commas from error messages as it was creating lists --- nlds_processors/archive/s3_to_tarfile_tape.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/nlds_processors/archive/s3_to_tarfile_tape.py b/nlds_processors/archive/s3_to_tarfile_tape.py index 4328e1e..ecaf0e1 100644 --- a/nlds_processors/archive/s3_to_tarfile_tape.py +++ b/nlds_processors/archive/s3_to_tarfile_tape.py @@ -377,8 +377,8 @@ def _verify_tape_server(self): status, _ = self.tape_client.ping() if status.status != 0: msg = ( - f"Failed status message: {status.message}. ", - f"Could not ping cta server at {self.tape_server_url}.", + f"Failed status message: {status.message}. " + f"Could not ping cta server at {self.tape_server_url}." ) raise S3StreamError(msg) @@ -386,17 +386,17 @@ def _verify_tape_server(self): status, resp = self.tape_client.stat(self.tape_base_dir) if status.status != 0: msg = ( - f"Failed status message: {status.message}. ", - f"Base dir {self.tape_base_dir} could not be statted", + f"Failed status message: {status.message}. " + f"Base dir {self.tape_base_dir} could not be statted" ) raise S3StreamError(msg) # Check whether the flag indicates it's a directory elif not bool(resp.flags & StatInfoFlags.IS_DIR): msg = ( - f"Failed status message: {status.message}. ", - f"Full status object: {status}. ", - f"Stat result for base dir {self.tape_base_dir} ", - f"indicates it is not a directory.", + f"Failed status message: {status.message}. " + f"Full status object: {status}. " + f"Stat result for base dir {self.tape_base_dir} " + f"indicates it is not a directory." ) raise S3StreamError(msg) From 130e7d37c3b7e9f7a91bc43b69ceaed1c28aa949 Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Wed, 5 Feb 2025 09:42:11 +0000 Subject: [PATCH 5/8] Better error handling if set_ids fails --- nlds_processors/transfer/base_transfer.py | 6 +++++- nlds_processors/transfer/get_transfer.py | 8 +++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/nlds_processors/transfer/base_transfer.py b/nlds_processors/transfer/base_transfer.py index 9a1d1d1..6849b63 100644 --- a/nlds_processors/transfer/base_transfer.py +++ b/nlds_processors/transfer/base_transfer.py @@ -101,7 +101,11 @@ def _callback_common(self, cm, method, properties, body, connection): # Set uid and gid from message contents self.log("Setting uid and gids now.", RK.LOG_INFO) - self.set_ids(self.body_json) + try: + self.set_ids(self.body_json) + except KeyError as e: + self.log("Problem running set_ids, exiting callback", RK.LOG_ERROR) + return # Append route info to message to track the route of the message self.body_json = self.append_route_info(self.body_json) diff --git a/nlds_processors/transfer/get_transfer.py b/nlds_processors/transfer/get_transfer.py index db69fea..a0af21a 100644 --- a/nlds_processors/transfer/get_transfer.py +++ b/nlds_processors/transfer/get_transfer.py @@ -210,10 +210,12 @@ def _transfer_files( # build the routing keys rk_complete = ".".join([rk_origin, RK.TRANSFER_GET, RK.COMPLETE]) rk_failed = ".".join([rk_origin, RK.TRANSFER_GET, RK.FAILED]) - # set the ids for the + # set the ids for the files if self.chown_fl: - self.set_ids(body_json) - + try: + self.set_ids(body_json) + except KeyError as e: + self.log("Problem running set_ids in _transfer_files", RK.LOG_ERROR) # Create client self.client = minio.Minio( tenancy, From da2caca76fe9ab5c813d7e0365d7ca88a579874f Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Wed, 5 Feb 2025 12:06:30 +0000 Subject: [PATCH 6/8] Better error handling in transfer init --- nlds_processors/index.py | 6 +++++- nlds_processors/transfer/base_transfer.py | 15 +++++++++++---- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/nlds_processors/index.py b/nlds_processors/index.py index 274dc9e..4a534fa 100644 --- a/nlds_processors/index.py +++ b/nlds_processors/index.py @@ -95,7 +95,11 @@ def _scan( body_json: Dict[str, Any], ) -> None: # First change user and group so file permissions can be checked - self.set_ids(body_json) + try: + self.set_ids(body_json) + except KeyError: + self.log("Problem running set_ids, exiting _scan.", RK.LOG_ERROR) + return # Append routing info and then run the index body_json = self.append_route_info(body_json) diff --git a/nlds_processors/transfer/base_transfer.py b/nlds_processors/transfer/base_transfer.py index 6849b63..2272b15 100644 --- a/nlds_processors/transfer/base_transfer.py +++ b/nlds_processors/transfer/base_transfer.py @@ -83,13 +83,13 @@ def _callback_common(self, cm, method, properties, body, connection): self.transaction_id = self.body_json[MSG.DETAILS][MSG.TRANSACT_ID] except KeyError: self.log("Transaction id unobtainable, exiting callback.", RK.LOG_ERROR) - return + return False try: self.filelist = self.parse_filelist(self.body_json) except TypeError as e: self.log("Filelist not parseable, exiting callback", RK.LOG_ERROR) - return + return False try: (self.access_key, self.secret_key, self.tenancy) = ( @@ -97,7 +97,7 @@ def _callback_common(self, cm, method, properties, body, connection): ) except TransferError: self.log("Objectstore config unobtainable, exiting callback.", RK.LOG_ERROR) - return + return False # Set uid and gid from message contents self.log("Setting uid and gids now.", RK.LOG_INFO) @@ -105,7 +105,7 @@ def _callback_common(self, cm, method, properties, body, connection): self.set_ids(self.body_json) except KeyError as e: self.log("Problem running set_ids, exiting callback", RK.LOG_ERROR) - return + return False # Append route info to message to track the route of the message self.body_json = self.append_route_info(self.body_json) @@ -114,6 +114,13 @@ def _callback_common(self, cm, method, properties, body, connection): def callback(self, ch, method, properties, body, connection): if not self._callback_common(ch, method, properties, body, connection): + # fail all files if callback common fails + rk_transfer_failed = ".".join( + [self.rk_parts[0], self.rk_parts[1], RK.FAILED] + ) + self.send_pathlist( + self.filelist, rk_transfer_failed, self.body_json, state=State.FAILED + ) return # API-methods that have an INITIATE phase will split the files across From fee7be119f41af7d7e08823a7b0d9b69db65cf08 Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Wed, 5 Feb 2025 12:31:33 +0000 Subject: [PATCH 7/8] Better error handling in transfer init --- nlds_processors/archive/archive_base.py | 11 ++++++++++- nlds_processors/transfer/base_transfer.py | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/nlds_processors/archive/archive_base.py b/nlds_processors/archive/archive_base.py index 895aae7..0691294 100644 --- a/nlds_processors/archive/archive_base.py +++ b/nlds_processors/archive/archive_base.py @@ -103,8 +103,17 @@ def callback(self, ch, method, properties, body, connection): scraping, then runs the appropriate transfer function. """ if not self._callback_common(ch, method, properties, body, connection): + # fail all files if callback common fails + rk_transfer_failed = ".".join( + [self.rk_parts[0], self.rk_parts[1], RK.FAILED] + ) + for file in self.filelist: + file.failure_reason = 'Failed in archive transfer init' + + self.send_pathlist( + self.filelist, rk_transfer_failed, self.body_json, state=State.FAILED + ) return - # get tape_url for those routes that need it if self.rk_parts[2] in [RK.START, RK.PREPARE, RK.PREPARE_CHECK]: try: diff --git a/nlds_processors/transfer/base_transfer.py b/nlds_processors/transfer/base_transfer.py index 2272b15..a4ab410 100644 --- a/nlds_processors/transfer/base_transfer.py +++ b/nlds_processors/transfer/base_transfer.py @@ -118,6 +118,9 @@ def callback(self, ch, method, properties, body, connection): rk_transfer_failed = ".".join( [self.rk_parts[0], self.rk_parts[1], RK.FAILED] ) + for file in self.filelist: + file.failure_reason = 'Failed in transfer init' + self.send_pathlist( self.filelist, rk_transfer_failed, self.body_json, state=State.FAILED ) From 430b0535fe5cfed3424a8ac6442d47cb233a610e Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Mon, 10 Feb 2025 14:19:08 +0000 Subject: [PATCH 8/8] Added user and group back into message when archiving --- nlds_processors/catalog/catalog_worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nlds_processors/catalog/catalog_worker.py b/nlds_processors/catalog/catalog_worker.py index 3aa194b..5148b11 100644 --- a/nlds_processors/catalog/catalog_worker.py +++ b/nlds_processors/catalog/catalog_worker.py @@ -898,8 +898,9 @@ def _catalog_archive_put(self, body: Dict, rk_origin: str) -> None: # Forward successful file details to archive for tape write rk_complete = ".".join([rk_origin, RK.CATALOG_ARCHIVE_NEXT, RK.COMPLETE]) - # the user and group should be set as the NLDS user by the send_archive_next - # cronjob publisher + # the user and group need to be set here + body[MSG.DETAILS][MSG.USER] = next_holding.user + body[MSG.DETAILS][MSG.GROUP] = next_holding.group body[MSG.META][MSG.HOLDING_ID] = next_holding.id if len(self.completelist) > 0: self.log(