Commit 8dd66a3: Get from DiskTape to Object Store complete

nmassey001 committed Oct 20, 2024
1 parent 1751b2e

Showing 3 changed files with 99 additions and 18 deletions.
9 changes: 4 additions & 5 deletions nlds_processors/archiver/archive_get.py
@@ -109,6 +109,10 @@ def transfer(
             )
             # get the list of files to retrieve from the tarfile / aggregate
             aggregate_filelist = item[MSG.FILELIST]
+            # convert to PathDetails object
+            aggregate_filelist = [
+                PathDetails.from_dict(ag) for ag in aggregate_filelist
+            ]
             # empty streamer.filelist for new aggregate
             streamer.filelist.clear()
             try:
@@ -122,11 +126,6 @@ def transfer(
                 self.failedlist.append(path_details)
                 checksum = None

-        #raise NotImplementedError
-        for f in self.filelist:
-            f.failure_reason = "Test"
-            self.failedlist.append(f)
-
         if len(self.completelist) > 0:
             # Send whatever remains after all items have been got
             self.log(
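A note on the first hunk above: the filelist in the incoming message is a list of plain dicts, and converting each entry to a PathDetails object up front gives the rest of transfer attribute access to fields such as original_path, bucket_name and object_name. A toy sketch of the pattern, using a hypothetical stand-in class rather than the real PathDetails:

# Toy stand-in for PathDetails (the real class lives in the NLDS codebase);
# from_dict turns the message's plain dicts into objects with attribute access.
from dataclasses import dataclass

@dataclass
class PathRecord:
    original_path: str
    bucket_name: str
    object_name: str

    @classmethod
    def from_dict(cls, d: dict) -> "PathRecord":
        return cls(**d)  # field names here are illustrative only

filelist = [
    {"original_path": "/data/a.nc", "bucket_name": "nlds.1", "object_name": "a.nc"}
]
records = [PathRecord.from_dict(d) for d in filelist]
assert records[0].bucket_name == "nlds.1"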
5 changes: 2 additions & 3 deletions nlds_processors/archiver/s3_to_tarfile_disk.py
@@ -152,7 +152,7 @@ def get(
             # open the tarfile to read from
             with open(tarfile, "rb") as file:
                 file_object = Adler32File(file, debug_fl=False)
-                completelist, failedlist, checksum = self._stream_to_s3object(
+                completelist, failedlist = self._stream_to_s3object(
                     file_object, self.filelist, chunk_size
                 )
         except FileNotFoundError:
@@ -164,8 +164,7 @@ def get(
             self.log(msg, RK.LOG_ERROR)
             raise S3StreamError(msg)

-        #raise NotImplementedError
-        return [], []
+        return completelist, failedlist

     @property
     def holding_diskpath(self):
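get() wraps the on-disk tarfile in an Adler32File so a checksum can be accumulated while the data streams past, and now propagates the real completelist and failedlist from _stream_to_s3object instead of stub values. A minimal sketch of that kind of checksumming wrapper, as an assumption about Adler32File's role rather than its actual implementation:

# Minimal checksumming read-wrapper sketch (an assumption about what
# Adler32File does, not its actual code). A production version must also
# account for seeks, since tarfile.open(mode="r") seeks within the file.
import zlib

class Adler32Reader:
    def __init__(self, fileobj):
        self._f = fileobj
        self.checksum = 1  # Adler-32 seed value

    def read(self, size: int = -1) -> bytes:
        data = self._f.read(size)
        self.checksum = zlib.adler32(data, self.checksum)
        return data

    def seek(self, pos: int, whence: int = 0) -> int:
        return self._f.seek(pos, whence)  # note: bytes skipped here are not checksummed

    def tell(self) -> int:
        return self._f.tell()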
103 changes: 93 additions & 10 deletions nlds_processors/archiver/s3_to_tarfile_stream.py
@@ -12,6 +12,7 @@
 from typing import List
 from urllib3.exceptions import HTTPError
 import tarfile
+from datetime import datetime

 import minio
 from minio.error import S3Error
@@ -137,6 +138,9 @@ def _check_files_exist(self):
     def _stream_to_fileobject(
         self, file_object, filelist: List[PathDetails], chunk_size: int
     ):
+        if self.s3_client is None:
+            raise RuntimeError("self.s3_client is None")
+
         # Stream from the S3 Object Store to a tar file that is created using the
         # file_object - this is usually an Adler32File
         with tarfile.open(mode="w", fileobj=file_object, copybufsize=chunk_size) as tar:
@@ -164,7 +168,7 @@ def _stream_to_fileobject(
                     tar.addfile(tar_info, fileobj=stream)

                 except (HTTPError, S3Error) as e:
-                    # Catch error, log and then rethrow error to ensure file is deleted
+                    # Catch error, add to failed list
                     reason = (
                         f"Stream-time exception occurred: " f"{type(e).__name__}: {e}"
                     )
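The comment change above marks a behavioural shift in _stream_to_fileobject: a stream-time error is now recorded against the individual file and the loop moves on, rather than rethrowing and abandoning the whole aggregate. The same pattern in isolation (all names illustrative, not from the NLDS codebase):

# Record per-item failures and continue, instead of failing the whole batch.
def process_all(items, process):
    complete, failed = [], []
    for item in items:
        try:
            process(item)
        except Exception as exc:
            # record the reason on the item, as the diff does with PathDetails
            item.failure_reason = f"{type(exc).__name__}: {exc}"
            failed.append(item)
        else:
            complete.append(item)
    return complete, failed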
@@ -186,25 +190,104 @@ def _stream_to_fileobject(
                 pass
         return completelist, failedlist, file_object.checksum

+    def _make_bucket(self, bucket_name, create: bool = False):
+        """Check bucket exists and create it if it doesn't"""
+        try:
+            if not self.s3_client.bucket_exists(bucket_name):
+                if not create:
+                    return False
+                else:
+                    self.s3_client.make_bucket(bucket_name)
+        except minio.error.S3Error as e:
+            raise S3StreamError(message=str(e))
+        return True
+
     def _stream_to_s3object(
         self, file_object, filelist: List[PathDetails], chunk_size: int
     ):
-        # Stream from a tar file to the S3 Object Store that is created via the
+        if self.s3_client is None:
+            raise RuntimeError("self.s3_client is None")
+
+        # Ensure minimum part_size is met for put_object to function
+        chunk_size = max(5 * 1024 * 1024, chunk_size)
+
+        # Stream from a tar file to the S3 Object Store that is created via the
         # file_object - this is usually an Adler32File
with tarfile.open(mode="r", fileobj=file_object, copybufsize=chunk_size) as tar:
# local versions of the completelist and failedlist
completelist = []
failedlist = []

for path_details in filelist:
pd = PathDetails.from_dict(path_details)
self.log(
f"Streaming file {pd.path} from tape archive to object store",
f"Streaming file {path_details.path} from tape archive to object store",
RK.LOG_DEBUG,
)
bucket_name = pd.bucket_name
object_name = pd.object_name
# tar_info = tarfile.TarInfo(name=object_name)
# tar_info.size = int(pd.size)

return [], [], 0
bucket_name = path_details.bucket_name
object_name = path_details.object_name
+                # Stream the object directly from the tarfile object to s3
+                # create bucket first if it doesn't exist
+                try:
+                    tarinfo = tar.getmember(path_details.original_path)
+                except KeyError:
+                    # not found in tar so add to failed list
+                    reason = (
+                        f"Could not find tar member for path details object "
+                        f"{path_details}"
+                    )
+                    path_details.failure_reason = reason
+                    failedlist.append(path_details)
+                    continue
+
+                try:
+                    # get or create the bucket (create=True so a missing
+                    # bucket is made rather than failing the transfer)
+                    if not self._make_bucket(bucket_name, create=True):
+                        raise S3StreamError(f"Cannot make bucket {bucket_name}")
+                    self.log(
+                        f"Starting stream of {tarinfo.name} to object store bucket "
+                        f"{bucket_name}.",
+                        RK.LOG_INFO,
+                    )
+                    # Extract the file as a file object
+                    f = tar.extractfile(tarinfo)
+                    write_result = self.s3_client.put_object(
+                        bucket_name,
+                        object_name,
+                        f,
+                        -1,
+                        part_size=chunk_size,
+                    )
+                    self.log(
+                        f"Finished stream of {tarinfo.name} to object store",
+                        RK.LOG_INFO,
+                    )
+                except (HTTPError, S3Error) as e:
+                    reason = (
+                        f"Stream-time exception occurred: ({type(e).__name__}: {e})"
+                    )
+                    path_details.failure_reason = reason
+                    self.log(reason, RK.LOG_DEBUG)
+                    failedlist.append(path_details)
+                except S3StreamError as e:
+                    path_details.failure_reason = e.message
+                    failedlist.append(path_details)
+                except Exception as e:
+                    reason = f"Unexpected exception occurred during stream: {e}"
+                    path_details.failure_reason = reason
+                    self.log(reason, RK.LOG_ERROR)
+                    failedlist.append(path_details)
+                else:
+                    # success
+                    self.log(
+                        f"Successfully retrieved {path_details.path} from the archive "
+                        "and streamed to object store",
+                        RK.LOG_INFO,
+                    )
+                    # set access time as now
+                    path_details.get_object_store().access_time = (
+                        datetime.now().timestamp()
+                    )
+                    completelist.append(path_details)
+            return completelist, failedlist
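The heart of the new _stream_to_s3object is minio's streaming upload: passing -1 as the length to put_object tells minio the object size is unknown, so it uploads in part_size chunks, and minio requires part_size to be at least 5 MiB, which is why chunk_size is clamped with max(5 * 1024 * 1024, chunk_size). A standalone sketch of the same tar-to-S3 pattern; endpoint, credentials, bucket and member names are placeholders:

# Standalone sketch of the tar -> S3 streaming pattern used in the diff above.
# Endpoint, credentials, bucket and member names are placeholders.
import tarfile
from minio import Minio

MIN_PART_SIZE = 5 * 1024 * 1024  # minio's minimum part size for streaming uploads

client = Minio("object-store.example.org:9000",
               access_key="KEY", secret_key="SECRET")
part_size = max(MIN_PART_SIZE, 256 * 1024)  # clamp, as the diff does

with tarfile.open("aggregate.tar", mode="r") as tar:
    member = tar.getmember("data/file1.nc")  # raises KeyError if not in the tar
    stream = tar.extractfile(member)         # file-like object; None for non-files
    if stream is not None:
        # length=-1 marks the size as unknown; minio then streams part_size chunks
        client.put_object("my-bucket", "data/file1.nc", stream,
                          length=-1, part_size=part_size)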
