Skip to content

Commit 50bbf58

Browse files
committed
implement mpu file upload
1 parent fad99fb commit 50bbf58

File tree

2 files changed

+176
-41
lines changed

2 files changed

+176
-41
lines changed

TM1py/Services/FileService.py

+137-37
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
# -*- coding: utf-8 -*-
2+
import concurrent.futures
23
import json
4+
import time
35
import warnings
6+
from io import BytesIO
47
from pathlib import Path
5-
from typing import List, Iterable, Union
8+
from typing import List, Iterable, Union, Tuple
69

10+
from TM1py.Exceptions import TM1pyVersionException
711
from TM1py.Services import RestService
812
from TM1py.Services.ObjectService import ObjectService
913
from TM1py.Utils import format_url
1014
from TM1py.Utils.Utils import verify_version, require_version
11-
from TM1py.Exceptions import TM1pyVersionException
1215

1316

1417
class FileService(ObjectService):
@@ -37,7 +40,7 @@ def get_names(self, **kwargs) -> bytes:
3740
version_content_path=self.version_content_path)
3841

3942
return self._rest.GET(url, **kwargs).content
40-
43+
4144
@require_version(version="11.4")
4245
def get_all_names(self, path: Union[str, Path] = "", **kwargs) -> List[str]:
4346
""" return list of blob file names
@@ -49,16 +52,16 @@ def get_all_names(self, path: Union[str, Path] = "", **kwargs) -> List[str]:
4952

5053
response = self._rest.GET(url, **kwargs).content
5154
return [file['Name'] for file in json.loads(response)['value']]
52-
55+
5356
@require_version(version="11.4")
5457
def get(self, file_name: str, **kwargs) -> bytes:
5558
""" Get file
5659
5760
:param file_name: file name in root or path to file
5861
"""
5962
path = Path(file_name)
60-
self._check_subfolder_support(path = path, function="FileService.get")
61-
63+
self._check_subfolder_support(path=path, function="FileService.get")
64+
6265
url = self._construct_content_url(
6366
path=path,
6467
exclude_path_end=False,
@@ -108,23 +111,27 @@ def _construct_content_url(self, path: Path, exclude_path_end: bool = True, exte
108111
**parent_folders)
109112

110113
return url.rstrip("/")
111-
114+
112115
def _check_subfolder_support(self, path: Path, function: str) -> None:
    """Guard against subfolder paths on servers that cannot handle them.

    Subfolders in blob paths are only supported from TM1 version 12 onwards;
    raise TM1pyVersionException when `path` contains more than one component
    on an older server.

    :param path: blob path to inspect
    :param function: caller name, included in the exception message
    """
    REQUIRED_VERSION = "12"
    uses_subfolders = len(path.parts) > 1
    if uses_subfolders and not verify_version(required_version=REQUIRED_VERSION, version=self.version):
        raise TM1pyVersionException(function=function, required_version=REQUIRED_VERSION, feature='Subfolder')
117120
@require_version(version="11.4")
118-
def create(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
121+
def create(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
122+
max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
119123
""" Create file
120124
121125
Folders in file_name (e.g. folderA/folderB/file.csv) will be created implicitly
122126
123127
:param file_name: file name in root or path to file
124128
:param file_content: file_content as bytes or BytesIO
129+
:param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
130+
:param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
131+
:param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
125132
"""
126133
path = Path(file_name)
127-
self._check_subfolder_support(path = path, function = "FileService.create")
134+
self._check_subfolder_support(path=path, function="FileService.create")
128135

129136
# Create folder structure iteratively
130137
if path.parents:
@@ -141,46 +148,139 @@ def create(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
141148
}
142149
self._rest.POST(url, json.dumps(body), **kwargs)
143150

151+
return self._upload_file_content(path, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)
152+
153+
def _upload_file_content(self, path: Path, file_content: bytes, multi_part_upload: bool = False,
                         max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
    """Write `file_content` to the blob identified by `path`.

    Dispatches to the multipart-upload protocol when requested; otherwise
    performs a single binary PUT against the file's Content endpoint.

    :param path: blob path of the target file
    :param file_content: payload as bytes or BytesIO
    :param multi_part_upload: use multipart upload (TM1 12 onwards)
    :param max_mb_per_part: max megabytes per part in multipart upload
    :param max_workers: parallel workers for multipart upload
    """
    content_url = self._construct_content_url(path, exclude_path_end=False, extension="Content")

    if multi_part_upload:
        return self.upload_file_content_with_mpu(
            content_url, file_content, max_mb_per_part, max_workers, **kwargs)

    return self._rest.PUT(
        url=content_url,
        data=file_content,
        headers=self.binary_http_header,
        **kwargs)
172+
173+
def upload_file_content_with_mpu(self, content_url: str, file_content: bytes, max_mb_per_part: float,
                                 max_workers: int = 1, **kwargs):
    """Upload `file_content` to `content_url` via TM1's multipart-upload (MPU) protocol.

    Creates an upload session, pushes the content in parts (sequentially or
    with a thread pool), then completes the session with the collected part
    numbers and ETags in part order.

    :param content_url: Content endpoint URL of the target file
    :param file_content: payload as bytes or BytesIO
    :param max_mb_per_part: maximum megabytes per uploaded part
    :param max_workers: number of parallel upload workers (1 = sequential)
    """
    # Initiate multipart upload and remember its ID for the per-part requests
    response = self._rest.POST(
        url=content_url + "/mpu.CreateMultipartUpload",
        data="{}",
        async_requests_mode=False,
        **kwargs)
    upload_id = response.json()['UploadID']

    # Split the file content into parts
    parts_to_upload = self._split_into_parts(
        data=file_content,
        max_chunk_size=int(max_mb_per_part * 1024 * 1024)
    )

    part_numbers_and_etags = []

    # helper function for uploading each part
    def upload_part_with_retry(part_index: int, bytes_part: bytes, retries: int = 3) -> Tuple[int, int, str]:
        for attempt in range(retries):
            try:
                part_response = self._rest.POST(
                    url=content_url + f"/!uploads('{upload_id}')/Parts",
                    data=bytes_part,
                    headers={**self.binary_http_header, 'Accept': 'application/json,text/plain'},
                    async_requests_mode=False,
                    **kwargs)
                # parse once instead of calling .json() twice
                payload = part_response.json()
                return part_index, payload["PartNumber"], payload["@odata.etag"]
            except Exception as e:
                if attempt < retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                else:
                    raise e from None  # Raise the exception if all retries fail

    if max_workers > 1:
        # upload parts concurrently
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(upload_part_with_retry, i, part, 3): i
                for i, part
                in enumerate(parts_to_upload)}

            for future in concurrent.futures.as_completed(futures):
                part_index, part_number, etag = future.result()
                part_numbers_and_etags.append((part_index, part_number, etag))

    else:
        # Sequential upload
        # fix: original called undefined name `upload_part`, raising NameError
        # on the default max_workers=1 path; the helper is upload_part_with_retry
        for i, bytes_part in enumerate(parts_to_upload):
            part_index, part_number, etag = upload_part_with_retry(i, bytes_part)
            part_numbers_and_etags.append((part_index, part_number, etag))

    # Complete the multipart upload; parts are listed sorted by part index
    self._rest.POST(
        url=content_url + f"/!uploads('{upload_id}')/mpu.Complete",
        data=json.dumps(
            {"Parts": [
                {"PartNumber": part_number, "ETag": etag}
                for _, part_number, etag in sorted(part_numbers_and_etags)
            ]}
        )
    )
236+
237+
def _split_into_parts(self, data: Union[bytes, BytesIO], max_chunk_size: int = 200 * 1024 * 1024):
238+
# Convert data to bytes if it's a BytesIO object
239+
if isinstance(data, BytesIO):
240+
data = data.getvalue()
241+
242+
# List to store chunks
243+
parts = []
244+
245+
# Split data into chunks
246+
for i in range(0, len(data), max_chunk_size):
247+
part = data[i:i + max_chunk_size]
248+
parts.append(part)
249+
250+
return parts
251+
151252
@require_version(version="11.4")
def update(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
           max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
    """ Update existing file

    :param file_name: file name in root or path to file
    :param file_content: file_content as bytes or BytesIO
    :param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
    :param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
    :param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
    """
    target_path = Path(file_name)
    self._check_subfolder_support(path=target_path, function="FileService.update")

    return self._upload_file_content(
        target_path, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)
172268
@require_version(version="11.4")
def update_or_create(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
                     max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
    """ Create file or update file if it already exists

    :param file_name: file name in root or path to file
    :param file_content: file_content as bytes or BytesIO
    :param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
    :param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
    :param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
    """
    # Pick the operation based on existence, then forward all arguments unchanged
    writer = self.update if self.exists(file_name, **kwargs) else self.create
    return writer(file_name, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)
184284
@require_version(version="11.4")
185285
def exists(self, file_name: Union[str, Path], **kwargs):
186286
""" Check if file exists
@@ -193,23 +293,23 @@ def exists(self, file_name: Union[str, Path], **kwargs):
193293
extension="")
194294

