Skip to content

Commit 9f135d1

Browse files
committed
implement mpu file upload
1 parent fad99fb commit 9f135d1

File tree

2 files changed

+168
-40
lines changed

2 files changed

+168
-40
lines changed

TM1py/Services/FileService.py

+129-36
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
# -*- coding: utf-8 -*-
2+
import concurrent.futures
23
import json
34
import warnings
5+
from io import BytesIO
46
from pathlib import Path
57
from typing import List, Iterable, Union
68

9+
from TM1py.Exceptions import TM1pyVersionException
710
from TM1py.Services import RestService
811
from TM1py.Services.ObjectService import ObjectService
912
from TM1py.Utils import format_url
1013
from TM1py.Utils.Utils import verify_version, require_version
11-
from TM1py.Exceptions import TM1pyVersionException
1214

1315

1416
class FileService(ObjectService):
@@ -37,7 +39,7 @@ def get_names(self, **kwargs) -> bytes:
3739
version_content_path=self.version_content_path)
3840

3941
return self._rest.GET(url, **kwargs).content
40-
42+
4143
@require_version(version="11.4")
4244
def get_all_names(self, path: Union[str, Path] = "", **kwargs) -> List[str]:
4345
""" return list of blob file names
@@ -49,16 +51,16 @@ def get_all_names(self, path: Union[str, Path] = "", **kwargs) -> List[str]:
4951

5052
response = self._rest.GET(url, **kwargs).content
5153
return [file['Name'] for file in json.loads(response)['value']]
52-
54+
5355
@require_version(version="11.4")
5456
def get(self, file_name: str, **kwargs) -> bytes:
5557
""" Get file
5658
5759
:param file_name: file name in root or path to file
5860
"""
5961
path = Path(file_name)
60-
self._check_subfolder_support(path = path, function="FileService.get")
61-
62+
self._check_subfolder_support(path=path, function="FileService.get")
63+
6264
url = self._construct_content_url(
6365
path=path,
6466
exclude_path_end=False,
@@ -108,23 +110,27 @@ def _construct_content_url(self, path: Path, exclude_path_end: bool = True, exte
108110
**parent_folders)
109111

110112
return url.rstrip("/")
111-
113+
112114
def _check_subfolder_support(self, path: Path, function: str) -> None:
113115
REQUIRED_VERSION = "12"
114116
if len(path.parts) > 1 and not verify_version(required_version=REQUIRED_VERSION, version=self.version):
115-
raise TM1pyVersionException(function=function, required_version=REQUIRED_VERSION , feature='Subfolder')
116-
117+
raise TM1pyVersionException(function=function, required_version=REQUIRED_VERSION, feature='Subfolder')
118+
117119
@require_version(version="11.4")
118-
def create(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
120+
def create(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
121+
max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
119122
""" Create file
120123
121124
Folders in file_name (e.g. folderA/folderB/file.csv) will be created implicitly
122125
123126
:param file_name: file name in root or path to file
124127
:param file_content: file_content as bytes or BytesIO
128+
:param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
129+
:param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
130+
:param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
125131
"""
126132
path = Path(file_name)
127-
self._check_subfolder_support(path = path, function = "FileService.create")
133+
self._check_subfolder_support(path=path, function="FileService.create")
128134

129135
# Create folder structure iteratively
130136
if path.parents:
@@ -141,46 +147,133 @@ def create(self, file_name: Union[str, Path], file_content: bytes, **kwargs):
141147
}
142148
self._rest.POST(url, json.dumps(body), **kwargs)
143149

150+
return self._upload_file_content(path, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)
151+
152+
def _upload_file_content(
        self,
        path: Path,
        file_content: bytes,
        multi_part_upload: bool = False,
        max_mb_per_part: float = 200,
        max_workers: int = 1,
        **kwargs):
    """ Upload the content of a file, in one request or as multipart upload

    :param path: path to the file
    :param file_content: file_content as bytes or BytesIO
    :param multi_part_upload: boolean use multipart upload or not
    :param max_mb_per_part: max megabyte per part in multipart upload
    :param max_workers: max parallel workers for multipart upload
    :return: result of the PUT request or of `upload_file_content_with_mpu`
    """
    url = self._construct_content_url(path, exclude_path_end=False, extension="Content")

    if multi_part_upload:
        # hand the result back to the caller, like the single request branch does
        return self.upload_file_content_with_mpu(url, file_content, max_mb_per_part, max_workers, **kwargs)

    return self._rest.PUT(
        url=url,
        data=file_content,
        headers=self.binary_http_header,
        **kwargs)
171+
172+
def upload_file_content_with_mpu(self, content_url: str, file_content: bytes, max_mb_per_part: float,
                                 max_workers: int = 1, **kwargs):
    """ Upload file content in multiple parts (multipart upload)

    Creates a multipart upload, uploads the content part by part
    (concurrently if `max_workers` is greater than 1) and completes the upload.

    :param content_url: url to the `Content` of the file
    :param file_content: file_content as bytes or BytesIO
    :param max_mb_per_part: max megabyte per part
    :param max_workers: max parallel workers for uploading the parts
    :return: response of the request that completes the multipart upload
    """
    # Initiate multipart upload
    # NOTE(review): async mode is switched off, presumably because the response body is read right away
    response = self._rest.POST(
        url=content_url + "/mpu.CreateMultipartUpload",
        data="{}",
        async_requests_mode=False,
        **kwargs)
    upload_id = response.json()['UploadID']
    upload_url = content_url + f"/!uploads('{upload_id}')"

    # Split the file content into parts
    parts_to_upload = self._split_into_parts(
        data=file_content,
        max_chunk_size=int(max_mb_per_part * 1024 * 1024)
    )

    # helper function for uploading each part
    def upload_part(bytes_part):
        part_response = self._rest.POST(
            url=upload_url + "/Parts",
            data=bytes_part,
            headers={**self.binary_http_header, 'Accept': 'application/json,text/plain'},
            async_requests_mode=False,
            **kwargs)
        # parse the response body only once
        part_info = part_response.json()
        return {"PartNumber": part_info["PartNumber"], "ETag": part_info["@odata.etag"]}

    if max_workers > 1:
        # upload parts concurrently
        # Executor.map yields the results in the order of the parts, not in order of completion
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            uploaded_parts = list(executor.map(upload_part, parts_to_upload))
    else:
        # Sequential upload
        uploaded_parts = [upload_part(bytes_part) for bytes_part in parts_to_upload]

    # Complete the multipart upload
    # kwargs are forwarded here as well, so all requests of one upload are treated alike
    return self._rest.POST(
        url=upload_url + "/mpu.Complete",
        data=json.dumps({"Parts": uploaded_parts}),
        **kwargs)
229+
230+
def _split_into_parts(self, data: Union[bytes, BytesIO], max_chunk_size: int = 200 * 1024 * 1024):
231+
# Convert data to bytes if it's a BytesIO object
232+
if isinstance(data, BytesIO):
233+
data = data.getvalue()
234+
235+
# List to store chunks
236+
parts = []
237+
238+
# Split data into chunks
239+
for i in range(0, len(data), max_chunk_size):
240+
part = data[i:i + max_chunk_size]
241+
parts.append(part)
242+
243+
return parts
244+
151245
@require_version(version="11.4")
def update(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
           max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
    """ Update the content of an existing file

    :param file_name: file name in root or path to file
    :param file_content: file_content as bytes or BytesIO
    :param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
    :param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
    :param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
    """
    file_path = Path(file_name)
    self._check_subfolder_support(path=file_path, function="FileService.update")

    return self._upload_file_content(
        file_path,
        file_content,
        multi_part_upload,
        max_mb_per_part,
        max_workers,
        **kwargs)
171-
172261
@require_version(version="11.4")
def update_or_create(self, file_name: Union[str, Path], file_content: bytes, multi_part_upload: bool = False,
                     max_mb_per_part: float = 200, max_workers: int = 1, **kwargs):
    """ Update the file if it exists already, create it otherwise

    :param file_name: file name in root or path to file
    :param file_content: file_content as bytes or BytesIO
    :param multi_part_upload: boolean use multipart upload or not (only available from TM1 12 onwards)
    :param max_mb_per_part: max megabyte per part in multipart upload (only available from TM1 12 onwards)
    :param max_workers: max parallel workers for multipart upload (only available from TM1 12 onwards)
    """
    # the existence check decides which of the two operations receives the arguments
    upload = self.update if self.exists(file_name, **kwargs) else self.create

    return upload(file_name, file_content, multi_part_upload, max_mb_per_part, max_workers, **kwargs)
183-
184277
@require_version(version="11.4")
185278
def exists(self, file_name: Union[str, Path], **kwargs):
186279
""" Check if file exists
@@ -193,23 +286,23 @@ def exists(self, file_name: Union[str, Path], **kwargs):
193286
extension="")
194287

