s3: upload parts from temporary file
Currently, when doing many concurrent uploads, rohmu can use a lot of
memory. Copying each part to a temporary file first should not add much
time compared to uploading it to S3. Note that upload_part requires
`.seek` so that retries work correctly, so we cannot just pass fp
directly.
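
For context, a minimal standalone sketch of the buffering pattern this
commit adopts. The helper name buffer_part and both size constants are
illustrative, not rohmu's API; only the tempfile-per-part idea matches
the change below.

    import tempfile
    from typing import IO, BinaryIO, Tuple

    IO_BLOCK_SIZE = 1024 * 1024   # read granularity; stand-in for rohmu's constant
    PART_SIZE = 8 * 1024 * 1024   # multipart chunk size; illustrative value

    def buffer_part(fp: BinaryIO, part_size: int = PART_SIZE) -> Tuple[IO[bytes], int]:
        """Copy up to part_size bytes from fp into a seekable temporary file."""
        tmp = tempfile.TemporaryFile()
        length = 0
        # Stream in small blocks so at most IO_BLOCK_SIZE bytes sit in memory,
        # rather than a whole multipart chunk per concurrent upload.
        while chunk := fp.read(min(IO_BLOCK_SIZE, part_size - length)):
            tmp.write(chunk)
            length += len(chunk)
        tmp.seek(0)  # rewind so a retried upload reads the part from the start
        return tmp, length

The returned file is seekable, so it can be passed as Body to
upload_part and botocore can rewind it on retry, which a non-seekable
source fp would not allow.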
joelynch committed Feb 12, 2025
1 parent 1d0b29e commit 50bc7e0
Showing 1 changed file with 23 additions and 15 deletions.
38 changes: 23 additions & 15 deletions rohmu/object_storage/s3.py
@@ -9,6 +9,7 @@
 from functools import partial
 from http import HTTPStatus
 from pathlib import Path
+from rohmu.common.constants import IO_BLOCK_SIZE
 from rohmu.common.models import StorageOperation
 from rohmu.common.statsd import StatsdConfig
 from rohmu.errors import (
@@ -50,6 +51,7 @@
 import botocore.session
 import contextlib
 import math
+import tempfile
 import time
 
 if TYPE_CHECKING:
Expand Down Expand Up @@ -505,20 +507,26 @@ def multipart_upload_file_object(
mp_id = cmu_response["UploadId"]

while True:
data = self._read_bytes(fp, self.multipart_chunk_size)
if not data:
break

start_of_part_upload = time.monotonic()
self.stats.operation(StorageOperation.store_file, size=len(data))
try:
cup_response = self.get_client().upload_part(
Body=data,
Bucket=self.bucket_name,
Key=path,
PartNumber=part_number,
UploadId=mp_id,
)
# Use a temporary file to minimize memory usage
with tempfile.TemporaryFile() as data:
length = 0
while chunk := fp.read(min(IO_BLOCK_SIZE, self.multipart_chunk_size - length)):
data.write(chunk)
length += len(chunk)
if not length:
break
data.seek(0)

start_of_part_upload = time.monotonic()
self.stats.operation(StorageOperation.store_file, size=length)
cup_response = self.get_client().upload_part(
Body=data,
Bucket=self.bucket_name,
Key=path,
PartNumber=part_number,
UploadId=mp_id,
)
except botocore.exceptions.ClientError as ex:
self.log.exception("Uploading part %d for %s failed", part_number, path)
self.stats.operation(StorageOperation.multipart_aborted)
@@ ... @@
                 "Uploaded part %s of %s, size %s in %.2fs",
                 part_number,
                 chunks,
-                len(data),
+                length,
                 time.monotonic() - start_of_part_upload,
             )
             parts.append(
@@ ... @@
                 }
             )
             part_number += 1
-            bytes_sent += len(data)
+            bytes_sent += length
             if progress_fn:
                 # TODO: change this to incremental progress. Size parameter is currently unused.
                 progress_fn(bytes_sent, size)  # type: ignore[arg-type]