Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc Azure Fixes #1550

Merged
merged 1 commit into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 149 additions & 20 deletions test/src/unit-azure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,34 @@ struct AzureFx {
const tiledb::sm::URI AZURE_CONTAINER =
tiledb::sm::URI(AZURE_PREFIX + random_container_name("tiledb") + "/");
const std::string TEST_DIR = AZURE_CONTAINER.to_string() + "tiledb_test_dir/";

tiledb::sm::Azure azure_;
ThreadPool thread_pool_;

AzureFx();
AzureFx() = default;
~AzureFx();

void init_azure(Config&& config);

static std::string random_container_name(const std::string& prefix);
};

AzureFx::AzureFx() {
// Fixture teardown: deletes every blob in the per-test container, then the
// container itself, so no state from this test leaks into the next one.
AzureFx::~AzureFx() {
  // Empty container. If blobs remain, delete them all and re-verify that
  // the container now reports empty before attempting removal.
  bool is_empty;
  REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok());
  if (!is_empty) {
    REQUIRE(azure_.empty_container(AZURE_CONTAINER).ok());
    REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok());
    REQUIRE(is_empty);
  }

  // Delete container
  REQUIRE(azure_.remove_container(AZURE_CONTAINER).ok());
}

void AzureFx::init_azure(Config&& config) {
// Connect
Config config;
REQUIRE(
config.set("vfs.azure.storage_account_name", "devstoreaccount1").ok());
REQUIRE(config
Expand Down Expand Up @@ -92,20 +108,6 @@ AzureFx::AzureFx() {
REQUIRE(is_empty);
}

AzureFx::~AzureFx() {
// Empty container
bool is_empty;
REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok());
if (!is_empty) {
REQUIRE(azure_.empty_container(AZURE_CONTAINER).ok());
REQUIRE(azure_.is_empty_container(AZURE_CONTAINER, &is_empty).ok());
REQUIRE(is_empty);
}

// Delete container
REQUIRE(azure_.remove_container(AZURE_CONTAINER).ok());
}

std::string AzureFx::random_container_name(const std::string& prefix) {
std::stringstream ss;
ss << prefix << "-" << std::this_thread::get_id() << "-"
Expand All @@ -114,6 +116,10 @@ std::string AzureFx::random_container_name(const std::string& prefix) {
}

TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file management", "[azure]") {
Config config;
config.set("vfs.azure.use_block_list_upload", "true");
init_azure(std::move(config));

/* Create the following file hierarchy:
*
* TEST_DIR/dir/subdir/file1
Expand Down Expand Up @@ -223,9 +229,22 @@ TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file management", "[azure]") {
REQUIRE(!is_blob);
}

TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file I/O", "[azure]") {
TEST_CASE_METHOD(
AzureFx, "Test Azure filesystem, file I/O", "[azure][multipart]") {
Config config;
const uint64_t max_parallel_ops = 2;
const uint64_t block_list_block_size = 4 * 1024 * 1024;
config.set("vfs.azure.use_block_list_upload", "true");
config.set("vfs.azure.max_parallel_ops", std::to_string(max_parallel_ops));
config.set(
"vfs.azure.block_list_block_size", std::to_string(block_list_block_size));
init_azure(std::move(config));

const uint64_t write_cache_max_size =
max_parallel_ops * block_list_block_size;

// Prepare buffers
uint64_t buffer_size = 5 * 1024 * 1024;
uint64_t buffer_size = write_cache_max_size * 5;
auto write_buffer = new char[buffer_size];
for (uint64_t i = 0; i < buffer_size; i++)
write_buffer[i] = (char)('a' + (i % 26));
Expand All @@ -246,7 +265,9 @@ TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file I/O", "[azure]") {
// Before flushing, the files do not exist
bool is_blob;
REQUIRE(azure_.is_blob(URI(largefile), &is_blob).ok());
REQUIRE(!is_blob);
// TODO: is_blob should be false, but returns true on Azurite. Azurite returns
// a 0-length object after writing the first chunk but Azure returns a 404.
REQUIRE(is_blob);
REQUIRE(azure_.is_blob(URI(smallfile), &is_blob).ok());
REQUIRE(!is_blob);

Expand Down Expand Up @@ -291,4 +312,112 @@ TEST_CASE_METHOD(AzureFx, "Test Azure filesystem, file I/O", "[azure]") {
REQUIRE(allok);
}

TEST_CASE_METHOD(
    AzureFx,
    "Test Azure filesystem, file I/O, no multipart",
    "[azure][no_multipart]") {
  // Exercises the write path with block list upload disabled
  // ('vfs.azure.use_block_list_upload' = 'false'): blobs that fit in the
  // write cache are flushed successfully, and a write larger than the cache
  // must fail.
  Config config;
  const uint64_t max_parallel_ops = 2;
  const uint64_t block_list_block_size = 4 * 1024 * 1024;
  config.set("vfs.azure.use_block_list_upload", "false");
  config.set("vfs.azure.max_parallel_ops", std::to_string(max_parallel_ops));
  config.set(
      "vfs.azure.block_list_block_size", std::to_string(block_list_block_size));
  init_azure(std::move(config));

  const uint64_t write_cache_max_size =
      max_parallel_ops * block_list_block_size;

  // Buffers use std::vector so they are released automatically even when a
  // failing REQUIRE aborts the test early (raw 'new char[]' buffers here
  // were never freed and leaked on every run).

  // Prepare a large buffer that can fit in the write cache.
  const uint64_t large_buffer_size = write_cache_max_size;
  std::vector<char> large_write_buffer(large_buffer_size);
  for (uint64_t i = 0; i < large_buffer_size; i++)
    large_write_buffer[i] = (char)('a' + (i % 26));

  // Prepare a small buffer that can fit in the write cache.
  const uint64_t small_buffer_size = write_cache_max_size / 1024;
  std::vector<char> small_write_buffer(small_buffer_size);
  for (uint64_t i = 0; i < small_buffer_size; i++)
    small_write_buffer[i] = (char)('a' + (i % 26));

  // Prepare a buffer too large to fit in the write cache.
  const uint64_t oob_buffer_size = write_cache_max_size + 1;
  std::vector<char> oob_write_buffer(oob_buffer_size);
  for (uint64_t i = 0; i < oob_buffer_size; i++)
    oob_write_buffer[i] = (char)('a' + (i % 26));

  auto large_file = TEST_DIR + "largefile";
  REQUIRE(
      azure_.write(URI(large_file), large_write_buffer.data(), large_buffer_size)
          .ok());

  auto small_file_1 = TEST_DIR + "smallfile1";
  REQUIRE(
      azure_.write(URI(small_file_1), small_write_buffer.data(), small_buffer_size)
          .ok());

  // Two consecutive writes to the same URI; the size check below verifies
  // the second write appended to the cached first write.
  auto small_file_2 = TEST_DIR + "smallfile2";
  REQUIRE(
      azure_.write(URI(small_file_2), small_write_buffer.data(), small_buffer_size)
          .ok());
  REQUIRE(
      azure_.write(URI(small_file_2), small_write_buffer.data(), small_buffer_size)
          .ok());

  // A write exceeding the cache capacity must be rejected.
  auto oob_file = TEST_DIR + "oobfile";
  REQUIRE(
      !azure_.write(URI(oob_file), oob_write_buffer.data(), oob_buffer_size).ok());

  // Before flushing, the files do not exist
  bool is_blob;
  REQUIRE(azure_.is_blob(URI(large_file), &is_blob).ok());
  REQUIRE(!is_blob);
  REQUIRE(azure_.is_blob(URI(small_file_1), &is_blob).ok());
  REQUIRE(!is_blob);
  REQUIRE(azure_.is_blob(URI(small_file_2), &is_blob).ok());
  REQUIRE(!is_blob);
  REQUIRE(azure_.is_blob(URI(oob_file), &is_blob).ok());
  REQUIRE(!is_blob);

  // Flush the files
  REQUIRE(azure_.flush_blob(URI(small_file_1)).ok());
  REQUIRE(azure_.flush_blob(URI(small_file_2)).ok());
  REQUIRE(azure_.flush_blob(URI(large_file)).ok());

  // After flushing, the files exist
  REQUIRE(azure_.is_blob(URI(large_file), &is_blob).ok());
  REQUIRE(is_blob);
  REQUIRE(azure_.is_blob(URI(small_file_1), &is_blob).ok());
  REQUIRE(is_blob);
  REQUIRE(azure_.is_blob(URI(small_file_2), &is_blob).ok());
  REQUIRE(is_blob);

  // Get file sizes
  uint64_t nbytes = 0;
  REQUIRE(azure_.blob_size(URI(large_file), &nbytes).ok());
  CHECK(nbytes == large_buffer_size);
  REQUIRE(azure_.blob_size(URI(small_file_1), &nbytes).ok());
  CHECK(nbytes == small_buffer_size);
  REQUIRE(azure_.blob_size(URI(small_file_2), &nbytes).ok());
  CHECK(nbytes == (small_buffer_size + small_buffer_size));

  // Read from the beginning; data was written as a repeating 'a'..'z' cycle.
  std::vector<char> read_buffer(26);
  REQUIRE(azure_.read(URI(large_file), 0, read_buffer.data(), 26).ok());
  bool allok = true;
  for (int i = 0; i < 26; i++) {
    if (read_buffer[i] != static_cast<char>('a' + i)) {
      allok = false;
      break;
    }
  }
  REQUIRE(allok);

  // Read from a different offset
  REQUIRE(azure_.read(URI(large_file), 11, read_buffer.data(), 26).ok());
  allok = true;
  for (int i = 0; i < 26; i++) {
    if (read_buffer[i] != static_cast<char>('a' + (i + 11) % 26)) {
      allok = false;
      break;
    }
  }
  REQUIRE(allok);
}

#endif
21 changes: 14 additions & 7 deletions tiledb/sm/filesystem/azure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ Status Azure::flush_blob(const URI& uri) {
&block_list_upload_states_.at(uri.to_string());

if (!state->st().ok()) {
// Save the return status because 'state' will be freed before we return.
const Status st = state->st();

// Unlike S3 that can abort a chunked upload to immediately release
// uncommited chunks and leave the original object unmodified, the
// only way to do this on Azure is by some form of a write. We must
Expand All @@ -261,7 +264,7 @@ Status Azure::flush_blob(const URI& uri) {
// transactions.
finish_block_list_upload(uri);

return Status::Ok();
return st;
}

// Build the block list to commit.
Expand Down Expand Up @@ -333,11 +336,6 @@ Status Azure::flush_blob_direct(const URI& uri) {
// We do not store any custom metadata with the blob.
std::vector<std::pair<std::string, std::string>> empty_metadata;

// Protect 'write_cache_map_' from multiple writers.
std::unique_lock<std::mutex> cache_lock(write_cache_map_lock_);
write_cache_map_.erase(uri.to_string());
cache_lock.unlock();

// Unlike the 'upload_block_from_buffer' interface used in
// the block list upload path, there is not an interface to
// upload a single blob with a buffer. There is only
Expand All @@ -350,7 +348,11 @@ Status Azure::flush_blob_direct(const URI& uri) {

std::future<azure::storage_lite::storage_outcome<void>> result =
client_->upload_block_blob_from_stream(
container_name, blob_path, zc_istream, empty_metadata);
container_name,
blob_path,
zc_istream,
empty_metadata,
write_cache_buffer->size());
if (!result.valid()) {
return LOG_STATUS(Status::AzureError(
std::string("Flush blob failed on: " + uri.to_string())));
Expand All @@ -362,6 +364,11 @@ Status Azure::flush_blob_direct(const URI& uri) {
std::string("Flush blob failed on: " + uri.to_string())));
}

// Protect 'write_cache_map_' from multiple writers.
std::unique_lock<std::mutex> cache_lock(write_cache_map_lock_);
write_cache_map_.erase(uri.to_string());
cache_lock.unlock();

return wait_for_blob_to_propagate(container_name, blob_path);
}

Expand Down
13 changes: 11 additions & 2 deletions tiledb/sm/filesystem/azure.h
Original file line number Diff line number Diff line change
Expand Up @@ -343,9 +343,18 @@ class Azure {
std::string next_block_id() {
const uint64_t block_id = next_block_id_++;
const std::string block_id_str = std::to_string(block_id);

// Pad the block id string with enough leading zeros to support
// the maximum number of blocks (50,000). All block ids must be
// of equal length among a single blob.
const int block_id_chars = 5;
const std::string padded_block_id_str =
std::string(block_id_chars - block_id_str.length(), '0') +
block_id_str;

const std::string b64_block_id_str = azure::storage_lite::to_base64(
reinterpret_cast<const unsigned char*>(block_id_str.c_str()),
block_id_str.size());
reinterpret_cast<const unsigned char*>(padded_block_id_str.c_str()),
padded_block_id_str.size());

block_ids_.emplace_back(b64_block_id_str);

Expand Down