Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement mpu file upload #1190

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 161 additions & 41 deletions TM1py/Services/FileService.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
# -*- coding: utf-8 -*-
import concurrent.futures
import json
import time
import warnings
from io import BytesIO
from pathlib import Path
from typing import List, Iterable, Union
from typing import List, Iterable, Union, Tuple

from TM1py.Exceptions import TM1pyVersionException
from TM1py.Services import RestService
from TM1py.Services.ObjectService import ObjectService
from TM1py.Utils import format_url
from TM1py.Utils.Utils import verify_version, require_version
from TM1py.Exceptions import TM1pyVersionException


class FileService(ObjectService):
SUBFOLDER_REQUIRED_VERSION = "12"
MPU_REQUIRED_VERSION = "12"

def __init__(self, tm1_rest: RestService):
"""
Expand All @@ -37,7 +42,7 @@ def get_names(self, **kwargs) -> bytes:
version_content_path=self.version_content_path)

return self._rest.GET(url, **kwargs).content

@require_version(version="11.4")
def get_all_names(self, path: Union[str, Path] = "", **kwargs) -> List[str]:
""" return list of blob file names
Expand All @@ -49,16 +54,16 @@ def get_all_names(self, path: Union[str, Path] = "", **kwargs) -> List[str]:

response = self._rest.GET(url, **kwargs).content
return [file['Name'] for file in json.loads(response)['value']]

@require_version(version="11.4")
def get(self, file_name: str, **kwargs) -> bytes:
""" Get file

:param file_name: file name in root or path to file
"""
path = Path(file_name)
self._check_subfolder_support(path = path, function="FileService.get")
self._check_subfolder_support(path=path, function="FileService.get")

