From 653789e01f89ddf027ba9c1aecdf0dd5a24c0b7f Mon Sep 17 00:00:00 2001 From: Joe maley Date: Mon, 16 Mar 2020 15:10:24 -0400 Subject: [PATCH] Misc Azure Fixes Change #1: Fix non-multipart uploads This fixes an issue where we free the buffer before writing it in the non-multipart upload path, which can be used for quicker small writes. Adds an associated unit test. Change #2: Missing error return path in blob flush In the blob flush path for multipart uploads, the last flush of the write cache may silently fail. This ensures the user receives a non-OK status from flush_blob() in this scenario. Change #3: Multipart uploads fail when they have > 10 chunks In a the multipart upload (aka blocklist upload), each chunk is uploaded a block with a string-id unique to the overall object (aka blob). There is an undocumented requirement that all string-ids must be of equal length. Currently, the string-ids are generated from an incrementing integer. For example, the id of the first chunk is "0", the second chunk is "1", the tenth chunk is "9", and the eleventh chunk is "10". The eleventh chunk has a string-id size of 2 while all the others have a size of 1. The eleventh chunk (and all other future chunks) fail with the following error message: "The specified blob or block content is invalid" This patches pads the string-ids to ensure they are all 5-characters long. The maximum number of chunks is 50,000 so 5-characters is sufficient to contain all chunk ids. More info here: https://gauravmantri.com/2013/05/18/windows-azure-blob-storage-dealing-with-the-specified-blob-or-block-content-is-invalid-error/ --- test/src/unit-azure.cc | 165 ++++++++++++++++++++++++++++++---- tiledb/sm/filesystem/azure.cc | 21 +++-- tiledb/sm/filesystem/azure.h | 13 ++- 3 files changed, 171 insertions(+), 28 deletions(-) diff --git a/test/src/unit-azure.cc b/test/src/unit-azure.cc index 445f2fe430c3..13241423d4ba 100644 --- a/test/src/unit-azure.cc +++ b/test/src/unit-azure.cc @@ -49,18 +49,34 @@ struct AzureFx { const tiledb::sm::URI AZURE_CONTAINER = tiledb::sm::URI(AZURE_PREFIX + random_container_name("tiledb") + "/"); const std::string TEST_DIR = AZURE_CONTAINER.to_string() + "tiledb_test_dir/"; + tiledb::sm::Azure azure_; ThreadPool thread_pool_; - AzureFx(); + AzureFx() = default; ~AzureFx(); + void init_azure(Config&& config); + static std::string random_container_name(const std::string& prefix); }; -AzureFx::AzureFx() { +AzureFx::~AzureFx() { + // Empty container + bool is_empty; + REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok()); + if (!is_empty) { + REQUIRE(azure_.empty_container(AZURE_CONTAINER).ok()); + REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok()); + REQUIRE(is_empty); + } + + // Delete container + REQUIRE(azure_.remove_container(AZURE_CONTAINER).ok()); +} + +void AzureFx::init_azure(Config&& config) { // Connect - Config config; REQUIRE( config.set("vfs.azure.storage_account_name", "devstoreaccount1").ok()); REQUIRE(config @@ -92,20 +108,6 @@ AzureFx::AzureFx() { REQUIRE(is_empty); } -AzureFx::~AzureFx() { - // Empty container - bool is_empty; - REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok()); - if (!is_empty) { - REQUIRE(azure_.empty_container(AZURE_CONTAINER).ok()); - REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok()); - REQUIRE(is_empty); - } - - // Delete container - REQUIRE(azure_.remove_container(AZURE_CONTAINER).ok()); -} - std::string AzureFx::random_container_name(const std::string& prefix) { std::stringstream ss; ss << prefix << "-" << std::this_thread::get_id() << "-" @@ -114,6 +116,10 @@ std::string AzureFx::random_container_name(const std::string& prefix) { } TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file management", "[azure]") { + Config config; + config.set("vfs.azure.use_block_list_upload", "true"); + init_azure(std::move(config)); + /* Create the following file hierarchy: * * TEST_DIR/dir/subdir/file1 @@ -223,9 +229,22 @@ TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file management", "[azure]") { REQUIRE(!is_blob); } -TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file I/O", "[azure]") { +TEST_CASE_METHOD( + AzureFx, "Test Azure filesystem, file I/O", "[azure][multipart]") { + Config config; + const uint64_t max_parallel_ops = 2; + const uint64_t block_list_block_size = 4 * 1024 * 1024; + config.set("vfs.azure.use_block_list_upload", "true"); + config.set("vfs.azure.max_parallel_ops", std::to_string(max_parallel_ops)); + config.set( + "vfs.azure.block_list_block_size", std::to_string(block_list_block_size)); + init_azure(std::move(config)); + + const uint64_t write_cache_max_size = + max_parallel_ops * block_list_block_size; + // Prepare buffers - uint64_t buffer_size = 5 * 1024 * 1024; + uint64_t buffer_size = write_cache_max_size * 5; auto write_buffer = new char[buffer_size]; for (uint64_t i = 0; i < buffer_size; i++) write_buffer[i] = (char)('a' + (i % 26)); @@ -291,4 +310,112 @@ TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file I/O", "[azure]") { REQUIRE(allok); } +TEST_CASE_METHOD( + AzureFx, + "Test Azure filesystem, file I/O, no multipart", + "[azure][no_multipart]") { + Config config; + const uint64_t max_parallel_ops = 2; + const uint64_t block_list_block_size = 4 * 1024 * 1024; + config.set("vfs.azure.use_block_list_upload", "false"); + config.set("vfs.azure.max_parallel_ops", std::to_string(max_parallel_ops)); + config.set( + "vfs.azure.block_list_block_size", std::to_string(block_list_block_size)); + init_azure(std::move(config)); + + const uint64_t write_cache_max_size = + max_parallel_ops * block_list_block_size; + + // Prepare a large buffer that can fit in the write cache. + uint64_t large_buffer_size = write_cache_max_size; + auto large_write_buffer = new char[large_buffer_size]; + for (uint64_t i = 0; i < large_buffer_size; i++) + large_write_buffer[i] = (char)('a' + (i % 26)); + + // Prepare a small buffer that can fit in the write cache. + uint64_t small_buffer_size = write_cache_max_size / 1024; + auto small_write_buffer = new char[small_buffer_size]; + for (uint64_t i = 0; i < small_buffer_size; i++) + small_write_buffer[i] = (char)('a' + (i % 26)); + + // Prepare a buffer too large to fit in the write cache. + uint64_t oob_buffer_size = write_cache_max_size + 1; + auto oob_write_buffer = new char[oob_buffer_size]; + for (uint64_t i = 0; i < oob_buffer_size; i++) + oob_write_buffer[i] = (char)('a' + (i % 26)); + + auto large_file = TEST_DIR + "largefile"; + REQUIRE(azure_.write(URI(large_file), large_write_buffer, large_buffer_size) + .ok()); + + auto small_file_1 = TEST_DIR + "smallfile1"; + REQUIRE(azure_.write(URI(small_file_1), small_write_buffer, small_buffer_size) + .ok()); + + auto small_file_2 = TEST_DIR + "smallfile2"; + REQUIRE(azure_.write(URI(small_file_2), small_write_buffer, small_buffer_size) + .ok()); + REQUIRE(azure_.write(URI(small_file_2), small_write_buffer, small_buffer_size) + .ok()); + + auto oob_file = TEST_DIR + "oobfile"; + REQUIRE(!azure_.write(URI(oob_file), oob_write_buffer, oob_buffer_size).ok()); + + // Before flushing, the files do not exist + bool is_blob; + REQUIRE(azure_.is_blob(URI(large_file), &is_blob).ok()); + REQUIRE(!is_blob); + REQUIRE(azure_.is_blob(URI(small_file_1), &is_blob).ok()); + REQUIRE(!is_blob); + REQUIRE(azure_.is_blob(URI(small_file_2), &is_blob).ok()); + REQUIRE(!is_blob); + REQUIRE(azure_.is_blob(URI(oob_file), &is_blob).ok()); + REQUIRE(!is_blob); + + // Flush the files + REQUIRE(azure_.flush_blob(URI(small_file_1)).ok()); + REQUIRE(azure_.flush_blob(URI(small_file_2)).ok()); + REQUIRE(azure_.flush_blob(URI(large_file)).ok()); + + // After flushing, the files exist + REQUIRE(azure_.is_blob(URI(large_file), &is_blob).ok()); + REQUIRE(is_blob); + REQUIRE(azure_.is_blob(URI(small_file_1), &is_blob).ok()); + REQUIRE(is_blob); + REQUIRE(azure_.is_blob(URI(small_file_2), &is_blob).ok()); + REQUIRE(is_blob); + + // Get file sizes + uint64_t nbytes = 0; + REQUIRE(azure_.blob_size(URI(large_file), &nbytes).ok()); + CHECK(nbytes == large_buffer_size); + REQUIRE(azure_.blob_size(URI(small_file_1), &nbytes).ok()); + CHECK(nbytes == small_buffer_size); + REQUIRE(azure_.blob_size(URI(small_file_2), &nbytes).ok()); + CHECK(nbytes == (small_buffer_size + small_buffer_size)); + + // Read from the beginning + auto read_buffer = new char[26]; + REQUIRE(azure_.read(URI(large_file), 0, read_buffer, 26).ok()); + bool allok = true; + for (int i = 0; i < 26; i++) { + if (read_buffer[i] != static_cast('a' + i)) { + allok = false; + break; + } + } + REQUIRE(allok); + + // Read from a different offset + REQUIRE(azure_.read(URI(large_file), 11, read_buffer, 26).ok()); + allok = true; + for (int i = 0; i < 26; i++) { + if (read_buffer[i] != static_cast('a' + (i + 11) % 26)) { + allok = false; + break; + } + } + REQUIRE(allok); +} + #endif diff --git a/tiledb/sm/filesystem/azure.cc b/tiledb/sm/filesystem/azure.cc index f2fdcb4cecdb..445a37c5b416 100644 --- a/tiledb/sm/filesystem/azure.cc +++ b/tiledb/sm/filesystem/azure.cc @@ -244,6 +244,9 @@ Status Azure::flush_blob(const URI& uri) { &block_list_upload_states_.at(uri.to_string()); if (!state->st().ok()) { + // Save the return status because 'state' will be freed before we return. + const Status st = state->st(); + // Unlike S3 that can abort a chunked upload to immediately release // uncommited chunks and leave the original object unmodified, the // only way to do this on Azure is by some form of a write. We must @@ -261,7 +264,7 @@ Status Azure::flush_blob(const URI& uri) { // transactions. finish_block_list_upload(uri); - return Status::Ok(); + return st; } // Build the block list to commit. @@ -333,11 +336,6 @@ Status Azure::flush_blob_direct(const URI& uri) { // We do not store any custom metadata with the blob. std::vector> empty_metadata; - // Protect 'write_cache_map_' from multiple writers. - std::unique_lock cache_lock(write_cache_map_lock_); - write_cache_map_.erase(uri.to_string()); - cache_lock.unlock(); - // Unlike the 'upload_block_from_buffer' interface used in // the block list upload path, there is not an interface to // upload a single blob with a buffer. There is only @@ -350,7 +348,11 @@ Status Azure::flush_blob_direct(const URI& uri) { std::future> result = client_->upload_block_blob_from_stream( - container_name, blob_path, zc_istream, empty_metadata); + container_name, + blob_path, + zc_istream, + empty_metadata, + write_cache_buffer->size()); if (!result.valid()) { return LOG_STATUS(Status::AzureError( std::string("Flush blob failed on: " + uri.to_string()))); @@ -362,6 +364,11 @@ Status Azure::flush_blob_direct(const URI& uri) { std::string("Flush blob failed on: " + uri.to_string()))); } + // Protect 'write_cache_map_' from multiple writers. + std::unique_lock cache_lock(write_cache_map_lock_); + write_cache_map_.erase(uri.to_string()); + cache_lock.unlock(); + return wait_for_blob_to_propagate(container_name, blob_path); } diff --git a/tiledb/sm/filesystem/azure.h b/tiledb/sm/filesystem/azure.h index 45a290012526..d10eae83dbeb 100644 --- a/tiledb/sm/filesystem/azure.h +++ b/tiledb/sm/filesystem/azure.h @@ -343,9 +343,18 @@ class Azure { std::string next_block_id() { const uint64_t block_id = next_block_id_++; const std::string block_id_str = std::to_string(block_id); + + // Pad the block id string with enough leading zeros to support + // the maximum number of blocks (50,000). All block ids must be + // of equal length among a single blob. + const int block_id_chars = 5; + const std::string padded_block_id_str = + std::string(block_id_chars - block_id_str.length(), '0') + + block_id_str; + const std::string b64_block_id_str = azure::storage_lite::to_base64( - reinterpret_cast(block_id_str.c_str()), - block_id_str.size()); + reinterpret_cast(padded_block_id_str.c_str()), + padded_block_id_str.size()); block_ids_.emplace_back(b64_block_id_str);