195288
return self._exists(url, **kwargs)
196-
289+
197290
@require_version(version="11.4")
def delete(self, file_name: Union[str, Path], **kwargs):
    """ Delete a file

    :param file_name: file name in root or path to file
    """
    file_path = Path(file_name)
    self._check_subfolder_support(path=file_path, function="FileService.delete")

    # no extension: the request addresses the file itself
    file_url = self._construct_content_url(path=file_path, exclude_path_end=False, extension="")

    return self._rest.DELETE(file_url, **kwargs)
212-
305+
213306
@require_version(version="11.4")
214307
def search_string_in_name(self, name_startswith: str = None, name_contains: Iterable = None,
215308
name_contains_operator: str = 'and', path: Union[Path, str] = "",
@@ -242,9 +335,9 @@ def search_string_in_name(self, name_startswith: str = None, name_contains: Iter
242335

243336
else:
244337
raise ValueError("'name_contains' must be str or iterable")
245-
338+
246339
path = Path(path)
247-
self._check_subfolder_support(path = path, function = "FileService.search_string_in_name")
340+
self._check_subfolder_support(path=path, function="FileService.search_string_in_name")
248341

249342
url = self._construct_content_url(
250343
path=Path(path),

Tests/FileService_test.py

+39-4
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,37 @@ def setUpV12(self):
4141
if self.tm1.files.exists(self.FILE_NAME2_IN_FOLDER):
4242
self.tm1.files.delete(self.FILE_NAME2_IN_FOLDER)
4343

44-
@skip_if_version_lower_than(version="11.4")
45-
def test_create_get(self):
44+
45+
def run_create_get(self, mpu, max_mb_per_part=200, max_workers=1):
    """ Upload the csv resource file, read it back and compare it with the original

    :param mpu: use multipart upload or not
    :param max_mb_per_part: max megabyte per part, defaults to 200 so that `mpu=True` works without it
    :param max_workers: max parallel workers for multipart upload
    """
    # read the fixture once, it serves as upload content and as expectation
    original_content = Path(__file__).parent.joinpath('resources', 'file.csv').read_bytes()

    self.tm1.files.update_or_create(
        self.FILE_NAME1,
        original_content,
        multi_part_upload=mpu,
        max_mb_per_part=max_mb_per_part,
        max_workers=max_workers)

    created_file = self.tm1.files.get(self.FILE_NAME1)
    self.assertEqual(original_content, created_file)
5357

58+
@skip_if_version_lower_than(version="11.4")
def test_create_get(self):
    """ Round trip of a file without multipart upload """
    use_multi_part_upload = False
    self.run_create_get(mpu=use_multi_part_upload)
61+
62+
@skip_if_version_lower_than(version="12")
def test_create_get_with_mpu_1_byte_per_part(self):
    """ Round trip of a file with multipart upload and parts of one byte """
    # part size is given in megabyte, this is exactly one byte
    one_byte_in_mb = 1 / (1024 * 1024)
    self.run_create_get(mpu=True, max_mb_per_part=one_byte_in_mb)
65+
66+
67+
@skip_if_version_lower_than(version="12")
def test_create_get_with_mpu_1_byte_per_part_10_max_workers(self):
    """ Round trip of a file with multipart upload, parts of one byte and 10 workers """
    # part size is given in megabyte, this is exactly one byte
    one_byte_in_mb = 1 / (1024 * 1024)
    self.run_create_get(mpu=True, max_mb_per_part=one_byte_in_mb, max_workers=10)
70+
71+
@skip_if_version_lower_than(version="12")
def test_create_get_with_mpu_200_megabyte_per_part(self):
    """ Round trip of a file with multipart upload and parts of up to 200 megabyte """
    megabyte_per_part = 200
    self.run_create_get(mpu=True, max_mb_per_part=megabyte_per_part)
74+
5475
@skip_if_version_lower_than(version="12")
5576
def test_create_get_in_folder(self):
5677
with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
@@ -61,6 +82,20 @@ def test_create_get_in_folder(self):
6182
with open(Path(__file__).parent.joinpath('resources', 'file.csv'), "rb") as original_file:
6283
self.assertEqual(original_file.read(), created_file)
6384

85+
@skip_if_version_lower_than(version="12")
def test_create_get_in_folder_with_mpu(self):
    """ Round trip of a file in a folder with multipart upload and parts of one byte """
    original_content = Path(__file__).parent.joinpath('resources', 'file.csv').read_bytes()

    # the parameter is called `multi_part_upload`
    # an unknown `mpu` keyword would only end up in **kwargs and leave multipart upload switched off
    self.tm1.files.update_or_create(
        self.FILE_NAME1_IN_FOLDER,
        original_content,
        multi_part_upload=True,
        max_mb_per_part=1 / (1024 * 1024))

    created_file = self.tm1.files.get(self.FILE_NAME1_IN_FOLDER)
    self.assertEqual(original_content, created_file)
98+
6499
@skip_if_version_lower_than(version="11.4")
65100
@skip_if_version_higher_or_equal_than(version="12")
66101
def test_create_in_folder_exception(self):

0 commit comments

Comments
 (0)