url = self._construct_content_url(
path=path,
exclude_path_end=False,
Expand Down Expand Up @@ -108,23 +113,126 @@ def _construct_content_url(self, path: Path, exclude_path_end: bool = True, exte
**parent_folders)

return url.rstrip("/")

def _check_subfolder_support(self, path: Path, function: str) -> None:
REQUIRED_VERSION = "12"
if len(path.parts) > 1 and not verify_version(required_version=REQUIRED_VERSION, version=self.version):
raise TM1pyVersionException(function=function, required_version=REQUIRED_VERSION , feature='Subfolder')

if not len(path.parts) > 1:
return

if not verify_version(required_version=self.SUBFOLDER_REQUIRED_VERSION, version=self.version):
raise TM1pyVersionException(
function=function,
required_version=self.SUBFOLDER_REQUIRED_VERSION,
feature='Subfolder')

def _check_mpu_support(self, function: str) -> None:
if not verify_version(required_version=self.MPU_REQUIRED_VERSION, version=self.version):
raise TM1pyVersionException(
function=function,
required_version=self.MPU_REQUIRED_VERSION,
feature='MultiProcessUpload')

def _upload_file_content(
self,
path: Path,
file_content: bytes,
multi_part_upload: bool = False,
max_mb_per_part: float = 200,
max_workers: int = 1,
**kwargs):

url = self._construct_content_url(path, exclude_path_end=False, extension="Content")

if multi_part_upload:
return self.upload_file_content_with_mpu(url, file_content, max_mb_per_part, max_workers, **kwargs)

return self._rest.PUT(
url=url,
data=file_content,
headers=self.binary_http_header,
**kwargs)

def upload_file_content_with_mpu(self, content_url: str, file_content: bytes, max_mb_per_part: float,
max_workers: int = 1, **kwargs):
# Initiate multipart upload
response = self._rest.POST(
url=content_url + "/mpu.CreateMultipartUpload",
data="{}",
async_requests_mode=False,
**kwargs)
upload_id = response.json()['UploadID']

# Split the file content into parts
parts_to_upload = self._split_into_parts(
data=file_content,
max_chunk_size=int(max_mb_per_part * 1024 * 1024)
)

part_numbers_and_etags = []

# helper function for uploading each part
def upload_part_with_retry(index: int, data: bytes, retries: int = 3) -> Tuple[int, int, str]:
for attempt in range(retries):
try:
part_response = self._rest.POST(
url=content_url + f"/!uploads('{upload_id}')/Parts",
data=data,
headers={**self.binary_http_header, 'Accept': 'application/json,text/plain'},
async_requests_mode=False,
**kwargs)
return index, part_response.json()["PartNumber"], part_response.json()["@odata.etag"]
except Exception as e:
if attempt < retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
else:
raise e from None

if max_workers > 1:
# upload parts concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:

futures = {
executor.submit(upload_part_with_retry, i, part, 3): i
for i, part
in enumerate(parts_to_upload)}

for future in concurrent.futures.as_completed(futures):
part_index, part_number, odata_etag = future.result()
part_numbers_and_etags.append((part_index, part_number, odata_etag))

else:
# Sequential upload
for i, bytes_part in enumerate(parts_to_upload):
part_index, part_number, odata_etag = upload_part_with_retry(i, bytes_part)
part_numbers_and_etags.append((part_index, part_number, odata_etag))

# Complete the multipart upload
self._rest.POST(
url=content_url + f"/!uploads('{upload_id}')/mpu.Complete",
data=json.dumps(
{"Parts": [
{"PartNumber": part_number, "ETag": etag}
for _, part_number, etag in sorted(part_numbers_and_etags)
]}
)
)

@require_version(version="11.4")
def create(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
def create(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
""" Create file

Folders in file_name (e.g. folderA/folderB/file.csv) will be created implicitly

:param file_name: file name in root or path to file
:param file_content: file_content as bytes or BytesIO
:param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
:param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
:param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
"""
path = Path(file_name)
self._check_subfolder_support(path = path, function = "FileService.create")
self._check_subfolder_support(path=path, function="FileService.create")
if multi_part_upload:
self._check_mpu_support(function="FileService.create")

# Create folder structure iteratively
if path.parents:
Expand All @@ -141,46 +249,42 @@ def create(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
}
self._rest.POST(url, json.dumps(body), **kwargs)

url = self._construct_content_url(path, exclude_path_end=False, extension="Content")
return self._rest.PUT(
url=url,
data=file_content,
headers=self.binary_http_header,
**kwargs)

return self._upload_file_content(path, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)

@require_version(version="11.4")
def update(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
def update(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
""" Update existing file

:param file_name: file name in root or path to file
:param file_content: file_content as bytes or BytesIO
:param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
:param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
:param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
"""
path = Path(file_name)
self._check_subfolder_support(path = path, function = "FileService.update")
self._check_subfolder_support(path=path, function="FileService.update")
if multi_part_upload:
self._check_mpu_support(function="FileService.create")

url = self._construct_content_url(
path=path,
exclude_path_end=False,
extension="Content")
return self._upload_file_content(path, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)

return self._rest.PUT(
url=url,
data=file_content,
headers=self.binary_http_header,
**kwargs)

@require_version(version="11.4")
def update_or_create(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
def update_or_create(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
""" Create file or update file if it already exists

:param file_name: file name in root or path to file
:param file_content: file_content as bytes or BytesIO
:param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
:param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
:param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
"""
if self.exists(file_name, **kwargs):
return self.update(file_name, file_content, **kwargs)
return self.update(file_name, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)

return self.create(file_name, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)

return self.create(file_name, file_content, **kwargs)

@require_version(version="11.4")
def exists(self, file_name: Union[str, Path], **kwargs):
""" Check if file exists
Expand All @@ -193,23 +297,23 @@ def exists(self, file_name: Union[str, Path], **kwargs):
extension="")

return self._exists(url, **kwargs)

@require_version(version="11.4")
def delete(self, file_name: Union[str, Path], **kwargs):
""" Delete file

:param file_name: file name in root or path to file
"""
path = Path(file_name)
self._check_subfolder_support(path = path, function = "FileService.delete")
self._check_subfolder_support(path=path, function="FileService.delete")

url = self._construct_content_url(
path=path,
exclude_path_end=False,
extension="")

return self._rest.DELETE(url, **kwargs)

@require_version(version="11.4")
def search_string_in_name(self, name_startswith: str = None, name_contains: Iterable = None,
name_contains_operator: str = 'and', path: Union[Path, str] = "",
Expand Down Expand Up @@ -242,9 +346,9 @@ def search_string_in_name(self, name_startswith: str = None, name_contains: Iter

else:
raise ValueError("'name_contains' must be str or iterable")

path = Path(path)
self._check_subfolder_support(path = path, function = "FileService.search_string_in_name")
self._check_subfolder_support(path=path, function="FileService.search_string_in_name")

url = self._construct_content_url(
path=Path(path),
Expand All @@ -253,3 +357,19 @@ def search_string_in_name(self, name_startswith: str = None, name_contains: Iter
response = self._rest.GET(url, **kwargs).content

return list(file['Name'] for file in json.loads(response)['value'])

@staticmethod
def _split_into_parts(data: Union[bytes, BytesIO], max_chunk_size: int = 200 * 1024 * 1024):
# Convert data to bytes if it's a BytesIO object
if isinstance(data, BytesIO):
data = data.getvalue()

# List to store chunks
parts = []

# Split data into chunks
for i in range(0, len(data), max_chunk_size):
part = data[i:i + max_chunk_size]
parts.append(part)

return parts
43 changes: 39 additions & 4 deletions Tests/FileService_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,37 @@ def setUpV12(self):
if self.tm1.files.exists(self.FILE_NAME2_IN_FOLDER):
self.tm1.files.delete(self.FILE_NAME2_IN_FOLDER)

@skip_if_version_lower_than(version="11.4")
def test_create_get(self):

def run_create_get(self, mpu, max_mb_per_part=None, max_workers=1):
with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
self.tm1.files.update_or_create(self.FILE_NAME1, original_file.read())
self.tm1.files.update_or_create(
self.FILE_NAME1,
original_file.read(),
multi_part_upload=mpu,
max_mb_per_part=max_mb_per_part,
max_workers=max_workers)

created_file = self.tm1.files.get(self.FILE_NAME1)

with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
self.assertEqual(original_file.read(), created_file)

@skip_if_version_lower_than(version="11.4")
def test_create_get(self):
self.run_create_get(mpu=False)

@skip_if_version_lower_than(version="12")
def test_create_get_with_mpu_1_byte_per_part(self):
self.run_create_get(mpu=True, max_mb_per_part=1/(1024*1024))


@skip_if_version_lower_than(version="12")
def test_create_get_with_mpu_1_byte_per_part_10_max_workers(self):
self.run_create_get(mpu=True, max_mb_per_part=1/(1024*1024), max_workers=10)

@skip_if_version_lower_than(version="12")
def test_create_get_with_mpu_200_megabyte_per_part(self):
self.run_create_get(mpu=True, max_mb_per_part=200)

@skip_if_version_lower_than(version="12")
def test_create_get_in_folder(self):
with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
Expand All @@ -61,6 +82,20 @@ def test_create_get_in_folder(self):
with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
self.assertEqual(original_file.read(), created_file)

@skip_if_version_lower_than(version="12")
def test_create_get_in_folder_with_mpu(self):
with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
self.tm1.files.update_or_create(
self.FILE_NAME1_IN_FOLDER,
original_file.read(),
mpu=True,
max_mb_per_part=1/(1024*1024))

created_file = self.tm1.files.get(self.FILE_NAME1_IN_FOLDER)

with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
self.assertEqual(original_file.read(), created_file)

@skip_if_version_lower_than(version="11.4")
@skip_if_version_higher_or_equal_than(version="12")
def test_create_in_folder_exception(self):
Expand Down