Skip to content

Commit

Permalink
[#114] implemented multipart enhancement
Browse files Browse the repository at this point in the history
Design:

1. When a new part upload is initiated, determine whether all
lower-numbered parts have already been started. If so, the part's
offset can be computed from the sizes of the previous parts. In this
case, stream the part directly to iRODS by seeking to the offset and
writing the bytes.

2. To accomplish the above, when a new part upload is initiated, save
the part size in a map that translates each upload_id to a map of
part numbers to part sizes. The following is an example of this map:

{
  "1234abcd-1234-1234-123456789abc":
    { 0: 4096000,
      1: 4096000,
      4: 4096000 },
  "01234abc-0123-0123-0123456789ab":
    { 0: 5192000,
      3: 5192000 }
}

3. If the part offset is not known, write the bytes to a local part
file. When CompleteMultipartUpload is encountered, read all of these
local part files and stream their contents to iRODS. If no local part
file exists for a part, that part was already streamed directly to
iRODS and does not need to be rewritten.

4. The first iRODS data stream that is opened will remain open until
CompleteMultipartUpload is finished. This ensures that the
replica_token does not change in the middle of writing parts to iRODS.
See the keep_dstream_open_flag flag in putobject.cpp.
  • Loading branch information
JustinKyleJames authored and alanking committed Oct 3, 2024
1 parent f31fe44 commit 2e33534
Show file tree
Hide file tree
Showing 5 changed files with 531 additions and 145 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,8 @@ docker compose build
docker compose run client
```

*Note: If you get an error like `'name' does not match any of the regexes: '^x-'` then you will need to upgrade your version of docker compose.*

The test output will appear in the terminal. Once the tests complete run the following to cleanup:

```bash
Expand Down
207 changes: 130 additions & 77 deletions endpoints/s3/src/completemultipartupload.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
#include <boost/property_tree/xml_parser.hpp>

#include <fmt/format.h>
#include <spdlog/spdlog.h>
#include <regex>
#include <cstdio>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <thread>
#include <chrono>
#include <string>
#include <sstream>

Expand All @@ -38,6 +36,22 @@ namespace beast = boost::beast;
namespace fs = irods::experimental::filesystem;
namespace logging = irods::http::logging;

namespace irods::s3::api::multipart_global_state
{
// Process-wide multipart-upload state. These objects are defined in another
// translation unit (extern declarations only here).
// NOTE(review): accesses in this file guard part_size_map with
// multipart_global_state_mutex; confirm the same lock discipline is applied
// everywhere replica_token_number_and_odstream_map is touched.

// upload_id -> { part number -> part size in bytes }. Populated as each part
// upload is initiated so that a part's offset can be computed from the sizes
// of all lower-numbered parts, allowing the part to be streamed directly to
// iRODS instead of being staged in a local part file.
extern std::unordered_map<std::string, std::unordered_map<unsigned int, uint64_t>> part_size_map;
// upload_id -> (replica_token, replica_number, connection, transport,
// odstream) for the iRODS data object that is held open for the duration of
// the multipart upload, so the replica_token does not change while parts are
// being written. The shared_ptr members must be released in a controlled
// order at cleanup because std::tuple does not guarantee destruction order.
extern std::unordered_map<
std::string,
std::tuple<
irods::experimental::io::replica_token,
irods::experimental::io::replica_number,
std::shared_ptr<irods::experimental::client_connection>,
std::shared_ptr<irods::experimental::io::client::native_transport>,
std::shared_ptr<irods::experimental::io::odstream>>>
replica_token_number_and_odstream_map;

// Serializes access to the maps above.
extern std::mutex multipart_global_state_mutex;
} // end namespace irods::s3::api::multipart_global_state

namespace
{

Expand All @@ -64,14 +78,16 @@ void irods::s3::actions::handle_completemultipartupload(
boost::beast::http::request_parser<boost::beast::http::empty_body>& empty_body_parser,
const boost::urls::url_view& url)
{
namespace part_shmem = irods::s3::api::multipart_global_state;

beast::http::response<beast::http::empty_body> response;

// Authenticate
auto irods_username = irods::s3::authentication::authenticates(empty_body_parser, url);
if (!irods_username) {
logging::error("{}: Failed to authenticate.", __FUNCTION__);
logging::error("{}: Failed to authenticate.", __func__);
response.result(beast::http::status::forbidden);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
logging::debug("{}: returned [{}]", __func__, response.reason());
session_ptr->send(std::move(response));
return;
}
Expand All @@ -92,44 +108,42 @@ void irods::s3::actions::handle_completemultipartupload(
}
}

logging::debug("{} s3_bucket={} s3_key={}", __FUNCTION__, s3_bucket.string(), s3_key.string());
logging::debug("{} s3_bucket={} s3_key={}", __func__, s3_bucket.string(), s3_key.string());

fs::path path;
if (auto bucket = irods::s3::resolve_bucket(url.segments()); bucket.has_value()) {
path = bucket.value();
path = irods::s3::finish_path(path, url.segments());
logging::debug("{}: CompleteMultipartUpload path={}", __FUNCTION__, path.string());
logging::debug("{}: CompleteMultipartUpload path={}", __func__, path.string());
}
else {
logging::error("{}: Failed to resolve bucket", __FUNCTION__);
logging::error("{}: Failed to resolve bucket", __func__);
response.result(beast::http::status::forbidden);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
logging::debug("{}: returned [{}]", __func__, response.reason());
session_ptr->send(std::move(response));
return;
}

fs::client::create_collections(conn, path.parent_path());

// get the uploadId from the param list
std::string upload_id;
if (const auto upload_id_param = url.params().find("uploadId"); upload_id_param != url.params().end()) {
upload_id = (*upload_id_param).value;
}

if (upload_id.empty()) {
logging::error("{}: Did not receive a an uploadId", __FUNCTION__);
logging::error("{}: Did not receive an uploadId", __func__);
response.result(beast::http::status::bad_request);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
logging::debug("{}: returned [{}]", __func__, response.reason());
session_ptr->send(std::move(response));
return;
}

// Do not allow an upload_id that is not in the format we have defined. People could do bad things
// if we didn't enforce this.
if (!std::regex_match(upload_id, upload_id_pattern)) {
logging::error("{}: Upload ID {} was not in expected format.", __FUNCTION__, upload_id);
logging::error("{}: Upload ID [{}] was not in expected format.", __func__, upload_id);
response.result(beast::http::status::bad_request);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
logging::debug("{}: returned [{}]", __func__, response.reason());
session_ptr->send(std::move(response));
return;
}
Expand All @@ -143,7 +157,7 @@ void irods::s3::actions::handle_completemultipartupload(
beast::http::read(session_ptr->stream().socket(), session_ptr->get_buffer(), parser);

std::string& request_body = parser.get().body();
logging::debug("{}: request_body\n{}", __FUNCTION__, request_body);
logging::debug("{}: request_body\n{}", __func__, request_body);

int max_part_number = -1;
int min_part_number = 1000;
Expand Down Expand Up @@ -174,16 +188,16 @@ void irods::s3::actions::handle_completemultipartupload(
}
}
catch (boost::property_tree::xml_parser_error& e) {
logging::debug("{}: Could not parse XML body.", __FUNCTION__);
logging::debug("{}: Could not parse XML body.", __func__);
response.result(boost::beast::http::status::bad_request);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
logging::debug("{}: returned [{}]", __func__, response.reason());
session_ptr->send(std::move(response));
return;
}
catch (...) {
logging::debug("{}: Unknown error parsing XML body.", __FUNCTION__);
logging::debug("{}: Unknown error parsing XML body.", __func__);
response.result(boost::beast::http::status::bad_request);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
logging::debug("{}: returned [{}]", __func__, response.reason());
session_ptr->send(std::move(response));
return;
}
Expand All @@ -192,21 +206,42 @@ void irods::s3::actions::handle_completemultipartupload(
// with 1 and the largest part number is the same as the count of
// part numbers. We could later check that each part number is included...
if (min_part_number != 1) {
logging::debug("{}: Part numbers did not start with 1.", __FUNCTION__);
logging::debug("{}: Part numbers did not start with 1.", __func__);
response.result(boost::beast::http::status::bad_request);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
logging::debug("{}: returned [{}]", __func__, response.reason());
session_ptr->send(std::move(response));
return;
}

if (max_part_number != part_number_count) {
logging::debug("{}: Missing at least one part number.", __FUNCTION__);
logging::debug("{}: Missing at least one part number.", __func__);
response.result(boost::beast::http::status::bad_request);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
logging::debug("{}: returned [{}]", __func__, response.reason());
session_ptr->send(std::move(response));
return;
}

// debug
if (spdlog::get_level() == spdlog::level::debug || spdlog::get_level() == spdlog::level::trace) {
std::lock_guard<std::mutex> guard(part_shmem::multipart_global_state_mutex);
logging::info("{}:{} {}: ******* THIS RAN ********", __FILE__, __LINE__, __func__);

if (part_shmem::part_size_map.find(upload_id) != part_shmem::part_size_map.end()) {
logging::debug("{}:{} {}: ------------------", __FILE__, __LINE__, __func__);
for (int i = 1; i <= part_number_count; ++i) {
bool found = part_shmem::part_size_map[upload_id].find(i) != part_shmem::part_size_map[upload_id].end();
if (found) {
logging::debug(
"{}:{} {}: {}: {}", __FILE__, __LINE__, __func__, i, part_shmem::part_size_map[upload_id][i]);
}
else {
logging::debug("{}:{} {}: {}: UNKNOWN", __FILE__, __LINE__, __func__, i);
}
}
logging::debug("{}:{} {}: ------------------", __FILE__, __LINE__, __func__);
}
}

// build up a vector filenames, offsets, and lengths for each part
std::vector<part_info> part_info_vector;
part_info_vector.reserve(max_part_number);
Expand All @@ -216,21 +251,37 @@ void irods::s3::actions::handle_completemultipartupload(
std::string part_file_location =
config.value(nlohmann::json::json_pointer{"/s3_server/multipart_upload_part_files_directory"}, ".");

uint64_t offset_counter = 0;
for (int current_part_number = 1; current_part_number <= max_part_number; ++current_part_number) {
std::string part_filename =
part_file_location + "/irods_s3_api_" + upload_id + "." + std::to_string(current_part_number);
try {
auto part_size = std::filesystem::file_size(part_filename);
part_info_vector.push_back({part_filename, offset_counter, part_size});
offset_counter += part_size;
}
catch (fs::filesystem_error& e) {
logging::error("{}: Failed locate part", __FUNCTION__);
response.result(beast::http::status::internal_server_error);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
session_ptr->send(std::move(response));
return;
{
std::lock_guard<std::mutex> guard(part_shmem::multipart_global_state_mutex);
uint64_t offset_counter = 0;
for (int current_part_number = 1; current_part_number <= max_part_number; ++current_part_number) {
std::string part_filename =
part_file_location + "/irods_s3_api_" + upload_id + "." + std::to_string(current_part_number);

if (part_shmem::part_size_map.find(upload_id) != part_shmem::part_size_map.end() &&
part_shmem::part_size_map[upload_id].find(current_part_number) !=
part_shmem::part_size_map[upload_id].end())
{
// get size from part_size_map in shmem
auto part_size = part_shmem::part_size_map[upload_id][current_part_number];
part_info_vector.push_back({part_filename, offset_counter, part_size});
offset_counter += part_size;
}
else {
// get size from part files
try {
auto part_size = std::filesystem::file_size(part_filename);
part_info_vector.push_back({part_filename, offset_counter, part_size});
offset_counter += part_size;
}
catch (fs::filesystem_error& e) {
logging::error("{}: Failed locate part", __func__);
response.result(beast::http::status::internal_server_error);
logging::debug("{}: returned [{}]", __func__, response.reason());
session_ptr->send(std::move(response));
return;
}
}
}
}

Expand All @@ -240,31 +291,15 @@ void irods::s3::actions::handle_completemultipartupload(
std::condition_variable cv;
std::mutex cv_mutex;

// This thread will create the file then wait until all threads are done or an error occurs.
irods::experimental::io::client::default_transport xtrans{conn};
irods::experimental::io::odstream d; // irods dstream for writing directly to irods
d.open(
xtrans,
path,
irods::experimental::io::root_resource_name{irods::s3::get_resource()},
std::ios::out | std::ios::trunc);

if (!d.is_open()) {
logging::error("{}: {} Failed open data stream to iRODS - path={}", __FUNCTION__, upload_id, path.string());
response.result(beast::http::status::internal_server_error);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
session_ptr->send(std::move(response));
return;
}

const auto& replica_token = d.replica_token();
const auto& replica_number = d.replica_number();
// get the replica_token and replica_number from replica_token_number_and_odstream_map
const auto& replica_token = std::get<0>(part_shmem::replica_token_number_and_odstream_map[upload_id]);
const auto& replica_number = std::get<1>(part_shmem::replica_token_number_and_odstream_map[upload_id]);

// start tasks on thread pool for part uploads
for (int current_part_number = 1; current_part_number <= max_part_number; ++current_part_number) {
logging::debug(
"{}: pushing upload work on thread pool {}-{} : [filename={}][offset={}][size={}]",
__FUNCTION__,
__func__,
upload_id,
current_part_number,
part_info_vector[current_part_number - 1].part_filename,
Expand All @@ -286,7 +321,7 @@ void irods::s3::actions::handle_completemultipartupload(
part_offset = part_info_vector[current_part_number - 1].part_offset,
part_size = part_info_vector[current_part_number - 1].part_size, // don't really need this
read_buffer_size,
func = __FUNCTION__]() mutable {
func = __func__]() mutable {
uint64_t read_write_byte_counter = 0;

// upon exit, increment the task_done_counter and notify the coordinating thread
Expand Down Expand Up @@ -322,15 +357,10 @@ void irods::s3::actions::handle_completemultipartupload(
ifs.open(part_filename, std::ifstream::in);

if (!ifs.is_open()) {
std::lock_guard<std::mutex> lk(cv_mutex);
upload_status_object.fail_flag = true;
std::stringstream ss;
ss << "Failed to part file for reading" << part_filename;
upload_status_object.error_string = ss.str();
logging::error(
"{}: {} upload_id={} part_number={}",
// if there is no part file then this part was uploaded directly to iRODS
logging::debug(
"{}: upload_id={} part_number={} Part was uploaded directly to iRODS. Skipping...",
func,
upload_status_object.error_string,
upload_id,
current_part_number);
return;
Expand Down Expand Up @@ -404,27 +434,50 @@ void irods::s3::actions::handle_completemultipartupload(

// wait until all threads are complete
std::unique_lock<std::mutex> lk(cv_mutex);
cv.wait(lk, [&upload_status_object, max_part_number, func = __FUNCTION__]() {
cv.wait(lk, [&upload_status_object, max_part_number, func = __func__]() {
logging::debug("{}: wait: task_done_counter is {}", func, upload_status_object.task_done_counter);
return upload_status_object.task_done_counter == max_part_number;
});

d.close();
// close the object and delete the entry in the replica_token_number_and_odstream_map
if (part_shmem::replica_token_number_and_odstream_map.find(upload_id) !=
part_shmem::replica_token_number_and_odstream_map.end())
{
// Read all of the shared pointers in the tuple to make sure they are destructed in
// the order we require. std::tuple does not guarantee order of destruction.
auto conn_ptr = std::get<2>(part_shmem::replica_token_number_and_odstream_map[upload_id]);
auto transport_ptr = std::get<3>(part_shmem::replica_token_number_and_odstream_map[upload_id]);
auto dstream_ptr = std::get<4>(part_shmem::replica_token_number_and_odstream_map[upload_id]);

logging::trace("{}:{} Closing iRODS data object.", __func__, __LINE__);
if (dstream_ptr) {
dstream_ptr->close();
}

// remove the temporary part files - should we do this on failure?
for (int i = 0; i < max_part_number; ++i) {
std::remove(part_info_vector[i].part_filename.c_str());
// delete the entry
part_shmem::replica_token_number_and_odstream_map.erase(upload_id);
}

// check to see if any threads failed
if (upload_status_object.fail_flag) {
logging::error("{}: {}", __FUNCTION__, upload_status_object.error_string);
logging::error("{}: {}", __func__, upload_status_object.error_string);
response.result(beast::http::status::internal_server_error);
logging::debug("{}: returned {}", __FUNCTION__, response.reason());
logging::debug("{}: returned [{}]", __func__, response.reason());
session_ptr->send(std::move(response));
return;
}

// remove the temporary part files - on failures we don't want to clean up as this could be resent
for (int i = 0; i < max_part_number; ++i) {
std::remove(part_info_vector[i].part_filename.c_str());
}

// clean up shmem - on failures we don't want to clean up as this could be resent
{
std::lock_guard<std::mutex> guard(part_shmem::multipart_global_state_mutex);
part_shmem::part_size_map.erase(upload_id);
}

// Now send the response
// Example response:
// <CompleteMultipartUploadResult>
Expand All @@ -449,9 +502,9 @@ void irods::s3::actions::handle_completemultipartupload(

boost::property_tree::write_xml(s, document, settings);
string_body_response.body() = s.str();
logging::debug("{}: response\n{}", __FUNCTION__, s.str());
logging::debug("{}: response\n{}", __func__, s.str());
string_body_response.result(boost::beast::http::status::ok);
logging::debug("{}: returned {}", __FUNCTION__, string_body_response.reason());
logging::debug("{}: returned [{}]", __func__, string_body_response.reason());
session_ptr->send(std::move(string_body_response));
return;
}
Loading

0 comments on commit 2e33534

Please sign in to comment.