Optimized page blob upload by skipping empty ranges
zezha-msft committed Nov 17, 2017
1 parent 1ea5f5f commit eb9183a
Showing 4 changed files with 74 additions and 21 deletions.
4 changes: 4 additions & 0 deletions azure-storage-blob/ChangeLog.md
@@ -2,6 +2,10 @@
 
 > See [BreakingChanges](BreakingChanges.md) for a detailed list of API breaks.
 
+## Version XX.XX.XX:
+
+- Optimized page blob upload for create_blob_from_* methods by skipping empty chunks.
+
 ## Version 0.37.1:
 
 - Enabling MD5 validation no longer uses the memory-efficient algorithm for large block blobs, since computing the MD5 hash requires reading the entire block into memory.
51 changes: 33 additions & 18 deletions azure-storage-blob/azure/storage/blob/_upload_chunking.py
@@ -196,19 +196,19 @@ def get_chunk_streams(self):
                 if self.padder:
                     data = self.padder.update(data)
                 if self.encryptor:
                     data = self.encryptor.update(data)
-                yield index, BytesIO(data)
+                yield index, data
             else:
                 if self.padder:
                     data = self.padder.update(data) + self.padder.finalize()
                 if self.encryptor:
                     data = self.encryptor.update(data) + self.encryptor.finalize()
                 if len(data) > 0:
-                    yield index, BytesIO(data)
+                    yield index, data
                 break
             index += len(data)
 
     def process_chunk(self, chunk_data):
-        chunk_bytes = chunk_data[1].read()
+        chunk_bytes = chunk_data[1]
         chunk_offset = chunk_data[0]
         return self._upload_chunk_with_progress(chunk_offset, chunk_bytes)
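With this change, get_chunk_streams yields raw bytes instead of BytesIO wrappers, so downstream code can inspect a chunk's contents directly (the page blob uploader below relies on this to test for all-zero chunks). A minimal sketch of the new (offset, bytes) contract — read_in_chunks is a hypothetical stand-in, not the library's code:

    from io import BytesIO

    def read_in_chunks(stream, chunk_size):
        # Hypothetical illustration of the generator contract: yield
        # (offset, bytes) pairs, leaving the bytes unwrapped so the consumer
        # can compare or scan them without an extra .read() call.
        offset = 0
        while True:
            data = stream.read(chunk_size)
            if not data:
                break
            yield offset, data
            offset += len(data)

    stream = BytesIO(bytes(1024) + b'abc')  # 1 KB of zeros followed by 3 bytes
    for offset, chunk in read_in_chunks(stream, 512):
        print(offset, len(chunk), chunk == bytes(len(chunk)))  # True for all-zero chunks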
@@ -290,24 +290,39 @@ def _upload_substream_block(self, block_id, block_stream):
 
 
 class _PageBlobChunkUploader(_BlobChunkUploader):
+    EMPTY_PAGE = bytearray(512)
+
+    def _is_chunk_empty(self, chunk_data):
+        # read until non-zero data is encountered
+        # if reached the end without returning, then chunk_data is all 0's
+        data = BytesIO(chunk_data)
+        page = data.read(512)
+        while page != b'':
+            if page != self.EMPTY_PAGE:
+                return False
+            page = data.read(512)
+        return True
+
     def _upload_chunk(self, chunk_start, chunk_data):
-        chunk_end = chunk_start + len(chunk_data) - 1
-        resp = self.blob_service._update_page(
-            self.container_name,
-            self.blob_name,
-            chunk_data,
-            chunk_start,
-            chunk_end,
-            validate_content=self.validate_content,
-            lease_id=self.lease_id,
-            if_match=self.if_match,
-            timeout=self.timeout,
-        )
+        # avoid uploading the empty pages
+        if not self._is_chunk_empty(chunk_data):
+            chunk_end = chunk_start + len(chunk_data) - 1
+            resp = self.blob_service._update_page(
+                self.container_name,
+                self.blob_name,
+                chunk_data,
+                chunk_start,
+                chunk_end,
+                validate_content=self.validate_content,
+                lease_id=self.lease_id,
+                if_match=self.if_match,
+                timeout=self.timeout,
+            )
 
-        if not self.parallel:
-            self.if_match = resp.etag
+            if not self.parallel:
+                self.if_match = resp.etag
 
-        self.set_response_properties(resp)
+            self.set_response_properties(resp)
 
 
 class _AppendBlobChunkUploader(_BlobChunkUploader):
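The emptiness test above walks the chunk one 512-byte page at a time and bails out at the first non-zero page. A standalone sketch of the same idea (is_chunk_empty here is a hypothetical free function, not the library's API):

    from io import BytesIO

    EMPTY_PAGE = bytearray(512)

    def is_chunk_empty(chunk_data):
        # Compare the chunk page by page against a zeroed 512-byte buffer;
        # returning early keeps the common non-empty case cheap.
        data = BytesIO(chunk_data)
        page = data.read(512)
        while page != b'':
            if page != EMPTY_PAGE:
                return False
            page = data.read(512)
        return True

    assert is_chunk_empty(bytes(4096))                             # all zeros
    assert not is_chunk_empty(bytes(512) + b'\x01' + bytes(511))   # one non-zero byte

An equivalent one-liner is chunk_data.count(0) == len(chunk_data); the paged comparison simply mirrors what the uploader does.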
8 changes: 5 additions & 3 deletions azure-storage-blob/azure/storage/blob/pageblobservice.py
@@ -77,12 +77,12 @@ class PageBlobService(BaseBlobService):
     and a range that align to 512-byte page boundaries. A write to a page blob
     can overwrite just one page, some pages, or up to 4 MB of the page blob.
     Writes to page blobs happen in-place and are immediately committed to the
-    blob. The maximum size for a page blob is 1 TB.
+    blob. The maximum size for a page blob is 8 TB.
 
     :ivar int MAX_PAGE_SIZE:
         The size of the pages put by create_blob_from_* methods. Smaller pages
         may be put if there is less data provided. The maximum page size the service
-        supports is 4MB.
+        supports is 4MB. When using the create_blob_from_* methods, empty pages are skipped.
     '''
 
     MAX_PAGE_SIZE = 4 * 1024 * 1024
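Since every write must start and end on a 512-byte page boundary, the uploader's chunk_start/chunk_end arithmetic stays valid as long as the chunk size is a multiple of 512. A small illustrative check (aligned is a hypothetical helper, not part of the SDK):

    PAGE_SIZE = 512

    def aligned(offset, length):
        # A page-blob write is valid only if it starts on a page boundary
        # and spans a whole number of pages.
        return offset % PAGE_SIZE == 0 and length % PAGE_SIZE == 0

    assert aligned(0, 4096)          # first eight pages
    assert aligned(8192, 512)        # a single page at offset 8192
    assert not aligned(256, 512)     # misaligned start offset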
@@ -802,6 +802,7 @@ def create_blob_from_path(
         '''
         Creates a new blob from a file path, or updates the content of an
         existing blob, with automatic chunking and progress notifications.
+        Empty chunks are skipped, while non-empty ones (even if only partly filled) are uploaded.
 
         :param str container_name:
             Name of existing container.
@@ -895,6 +896,7 @@ def create_blob_from_stream(
         '''
         Creates a new blob from a file/stream, or updates the content of an
         existing blob, with automatic chunking and progress notifications.
+        Empty chunks are skipped, while non-empty ones (even if only partly filled) are uploaded.
 
         :param str container_name:
             Name of existing container.
@@ -1028,7 +1030,7 @@ def create_blob_from_bytes(
         '''
         Creates a new blob from an array of bytes, or updates the content
         of an existing blob, with automatic chunking and progress
-        notifications.
+        notifications. Empty chunks are skipped, while non-empty ones (even if only partly filled) are uploaded.
 
         :param str container_name:
             Name of existing container.
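End to end, the optimization means sparse input now yields a sparse blob. A hedged usage sketch against this legacy SDK — the account credentials, container, and blob names are placeholders, and the container is assumed to exist:

    from azure.storage.blob import PageBlobService

    bs = PageBlobService(account_name='myaccount', account_key='REPLACE_WITH_KEY')

    # 8 MB buffer: only the first 512 bytes are non-zero, so with the default
    # 4 MB chunk size the second, all-zero chunk should never be uploaded.
    data = bytearray(8 * 1024 * 1024)
    data[0:512] = b'x' * 512

    bs.create_blob_from_bytes('mycontainer', 'sparse-blob', bytes(data))

    for page_range in bs.get_page_ranges('mycontainer', 'sparse-blob'):
        print(page_range.start, page_range.end)  # expect a single range: 0 4194303

Note that skipping happens at chunk granularity: the first 4 MB chunk is written in full, zeros included, because it contains some data.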
32 changes: 32 additions & 0 deletions tests/blob/test_page_blob.py
@@ -628,6 +628,38 @@ def test_create_blob_from_stream(self):
         self.assertEqual(blob.properties.etag, create_resp.etag)
         self.assertEqual(blob.properties.last_modified, create_resp.last_modified)
 
+    def test_create_blob_from_stream_with_empty_pages(self):
+        # parallel tests introduce random order of requests, can only run live
+        if TestMode.need_recording_file(self.test_mode):
+            return
+
+        # Arrange
+        # data is almost all empty (0s) except two ranges
+        blob_name = self._get_blob_reference()
+        data = bytearray(LARGE_BLOB_SIZE)
+        data[512: 1024] = self.get_random_bytes(512)
+        data[8192: 8196] = self.get_random_bytes(4)
+        with open(FILE_PATH, 'wb') as stream:
+            stream.write(data)
+
+        # Act
+        blob_size = len(data)
+        with open(FILE_PATH, 'rb') as stream:
+            create_resp = self.bs.create_blob_from_stream(self.container_name, blob_name, stream, blob_size)
+        blob = self.bs.get_blob_properties(self.container_name, blob_name)
+
+        # Assert
+        # the uploader should have skipped the empty ranges
+        self.assertBlobEqual(self.container_name, blob_name, data[:blob_size])
+        page_ranges = list(self.bs.get_page_ranges(self.container_name, blob_name))
+        self.assertEqual(len(page_ranges), 2)
+        self.assertEqual(page_ranges[0].start, 0)
+        self.assertEqual(page_ranges[0].end, 4095)
+        self.assertEqual(page_ranges[1].start, 8192)
+        self.assertEqual(page_ranges[1].end, 12287)
+        self.assertEqual(blob.properties.etag, create_resp.etag)
+        self.assertEqual(blob.properties.last_modified, create_resp.last_modified)
+
     def test_create_blob_from_stream_non_seekable(self):
         # parallel tests introduce random order of requests, can only run live
         if TestMode.need_recording_file(self.test_mode):