195295
return self._exists(url, **kwargs)
196-
296+
197297
@require_version(version="11.4")
def delete(self, file_name: Union[str, Path], **kwargs):
    """ Delete file

    :param file_name: file name in root or path to file
    """
    target_path = Path(file_name)
    self._check_subfolder_support(path=target_path, function="FileService.delete")

    url = self._construct_content_url(path=target_path, exclude_path_end=False, extension="")
    return self._rest.DELETE(url, **kwargs)
212-
312+
213313
@require_version(version="11.4")
214314
def search_string_in_name(self, name_startswith: str = None, name_contains: Iterable = None,
215315
name_contains_operator: str = 'and', path: Union[Path, str] = "",
@@ -242,9 +342,9 @@ def search_string_in_name(self, name_startswith: str = None, name_contains: Iter
242342

243343
else:
244344
raise ValueError("'name_contains' must be str or iterable")
245-
345+
246346
path = Path(path)
247-
self._check_subfolder_support(path = path, function = "FileService.search_string_in_name")
347+
self._check_subfolder_support(path=path, function="FileService.search_string_in_name")
248348

249349
url = self._construct_content_url(
250350
path=Path(path),

Tests/FileService_test.py

+39-4
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,37 @@ def setUpV12(self):
4141
if self.tm1.files.exists(self.FILE_NAME2_IN_FOLDER):
4242
self.tm1.files.delete(self.FILE_NAME2_IN_FOLDER)
4343

44-
@skip_if_version_lower_than(version="11.4")
45-
def test_create_get(self):
44+
45+
def run_create_get(self, mpu, max_mb_per_part=200, max_workers=1):
    """Round-trip helper: upload resources/file.csv and verify the stored content.

    :param mpu: use multipart upload
    :param max_mb_per_part: max megabytes per part. Fix: the default was None,
        which crashes the MPU path (int(None * 1024 * 1024)) if a caller forgets
        to override it; 200 matches the FileService default.
    :param max_workers: parallel upload workers
    """
    source = Path(__file__).parent.joinpath('resources', 'file.csv')

    with open(source, "rb") as original_file:
        self.tm1.files.update_or_create(
            self.FILE_NAME1,
            original_file.read(),
            multi_part_upload=mpu,
            max_mb_per_part=max_mb_per_part,
            max_workers=max_workers)

    created_file = self.tm1.files.get(self.FILE_NAME1)
    with open(source, "rb") as original_file:
        self.assertEqual(original_file.read(), created_file)
5357

58+
@skip_if_version_lower_than(version="11.4")
def test_create_get(self):
    # Plain single-request upload round trip
    self.run_create_get(mpu=False)

@skip_if_version_lower_than(version="12")
def test_create_get_with_mpu_1_byte_per_part(self):
    # One byte per part forces a many-part multipart upload
    self.run_create_get(mpu=True, max_mb_per_part=1 / (1024 * 1024))

@skip_if_version_lower_than(version="12")
def test_create_get_with_mpu_1_byte_per_part_10_max_workers(self):
    # Many parts uploaded concurrently through the thread pool
    self.run_create_get(mpu=True, max_mb_per_part=1 / (1024 * 1024), max_workers=10)

@skip_if_version_lower_than(version="12")
def test_create_get_with_mpu_200_megabyte_per_part(self):
    # File fits into a single 200 MB part
    self.run_create_get(mpu=True, max_mb_per_part=200)
74+
5475
@skip_if_version_lower_than(version="12")
5576
def test_create_get_in_folder(self):
5677
with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
@@ -61,6 +82,20 @@ def test_create_get_in_folder(self):
6182
with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
6283
self.assertEqual(original_file.read(), created_file)
6384

85+
@skip_if_version_lower_than(version="12")
def test_create_get_in_folder_with_mpu(self):
    """Round-trip a file in a subfolder via multipart upload."""
    with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
        self.tm1.files.update_or_create(
            self.FILE_NAME1_IN_FOLDER,
            original_file.read(),
            # fix: keyword must match the update_or_create signature;
            # `mpu=True` raised TypeError (unexpected keyword argument 'mpu')
            multi_part_upload=True,
            max_mb_per_part=1 / (1024 * 1024))

    created_file = self.tm1.files.get(self.FILE_NAME1_IN_FOLDER)

    with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
        self.assertEqual(original_file.read(), created_file)
98+
6499
@skip_if_version_lower_than(version="11.4")
65100
@skip_if_version_higher_or_equal_than(version="12")
66101
def test_create_in_folder_exception(self):

0 commit comments

Comments
 (0)