From 8dd66a387b3a35251e687387f3f47becc2a64aa6 Mon Sep 17 00:00:00 2001
From: Neil Massey
Date: Sun, 20 Oct 2024 17:16:13 +0100
Subject: [PATCH] Get from DiskTape to Object Store complete

---
 nlds_processors/archiver/archive_get.py      |   9 +-
 .../archiver/s3_to_tarfile_disk.py           |   5 +-
 .../archiver/s3_to_tarfile_stream.py         | 106 +++++++++++++++--
 3 files changed, 102 insertions(+), 18 deletions(-)

diff --git a/nlds_processors/archiver/archive_get.py b/nlds_processors/archiver/archive_get.py
index b1d801b0..199b67f4 100644
--- a/nlds_processors/archiver/archive_get.py
+++ b/nlds_processors/archiver/archive_get.py
@@ -109,6 +109,10 @@ def transfer(
         )
         # get the list of files to retrieve from the tarfile / aggregate
         aggregate_filelist = item[MSG.FILELIST]
+        # convert to PathDetails objects
+        aggregate_filelist = [
+            PathDetails.from_dict(ag) for ag in aggregate_filelist
+        ]
         # empty streamer.filelist for new aggregate
         streamer.filelist.clear()
         try:
@@ -122,11 +126,6 @@ def transfer(
                 self.failedlist.append(path_details)
                 checksum = None
 
-            #raise NotImplementedError
-            for f in self.filelist:
-                f.failure_reason = "Test"
-                self.failedlist.append(f)
-
         if len(self.completelist) > 0:
             # Send whatever remains after all items have been got
             self.log(
diff --git a/nlds_processors/archiver/s3_to_tarfile_disk.py b/nlds_processors/archiver/s3_to_tarfile_disk.py
index ee28f195..321aba15 100644
--- a/nlds_processors/archiver/s3_to_tarfile_disk.py
+++ b/nlds_processors/archiver/s3_to_tarfile_disk.py
@@ -152,7 +152,7 @@ def get(
             # open the tarfile to read from
             with open(tarfile, "rb") as file:
                 file_object = Adler32File(file, debug_fl=False)
-                completelist, failedlist, checksum = self._stream_to_s3object(
+                completelist, failedlist = self._stream_to_s3object(
                     file_object, self.filelist, chunk_size
                 )
         except FileNotFoundError:
@@ -164,8 +164,7 @@
             self.log(msg, RK.LOG_ERROR)
             raise S3StreamError(msg)
 
-        #raise NotImplementedError
-        return [], []
+        return completelist, failedlist
 
     @property
     def holding_diskpath(self):
diff --git a/nlds_processors/archiver/s3_to_tarfile_stream.py b/nlds_processors/archiver/s3_to_tarfile_stream.py
index de9ae323..a28cc1ed 100644
--- a/nlds_processors/archiver/s3_to_tarfile_stream.py
+++ b/nlds_processors/archiver/s3_to_tarfile_stream.py
@@ -12,6 +12,7 @@
 from typing import List
 from urllib3.exceptions import HTTPError
 import tarfile
+from datetime import datetime
 
 import minio
 from minio.error import S3Error
@@ -137,6 +138,9 @@ def _check_files_exist(self):
     def _stream_to_fileobject(
         self, file_object, filelist: List[PathDetails], chunk_size: int
     ):
+        if self.s3_client is None:
+            raise RuntimeError("self.s3_client is None")
+
         # Stream from the S3 Object Store to a tar file that is created using the
         # file_object - this is usually an Adler32File
         with tarfile.open(mode="w", fileobj=file_object, copybufsize=chunk_size) as tar:
@@ -164,7 +168,7 @@
                     tar.addfile(tar_info, fileobj=stream)
 
             except (HTTPError, S3Error) as e:
-                # Catch error, log and then rethrow error to ensure file is deleted
+                # Catch error, add to failed list
                 reason = (
                     f"Stream-time exception occurred: " f"{type(e).__name__}: {e}"
                 )
@@ -186,10 +190,28 @@
                 pass
         return completelist, failedlist, file_object.checksum
 
+    def _make_bucket(self, bucket_name, create: bool = True):
+        """Check that the bucket exists and create it if `create` is True.
+        Return True if the bucket exists or was created, False otherwise."""
+        try:
+            if not self.s3_client.bucket_exists(bucket_name):
+                if not create:
+                    return False
+                self.s3_client.make_bucket(bucket_name)
+        except S3Error as e:
+            raise S3StreamError(str(e))
+        return True
+
     def _stream_to_s3object(
         self, file_object, filelist: List[PathDetails], chunk_size: int
     ):
-        # Stream from the a tar file to the S3 Object Store that is created via the
+        if self.s3_client is None:
+            raise RuntimeError("self.s3_client is None")
+
+        # Ensure the minimum part_size is met for put_object to function
+        chunk_size = max(5 * 1024 * 1024, chunk_size)
+
+        # Stream from a tar file to the S3 Object Store that is created via the
         # file_object - this is usually an Adler32File
         with tarfile.open(mode="r", fileobj=file_object, copybufsize=chunk_size) as tar:
             # local versions of the completelist and failedlist
@@ -197,14 +219,78 @@
             failedlist = []
 
             for path_details in filelist:
-                pd = PathDetails.from_dict(path_details)
                 self.log(
-                    f"Streaming file {pd.path} from tape archive to object store",
+                    f"Streaming file {path_details.path} from tape archive to object store",
                     RK.LOG_DEBUG,
                 )
-                bucket_name = pd.bucket_name
-                object_name = pd.object_name
-                # tar_info = tarfile.TarInfo(name=object_name)
-                # tar_info.size = int(pd.size)
-
-        return [], [], 0
\ No newline at end of file
+                bucket_name = path_details.bucket_name
+                object_name = path_details.object_name
+                # Stream the object directly from the tarfile object to S3,
+                # creating the bucket first if it doesn't exist
+                try:
+                    tarinfo = tar.getmember(path_details.original_path)
+                except KeyError:
+                    # not found in tar so add to failed list
+                    reason = (
+                        f"Could not find tar member for path details object "
+                        f"{path_details}"
+                    )
+                    path_details.failure_reason = reason
+                    failedlist.append(path_details)
+                    continue
+
+                try:
+                    # get or create the bucket
+                    if not self._make_bucket(bucket_name):
+                        raise S3StreamError(f"Cannot make bucket {bucket_name}")
+                    self.log(
+                        f"Starting stream of {tarinfo.name} to object store bucket "
+                        f"{bucket_name}.",
+                        RK.LOG_INFO,
+                    )
+                    # Extract the file as a file object
+                    f = tar.extractfile(tarinfo)
+                    if f is None:
+                        raise S3StreamError(
+                            f"Could not extract {tarinfo.name} from the tar file"
+                        )
+                    self.s3_client.put_object(
+                        bucket_name,
+                        object_name,
+                        f,
+                        -1,
+                        part_size=chunk_size,
+                    )
+                    self.log(
+                        f"Finished stream of {tarinfo.name} to object store",
+                        RK.LOG_INFO,
+                    )
+                except (HTTPError, S3Error) as e:
+                    reason = (
+                        f"Stream-time exception occurred: ({type(e).__name__}: {e})"
+                    )
+                    path_details.failure_reason = reason
+                    self.log(reason, RK.LOG_DEBUG)
+                    failedlist.append(path_details)
+                except S3StreamError as e:
+                    path_details.failure_reason = e.message
+                    self.log(e.message, RK.LOG_DEBUG)
+                    failedlist.append(path_details)
+                except Exception as e:
+                    reason = f"Unexpected exception occurred during stream: {e}"
+                    path_details.failure_reason = reason
+                    self.log(reason, RK.LOG_ERROR)
+                    failedlist.append(path_details)
+                else:
+                    # success
+                    self.log(
+                        f"Successfully retrieved {path_details.path} from the archive "
+                        "and streamed to object store",
+                        RK.LOG_INFO,
+                    )
+                    # set the access time to now
+                    path_details.get_object_store().access_time = (
+                        datetime.now().timestamp()
+                    )
+                    completelist.append(path_details)
+        return completelist, failedlist
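
Note for reviewers, not part of the patch itself: the core of the change is minio's
streaming multipart upload, i.e. put_object with length -1 and an explicit
part_size of at least 5 MiB (which is why _stream_to_s3object clamps chunk_size).
Below is a minimal standalone sketch of the same pattern. The endpoint,
credentials, bucket, tar path and member name are placeholder assumptions for
illustration, not values from the NLDS codebase.

    import tarfile
    from minio import Minio

    # Placeholder endpoint and credentials - substitute real values
    client = Minio(
        "localhost:9000",
        access_key="ACCESS_KEY",
        secret_key="SECRET_KEY",
        secure=False,
    )

    bucket = "example-bucket"             # placeholder bucket name
    member_name = "path/inside/tar.dat"   # placeholder tar member name

    # put_object requires a part_size of at least 5 MiB when the total
    # length is unknown (-1), mirroring the chunk_size clamp in the patch
    part_size = max(5 * 1024 * 1024, 16384)

    if not client.bucket_exists(bucket):
        client.make_bucket(bucket)

    # "aggregate.tar" is a placeholder for a tar aggregate on disk
    with tarfile.open("aggregate.tar", mode="r") as tar:
        member = tar.getmember(member_name)  # raises KeyError if absent
        f = tar.extractfile(member)          # file-like object over the member
        if f is None:
            raise RuntimeError(f"{member_name} is not a regular file")
        # length=-1 triggers a streaming multipart upload that reads
        # part_size bytes from f at a time, so the member is never held
        # fully in memory
        client.put_object(bucket, "object-name.dat", f, -1, part_size=part_size)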