Skip to content

Commit

Permalink
fix(s3boto3): spool buffer file to end of all uploaded parts after each
Browse files Browse the repository at this point in the history
             part upload.

Fixes an issue similar to jschneier#160 for s3boto3. Inspired by
             vinayinvicible's fix for jschneier#160.
  • Loading branch information
mattswoon committed Jul 24, 2017
1 parent e88dd98 commit 8339e18
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 3 deletions.
13 changes: 11 additions & 2 deletions storages/backends/s3boto3.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ def __init__(self, name, mode, storage, buffer_size=None):
if buffer_size is not None:
self.buffer_size = buffer_size
self._write_counter = 0
# file position of the latest part file
self._last_part_pos = 0

@property
def size(self):
Expand Down Expand Up @@ -120,10 +122,14 @@ def write(self, content):
if self._storage.encryption:
parameters['ServerSideEncryption'] = 'AES256'
self._multipart = self.obj.initiate_multipart_upload(**parameters)
if self.buffer_size <= self._buffer_file_size:
if self.buffer_size <= self._file_part_size:
self._flush_write_buffer()
return super(S3Boto3StorageFile, self).write(force_bytes(content))

@property
def _file_part_size(self):
    """Number of bytes buffered since the end of the last uploaded part.

    The spool file keeps all data ever written; ``_last_part_pos`` marks
    where the previously uploaded part ended, so the pending (not yet
    uploaded) part is everything past that offset.
    """
    pending = self._buffer_file_size - self._last_part_pos
    return pending

@property
def _buffer_file_size(self):
pos = self.file.tell()
Expand All @@ -138,9 +144,12 @@ def _flush_write_buffer(self):
"""
if self._buffer_file_size:
self._write_counter += 1
self.file.seek(0)
pos = self.file.tell()
self.file.seek(self._last_part_pos)
part = self._multipart.Part(self._write_counter)
part.upload(Body=self.file.read())
self.file.seek(pos)
self._last_part_pos = self._buffer_file_size

def close(self):
if self._is_dirty:
Expand Down
107 changes: 106 additions & 1 deletion tests/test_s3boto3.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import gzip
import gzip, os
from datetime import datetime

from botocore.exceptions import ClientError
Expand Down Expand Up @@ -342,3 +342,108 @@ def test_strip_signing_parameters(self):
'%s?X-Amz-Date=12345678&X-Amz-Signature=Signature' % expected), expected)
self.assertEqual(self.storage._strip_signing_parameters(
'%s?expires=12345678&signature=Signature' % expected), expected)

def test_file_greater_than_5mb(self):
    """
    Test writing a large file in a single write call, so that the buffer
    is flushed (and the single multipart part uploaded) only on close.
    """
    name = 'test_storage_save.txt'
    # 10 MB of data -- larger than the multipart buffer threshold, but
    # delivered in one write() so no flush happens mid-write.
    content = '0' * 10 * 1024 * 1024

    # set the encryption flag used for multipart uploads
    self.storage.encryption = True
    self.storage.reduced_redundancy = True
    self.storage.default_acl = 'public-read'

    f = self.storage.open(name, 'w')
    self.storage.bucket.Object.assert_called_with(name)
    obj = self.storage.bucket.Object.return_value
    # set the name of the mock object
    obj.key = name
    multipart = obj.initiate_multipart_upload.return_value
    part = multipart.Part.return_value
    # the completed upload should report exactly one part
    multipart.parts.all.return_value = [mock.MagicMock(e_tag='123', part_number=1)]

    with mock.patch.object(f, '_flush_write_buffer') as method:
        f.write(content)
        method.assert_not_called()  # buffer not flushed on write

    # everything written is still pending as one (un-uploaded) part
    assert f._file_part_size == len(content)
    obj.initiate_multipart_upload.assert_called_with(
        ACL='public-read',
        ContentType='text/plain',
        ServerSideEncryption='AES256',
        StorageClass='REDUCED_REDUNDANCY'
    )

    with mock.patch.object(f, '_flush_write_buffer', wraps=f._flush_write_buffer) as method:
        f.close()
        method.assert_called_with()  # buffer flushed on close
    # the single part carries the full payload and the upload is completed
    multipart.Part.assert_called_with(1)
    part.upload.assert_called_with(Body=content.encode('utf-8'))
    multipart.complete.assert_called_once_with(
        MultipartUpload={'Parts': [{'ETag': '123', 'PartNumber': 1}]})

def test_file_write_after_exceeding_5mb(self):
    """
    Test writing a large file in two parts so that the buffer is flushed
    on write (first part) and on close (second part).
    """
    name = 'test_storage_save.txt'
    # first write exactly fills the buffer; the second write of a single
    # byte must trigger a flush of part 1 before the byte is buffered
    content1 = '0' * 5 * 1024 * 1024
    content2 = '0'

    # set the encryption flag used for multipart uploads
    self.storage.encryption = True
    self.storage.reduced_redundancy = True
    self.storage.default_acl = 'public-read'

    f = self.storage.open(name, 'w')
    self.storage.bucket.Object.assert_called_with(name)
    obj = self.storage.bucket.Object.return_value
    # set the name of the mock object
    obj.key = name
    multipart = obj.initiate_multipart_upload.return_value
    part = multipart.Part.return_value
    # the completed upload should report two parts
    multipart.parts.all.return_value = [
        mock.MagicMock(e_tag='123', part_number=1),
        mock.MagicMock(e_tag='456', part_number=2)
    ]

    with mock.patch.object(f, '_flush_write_buffer', wraps=f._flush_write_buffer) as method:
        f.write(content1)
        method.assert_not_called()  # buffer doesn't get flushed on the first write
        assert f._file_part_size == len(content1)  # file part size is the size of what's written
        assert f._last_part_pos == 0  # no parts added, so last part stays at 0
        f.write(content2)
        method.assert_called_with()  # second write flushes buffer
        multipart.Part.assert_called_with(1)  # first part created
        part.upload.assert_called_with(Body=content1.encode('utf-8'))  # first part is uploaded
        assert f._last_part_pos == len(content1)  # buffer spools to end of content1
        assert f._buffer_file_size == len(content1) + len(content2)  # _buffer_file_size is total written
        assert f._file_part_size == len(content2)  # new part is size of content2

    obj.initiate_multipart_upload.assert_called_with(
        ACL='public-read',
        ContentType='text/plain',
        ServerSideEncryption='AES256',
        StorageClass='REDUCED_REDUNDANCY'
    )
    # closing flushes the remaining byte as the second (final) part
    f.close()
    multipart.Part.assert_called_with(2)
    part.upload.assert_called_with(Body=content2.encode('utf-8'))
    multipart.complete.assert_called_once_with(
        MultipartUpload={'Parts': [
            {
                'ETag': '123',
                'PartNumber': 1
            },
            {
                'ETag': '456',
                'PartNumber': 2
            }
        ]})

0 comments on commit 8339e18

Please sign in to comment.