Commit 3c3f69a

implement mpu file upload
1 parent fad99fb commit 3c3f69a

File tree: 2 files changed (+200, -45 lines)


TM1py/Services/FileService.py

+161, -41
@@ -1,17 +1,22 @@
 # -*- coding: utf-8 -*-
+import concurrent.futures
 import json
+import time
 import warnings
+from io import BytesIO
 from pathlib import Path
-from typing import List, Iterable, Union
+from typing import List, Iterable, Union, Tuple

+from TM1py.Exceptions import TM1pyVersionException
 from TM1py.Services import RestService
 from TM1py.Services.ObjectService import ObjectService
 from TM1py.Utils import format_url
 from TM1py.Utils.Utils import verify_version, require_version
-from TM1py.Exceptions import TM1pyVersionException


 class FileService(ObjectService):
+    SUBFOLDER_REQUIRED_VERSION = "12"
+    MPU_REQUIRED_VERSION = "12"

     def __init__(self, tm1_rest: RestService):
         """
@@ -37,7 +42,7 @@ def get_names(self, **kwargs) -> bytes:
             version_content_path=self.version_content_path)

         return self._rest.GET(url, **kwargs).content
-
+
     @require_version(version="11.4")
     def get_all_names(self, path: Union[str, Path] = "", **kwargs) -> List[str]:
         """ return list of blob file names
@@ -49,16 +54,16 @@ def get_all_names(self, path: Union[str, Path] = "", **kwargs) -> List[str]:

         response = self._rest.GET(url, **kwargs).content
         return [file['Name'] for file in json.loads(response)['value']]
-
+
     @require_version(version="11.4")
     def get(self, file_name: str, **kwargs) -> bytes:
         """ Get file

         :param file_name: file name in root or path to file
         """
         path = Path(file_name)
-        self._check_subfolder_support(path = path, function="FileService.get")
-
+        self._check_subfolder_support(path=path, function="FileService.get")
+
         url = self._construct_content_url(
             path=path,
             exclude_path_end=False,
@@ -108,23 +113,126 @@ def _construct_content_url(self, path: Path, exclude_path_end: bool = True, exte
                          **parent_folders)

         return url.rstrip("/")
-
+
     def _check_subfolder_support(self, path: Path, function: str) -> None:
-        REQUIRED_VERSION = "12"
-        if len(path.parts) > 1 and not verify_version(required_version=REQUIRED_VERSION, version=self.version):
-            raise TM1pyVersionException(function=function, required_version=REQUIRED_VERSION , feature='Subfolder')
-
+        if not len(path.parts) > 1:
+            return
+
+        if not verify_version(required_version=self.SUBFOLDER_REQUIRED_VERSION, version=self.version):
+            raise TM1pyVersionException(
+                function=function,
+                required_version=self.SUBFOLDER_REQUIRED_VERSION,
+                feature='Subfolder')
+
+    def _check_mpu_support(self, function: str) -> None:
+        if not verify_version(required_version=self.MPU_REQUIRED_VERSION, version=self.version):
+            raise TM1pyVersionException(
+                function=function,
+                required_version=self.MPU_REQUIRED_VERSION,
+                feature='MultiProcessUpload')
+
+    def _upload_file_content(
+            self,
+            path: Path,
+            file_content: bytes,
+            multi_part_upload: bool = False,
+            max_mb_per_part: float = 200,
+            max_workers: int = 1,
+            **kwargs):
+
+        url = self._construct_content_url(path, exclude_path_end=False, extension="Content")
+
+        if multi_part_upload:
+            return self.upload_file_content_with_mpu(url, file_content, max_mb_per_part, max_workers, **kwargs)
+
+        return self._rest.PUT(
+            url=url,
+            data=file_content,
+            headers=self.binary_http_header,
+            **kwargs)
+
+    def upload_file_content_with_mpu(self, content_url: str, file_content: bytes, max_mb_per_part: float,
+                                     max_workers: int = 1, **kwargs):
+        # Initiate multipart upload
+        response = self._rest.POST(
+            url=content_url + "/mpu.CreateMultipartUpload",
+            data="{}",
+            async_requests_mode=False,
+            **kwargs)
+        upload_id = response.json()['UploadID']
+
+        # Split the file content into parts
+        parts_to_upload = self._split_into_parts(
+            data=file_content,
+            max_chunk_size=int(max_mb_per_part * 1024 * 1024)
+        )
+
+        part_numbers_and_etags = []
+
+        # helper function for uploading each part
+        def upload_part_with_retry(index: int, data: bytes, retries: int = 3) -> Tuple[int, int, str]:
+            for attempt in range(retries):
+                try:
+                    part_response = self._rest.POST(
+                        url=content_url + f"/!uploads('{upload_id}')/Parts",
+                        data=data,
+                        headers={**self.binary_http_header, 'Accept': 'application/json,text/plain'},
+                        async_requests_mode=False,
+                        **kwargs)
+                    return index, part_response.json()["PartNumber"], part_response.json()["@odata.etag"]
+                except Exception as e:
+                    if attempt < retries - 1:
+                        time.sleep(2 ** attempt)  # Exponential backoff
+                    else:
+                        raise e from None
+
+        if max_workers > 1:
+            # upload parts concurrently
+            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+
+                futures = {
+                    executor.submit(upload_part_with_retry, i, part, 3): i
+                    for i, part
+                    in enumerate(parts_to_upload)}
+
+                for future in concurrent.futures.as_completed(futures):
+                    part_index, part_number, odata_etag = future.result()
+                    part_numbers_and_etags.append((part_index, part_number, odata_etag))
+
+        else:
+            # Sequential upload
+            for i, bytes_part in enumerate(parts_to_upload):
+                part_index, part_number, odata_etag = upload_part_with_retry(i, bytes_part)
+                part_numbers_and_etags.append((part_index, part_number, odata_etag))
+
+        # Complete the multipart upload
+        self._rest.POST(
+            url=content_url + f"/!uploads('{upload_id}')/mpu.Complete",
+            data=json.dumps(
+                {"Parts": [
+                    {"PartNumber": part_number, "ETag": etag}
+                    for _, part_number, etag in sorted(part_numbers_and_etags)
+                ]}
+            )
+        )
+
     @require_version(version="11.4")
-    def create(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
+    def create(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
+               max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
         """ Create file

         Folders in file_name (e.g. folderA/folderB/file.csv) will be created implicitly

         :param file_name: file name in root or path to file
         :param file_content: file_content as bytes or BytesIO
+        :param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
+        :param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
+        :param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
         """
         path = Path(file_name)
-        self._check_subfolder_support(path = path, function = "FileService.create")
+        self._check_subfolder_support(path=path, function="FileService.create")
+        if multi_part_upload:
+            self._check_mpu_support(function="FileService.create")

         # Create folder structure iteratively
         if path.parents:
@@ -141,46 +249,42 @@ def create(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
                 }
                 self._rest.POST(url, json.dumps(body), **kwargs)

-        url = self._construct_content_url(path, exclude_path_end=False, extension="Content")
-        return self._rest.PUT(
-            url=url,
-            data=file_content,
-            headers=self.binary_http_header,
-            **kwargs)
-
+        return self._upload_file_content(path, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)
+
     @require_version(version="11.4")
-    def update(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
+    def update(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
+               max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
         """ Update existing file

         :param file_name: file name in root or path to file
         :param file_content: file_content as bytes or BytesIO
+        :param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
+        :param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
+        :param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
         """
         path = Path(file_name)
-        self._check_subfolder_support(path = path, function = "FileService.update")
+        self._check_subfolder_support(path=path, function="FileService.update")
+        if multi_part_upload:
+            self._check_mpu_support(function="FileService.update")

-        url = self._construct_content_url(
-            path=path,
-            exclude_path_end=False,
-            extension="Content")
+        return self._upload_file_content(path, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)

-        return self._rest.PUT(
-            url=url,
-            data=file_content,
-            headers=self.binary_http_header,
-            **kwargs)
-
     @require_version(version="11.4")
-    def update_or_create(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
+    def update_or_create(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
+                         max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
         """ Create file or update file if it already exists

         :param file_name: file name in root or path to file
         :param file_content: file_content as bytes or BytesIO
+        :param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
+        :param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
+        :param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
         """
         if self.exists(file_name, **kwargs):
-            return self.update(file_name, file_content, **kwargs)
+            return self.update(file_name, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)
+
+        return self.create(file_name, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)

-        return self.create(file_name, file_content, **kwargs)
-
     @require_version(version="11.4")
     def exists(self, file_name: Union[str, Path], **kwargs):
         """ Check if file exists
@@ -193,23 +297,23 @@ def exists(self, file_name: Union[str, Path], **kwargs):
             extension="")

         return self._exists(url, **kwargs)
-
+
     @require_version(version="11.4")
     def delete(self, file_name: Union[str, Path], **kwargs):
         """ Delete file

         :param file_name: file name in root or path to file
         """
         path = Path(file_name)
-        self._check_subfolder_support(path = path, function = "FileService.delete")
+        self._check_subfolder_support(path=path, function="FileService.delete")

         url = self._construct_content_url(
             path=path,
             exclude_path_end=False,
             extension="")

         return self._rest.DELETE(url, **kwargs)
-
+
     @require_version(version="11.4")
     def search_string_in_name(self, name_startswith: str = None, name_contains: Iterable = None,
                               name_contains_operator: str = 'and', path: Union[Path, str] = "",
@@ -242,9 +346,9 @@ def search_string_in_name(self, name_startswith: str = None, name_contains: Iter

         else:
             raise ValueError("'name_contains' must be str or iterable")
-
+
         path = Path(path)
-        self._check_subfolder_support(path = path, function = "FileService.search_string_in_name")
+        self._check_subfolder_support(path=path, function="FileService.search_string_in_name")

         url = self._construct_content_url(
             path=Path(path),
@@ -253,3 +357,19 @@ def search_string_in_name(self, name_startswith: str = None, name_contains: Iter
         response = self._rest.GET(url, **kwargs).content

         return list(file['Name'] for file in json.loads(response)['value'])
+
+    @staticmethod
+    def _split_into_parts(data: Union[bytes, BytesIO], max_chunk_size: int = 200 * 1024 * 1024):
+        # Convert data to bytes if it's a BytesIO object
+        if isinstance(data, BytesIO):
+            data = data.getvalue()
+
+        # List to store chunks
+        parts = []
+
+        # Split data into chunks
+        for i in range(0, len(data), max_chunk_size):
+            part = data[i:i + max_chunk_size]
+            parts.append(part)
+
+        return parts
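
For context, a minimal usage sketch of the multipart-upload parameters added in this commit. The connection details below are placeholders and not part of the commit; only the file-service call and parameter names come from the diff above.

    from TM1py import TM1Service

    # Hypothetical connection parameters; MPU requires a TM1 v12 backend.
    with TM1Service(address="localhost", port=12354, user="admin", password="apple", ssl=True) as tm1:
        with open("big_export.csv", "rb") as f:
            tm1.files.update_or_create(
                "exports/big_export.csv",      # folders are created implicitly
                f.read(),
                multi_part_upload=True,        # only available from TM1 12 onwards
                max_mb_per_part=200,           # split content into parts of at most ~200 MB
                max_workers=4)                 # upload up to 4 parts concurrently

With multi_part_upload=False (the default) the call still goes through the single PUT request as before. With MPU enabled, FileService first issues mpu.CreateMultipartUpload, then uploads each part to /!uploads('{UploadID}')/Parts (with up to 3 attempts and exponential backoff per part, sequentially or via a thread pool), and finally posts mpu.Complete with the collected part numbers and ETags.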

Tests/FileService_test.py

+39, -4
@@ -41,16 +41,37 @@ def setUpV12(self):
         if self.tm1.files.exists(self.FILE_NAME2_IN_FOLDER):
             self.tm1.files.delete(self.FILE_NAME2_IN_FOLDER)

-    @skip_if_version_lower_than(version="11.4")
-    def test_create_get(self):
+
+    def run_create_get(self, mpu, max_mb_per_part=None, max_workers=1):
         with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
-            self.tm1.files.update_or_create(self.FILE_NAME1, original_file.read())
+            self.tm1.files.update_or_create(
+                self.FILE_NAME1,
+                original_file.read(),
+                multi_part_upload=mpu,
+                max_mb_per_part=max_mb_per_part,
+                max_workers=max_workers)

         created_file = self.tm1.files.get(self.FILE_NAME1)
-
         with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
             self.assertEqual(original_file.read(), created_file)

+    @skip_if_version_lower_than(version="11.4")
+    def test_create_get(self):
+        self.run_create_get(mpu=False)
+
+    @skip_if_version_lower_than(version="12")
+    def test_create_get_with_mpu_1_byte_per_part(self):
+        self.run_create_get(mpu=True, max_mb_per_part=1/(1024*1024))
+
+
+    @skip_if_version_lower_than(version="12")
+    def test_create_get_with_mpu_1_byte_per_part_10_max_workers(self):
+        self.run_create_get(mpu=True, max_mb_per_part=1/(1024*1024), max_workers=10)
+
+    @skip_if_version_lower_than(version="12")
+    def test_create_get_with_mpu_200_megabyte_per_part(self):
+        self.run_create_get(mpu=True, max_mb_per_part=200)
+
     @skip_if_version_lower_than(version="12")
     def test_create_get_in_folder(self):
         with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
@@ -61,6 +82,20 @@ def test_create_get_in_folder(self):
         with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
             self.assertEqual(original_file.read(), created_file)

+    @skip_if_version_lower_than(version="12")
+    def test_create_get_in_folder_with_mpu(self):
+        with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
+            self.tm1.files.update_or_create(
+                self.FILE_NAME1_IN_FOLDER,
+                original_file.read(),
+                multi_part_upload=True,
+                max_mb_per_part=1/(1024*1024))
+
+        created_file = self.tm1.files.get(self.FILE_NAME1_IN_FOLDER)
+
+        with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
+            self.assertEqual(original_file.read(), created_file)
+
     @skip_if_version_lower_than(version="11.4")
     @skip_if_version_higher_or_equal_than(version="12")
     def test_create_in_folder_exception(self):
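
A note on the part-size arithmetic exercised by the new tests: _split_into_parts converts max_mb_per_part to bytes with int(max_mb_per_part * 1024 * 1024), so passing 1/(1024*1024) MB yields a 1-byte chunk size and forces one part per byte of the test file. A small self-contained sketch of the same arithmetic (illustration only, not TM1py code):

    # Mirrors the chunking logic of FileService._split_into_parts shown above
    data = b"0123456789"                                   # 10 bytes of payload
    max_mb_per_part = 1 / (1024 * 1024)                    # value used in the 1-byte-per-part tests
    max_chunk_size = int(max_mb_per_part * 1024 * 1024)    # -> 1 byte per part
    parts = [data[i:i + max_chunk_size] for i in range(0, len(data), max_chunk_size)]
    print(max_chunk_size, len(parts))                      # prints: 1 10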
