From 2e335342047cf9cc490db2271cfb2e275dfb5f08 Mon Sep 17 00:00:00 2001 From: Justin James Date: Wed, 24 Jul 2024 20:50:17 +0000 Subject: [PATCH] [#114] implemented multipart enhancement Design: 1. When a new part upload is initiated, determine whether all lower-numbered parts have already been started. If so, the part offset can be computed from the previous part sizes. In this case, stream the part directly to iRODS with a seek to the offset followed by the writes. 2. To accomplish the above, when a new part upload is initiated, save the part size in a map that translates each upload_id to a map of part numbers to part sizes. The following is an example of this map (a standalone sketch of the offset computation appears after the diff): { "1234abcd-1234-1234-1234-123456789abc": { 1: 4096000, 2: 4096000, 4: 4096000 }, "01234abc-0123-0123-0123-0123456789ab": { 1: 5192000, 3: 5192000 } } 3. If the part offset is not known, write the bytes to a local part file. When CompleteMultipartUpload is encountered, read all of these local part files and stream them to iRODS. If a part has no local part file, that part was already streamed directly to iRODS and does not need to be rewritten. 4. The first stream opened to iRODS remains open until CompleteMultipartUpload finishes. This guarantees that the replica_token does not change in the middle of writing parts to iRODS. See the keep_dstream_open_flag flag in putobject.cpp. --- README.md | 2 + endpoints/s3/src/completemultipartupload.cpp | 207 +++++---- endpoints/s3/src/putobject.cpp | 436 ++++++++++++++++--- tests/docker/docker-compose.yml | 11 +- tests/putobject_test.py | 20 + 5 files changed, 531 insertions(+), 145 deletions(-) diff --git a/README.md b/README.md index 481aee7..3066266 100644 --- a/README.md +++ b/README.md @@ -496,6 +496,8 @@ docker compose build docker compose run client ``` +*Note: If you get an error like `'name' does not match any of the regexes: '^x-'`, you will need to upgrade your version of docker compose.* + The test output will appear in the terminal. 
Once the tests complete run the following to cleanup: ```bash diff --git a/endpoints/s3/src/completemultipartupload.cpp b/endpoints/s3/src/completemultipartupload.cpp index 43d7795..011a1f8 100644 --- a/endpoints/s3/src/completemultipartupload.cpp +++ b/endpoints/s3/src/completemultipartupload.cpp @@ -22,14 +22,12 @@ #include #include +#include #include #include #include #include -#include #include -#include -#include #include #include @@ -38,6 +36,22 @@ namespace beast = boost::beast; namespace fs = irods::experimental::filesystem; namespace logging = irods::http::logging; +namespace irods::s3::api::multipart_global_state +{ + extern std::unordered_map> part_size_map; + extern std::unordered_map< + std::string, + std::tuple< + irods::experimental::io::replica_token, + irods::experimental::io::replica_number, + std::shared_ptr, + std::shared_ptr, + std::shared_ptr>> + replica_token_number_and_odstream_map; + + extern std::mutex multipart_global_state_mutex; +} // end namespace irods::s3::api::multipart_global_state + namespace { @@ -64,14 +78,16 @@ void irods::s3::actions::handle_completemultipartupload( boost::beast::http::request_parser& empty_body_parser, const boost::urls::url_view& url) { + namespace part_shmem = irods::s3::api::multipart_global_state; + beast::http::response response; // Authenticate auto irods_username = irods::s3::authentication::authenticates(empty_body_parser, url); if (!irods_username) { - logging::error("{}: Failed to authenticate.", __FUNCTION__); + logging::error("{}: Failed to authenticate.", __func__); response.result(beast::http::status::forbidden); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } @@ -92,24 +108,22 @@ void irods::s3::actions::handle_completemultipartupload( } } - logging::debug("{} s3_bucket={} s3_key={}", __FUNCTION__, s3_bucket.string(), s3_key.string()); + logging::debug("{} s3_bucket={} s3_key={}", __func__, s3_bucket.string(), s3_key.string()); fs::path path; if (auto bucket = irods::s3::resolve_bucket(url.segments()); bucket.has_value()) { path = bucket.value(); path = irods::s3::finish_path(path, url.segments()); - logging::debug("{}: CompleteMultipartUpload path={}", __FUNCTION__, path.string()); + logging::debug("{}: CompleteMultipartUpload path={}", __func__, path.string()); } else { - logging::error("{}: Failed to resolve bucket", __FUNCTION__); + logging::error("{}: Failed to resolve bucket", __func__); response.result(beast::http::status::forbidden); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } - fs::client::create_collections(conn, path.parent_path()); - // get the uploadId from the param list std::string upload_id; if (const auto upload_id_param = url.params().find("uploadId"); upload_id_param != url.params().end()) { @@ -117,9 +131,9 @@ void irods::s3::actions::handle_completemultipartupload( } if (upload_id.empty()) { - logging::error("{}: Did not receive a an uploadId", __FUNCTION__); + logging::error("{}: Did not receive an uploadId", __func__); response.result(beast::http::status::bad_request); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } @@ -127,9 +141,9 @@ void 
irods::s3::actions::handle_completemultipartupload( // Do not allow an upload_id that is not in the format we have defined. People could do bad things // if we didn't enforce this. if (!std::regex_match(upload_id, upload_id_pattern)) { - logging::error("{}: Upload ID {} was not in expected format.", __FUNCTION__, upload_id); + logging::error("{}: Upload ID [{}] was not in expected format.", __func__, upload_id); response.result(beast::http::status::bad_request); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } @@ -143,7 +157,7 @@ void irods::s3::actions::handle_completemultipartupload( beast::http::read(session_ptr->stream().socket(), session_ptr->get_buffer(), parser); std::string& request_body = parser.get().body(); - logging::debug("{}: request_body\n{}", __FUNCTION__, request_body); + logging::debug("{}: request_body\n{}", __func__, request_body); int max_part_number = -1; int min_part_number = 1000; @@ -174,16 +188,16 @@ void irods::s3::actions::handle_completemultipartupload( } } catch (boost::property_tree::xml_parser_error& e) { - logging::debug("{}: Could not parse XML body.", __FUNCTION__); + logging::debug("{}: Could not parse XML body.", __func__); response.result(boost::beast::http::status::bad_request); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } catch (...) { - logging::debug("{}: Unknown error parsing XML body.", __FUNCTION__); + logging::debug("{}: Unknown error parsing XML body.", __func__); response.result(boost::beast::http::status::bad_request); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } @@ -192,21 +206,42 @@ void irods::s3::actions::handle_completemultipartupload( // with 1 and the largest part number is the same as the count of // part numbers. We could later check that each part number is included... 
if (min_part_number != 1) { - logging::debug("{}: Part numbers did not start with 1.", __FUNCTION__); + logging::debug("{}: Part numbers did not start with 1.", __func__); response.result(boost::beast::http::status::bad_request); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } if (max_part_number != part_number_count) { - logging::debug("{}: Missing at least one part number.", __FUNCTION__); + logging::debug("{}: Missing at least one part number.", __func__); response.result(boost::beast::http::status::bad_request); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } + // debug: dump the part sizes known for this upload_id + if (spdlog::get_level() == spdlog::level::debug || spdlog::get_level() == spdlog::level::trace) { + std::lock_guard guard(part_shmem::multipart_global_state_mutex); + logging::debug("{}:{} {}: dumping part_size_map for upload_id [{}]", __FILE__, __LINE__, __func__, upload_id); + + if (part_shmem::part_size_map.find(upload_id) != part_shmem::part_size_map.end()) { + logging::debug("{}:{} {}: ------------------", __FILE__, __LINE__, __func__); + for (int i = 1; i <= part_number_count; ++i) { + bool found = part_shmem::part_size_map[upload_id].find(i) != part_shmem::part_size_map[upload_id].end(); + if (found) { + logging::debug( + "{}:{} {}: {}: {}", __FILE__, __LINE__, __func__, i, part_shmem::part_size_map[upload_id][i]); + } + else { + logging::debug("{}:{} {}: {}: UNKNOWN", __FILE__, __LINE__, __func__, i); + } + } + logging::debug("{}:{} {}: ------------------", __FILE__, __LINE__, __func__); + } + } + // build up a vector of filenames, offsets, and lengths for each part std::vector part_info_vector; part_info_vector.reserve(max_part_number); @@ -216,21 +251,37 @@ void irods::s3::actions::handle_completemultipartupload( std::string part_file_location = config.value(nlohmann::json::json_pointer{"/s3_server/multipart_upload_part_files_directory"}, "."); - uint64_t offset_counter = 0; - for (int current_part_number = 1; current_part_number <= max_part_number; ++current_part_number) { - std::string part_filename = - part_file_location + "/irods_s3_api_" + upload_id + "." + std::to_string(current_part_number); - try { - auto part_size = std::filesystem::file_size(part_filename); - part_info_vector.push_back({part_filename, offset_counter, part_size}); - offset_counter += part_size; - } - catch (fs::filesystem_error& e) { - logging::error("{}: Failed locate part", __FUNCTION__); - response.result(beast::http::status::internal_server_error); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); - session_ptr->send(std::move(response)); - return; + { + std::lock_guard guard(part_shmem::multipart_global_state_mutex); + uint64_t offset_counter = 0; + for (int current_part_number = 1; current_part_number <= max_part_number; ++current_part_number) { + std::string part_filename = + part_file_location + "/irods_s3_api_" + upload_id + "." 
+ std::to_string(current_part_number); + + if (part_shmem::part_size_map.find(upload_id) != part_shmem::part_size_map.end() && + part_shmem::part_size_map[upload_id].find(current_part_number) != + part_shmem::part_size_map[upload_id].end()) + { + // get size from part_size_map in shmem + auto part_size = part_shmem::part_size_map[upload_id][current_part_number]; + part_info_vector.push_back({part_filename, offset_counter, part_size}); + offset_counter += part_size; + } + else { + // get size from part files + try { + auto part_size = std::filesystem::file_size(part_filename); + part_info_vector.push_back({part_filename, offset_counter, part_size}); + offset_counter += part_size; + } + catch (fs::filesystem_error& e) { + logging::error("{}: Failed to locate part", __func__); + response.result(beast::http::status::internal_server_error); + logging::debug("{}: returned [{}]", __func__, response.reason()); + session_ptr->send(std::move(response)); + return; + } + } } } @@ -240,31 +291,15 @@ void irods::s3::actions::handle_completemultipartupload( std::condition_variable cv; std::mutex cv_mutex; - // This thread will create the file then wait until all threads are done or an error occurs. - irods::experimental::io::client::default_transport xtrans{conn}; - irods::experimental::io::odstream d; // irods dstream for writing directly to irods - d.open( - xtrans, - path, - irods::experimental::io::root_resource_name{irods::s3::get_resource()}, - std::ios::out | std::ios::trunc); - - if (!d.is_open()) { - logging::error("{}: {} Failed open data stream to iRODS - path={}", __FUNCTION__, upload_id, path.string()); - response.result(beast::http::status::internal_server_error); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); - session_ptr->send(std::move(response)); - return; - } - - const auto& replica_token = d.replica_token(); - const auto& replica_number = d.replica_number(); + // get the replica_token and replica_number from replica_token_number_and_odstream_map + const auto& replica_token = std::get<0>(part_shmem::replica_token_number_and_odstream_map[upload_id]); + const auto& replica_number = std::get<1>(part_shmem::replica_token_number_and_odstream_map[upload_id]); // start tasks on thread pool for part uploads for (int current_part_number = 1; current_part_number <= max_part_number; ++current_part_number) { logging::debug( "{}: pushing upload work on thread pool {}-{} : [filename={}][offset={}][size={}]", - __FUNCTION__, + __func__, upload_id, current_part_number, part_info_vector[current_part_number - 1].part_filename, @@ -286,7 +321,7 @@ part_offset = part_info_vector[current_part_number - 1].part_offset, part_size = part_info_vector[current_part_number - 1].part_size, // don't really need this read_buffer_size, - func = __FUNCTION__]() mutable { + func = __func__]() mutable { uint64_t read_write_byte_counter = 0; // upon exit, increment the task_done_counter and notify the coordinating thread @@ -322,15 +357,10 @@ ifs.open(part_filename, std::ifstream::in); if (!ifs.is_open()) { - std::lock_guard lk(cv_mutex); - upload_status_object.fail_flag = true; - std::stringstream ss; - ss << "Failed to part file for reading" << part_filename; - upload_status_object.error_string = ss.str(); - logging::error( - "{}: {} upload_id={} part_number={}", + // if there is no part file then this part was uploaded directly to iRODS + logging::debug( + "{}: upload_id={} 
part_number={} Part was uploaded directly to iRODS. Skipping...", func, - upload_status_object.error_string, upload_id, current_part_number); return; @@ -404,27 +434,50 @@ void irods::s3::actions::handle_completemultipartupload( // wait until all threads are complete std::unique_lock lk(cv_mutex); - cv.wait(lk, [&upload_status_object, max_part_number, func = __FUNCTION__]() { + cv.wait(lk, [&upload_status_object, max_part_number, func = __func__]() { logging::debug("{}: wait: task_done_counter is {}", func, upload_status_object.task_done_counter); return upload_status_object.task_done_counter == max_part_number; }); - d.close(); + // close the object and delete the entry in the replica_token_number_and_odstream_map + if (part_shmem::replica_token_number_and_odstream_map.find(upload_id) != + part_shmem::replica_token_number_and_odstream_map.end()) + { + // Read all of the shared pointers in the tuple to make sure they are destructed in + // the order we require. std::tuple does not guarantee order of destruction. + auto conn_ptr = std::get<2>(part_shmem::replica_token_number_and_odstream_map[upload_id]); + auto transport_ptr = std::get<3>(part_shmem::replica_token_number_and_odstream_map[upload_id]); + auto dstream_ptr = std::get<4>(part_shmem::replica_token_number_and_odstream_map[upload_id]); + + logging::trace("{}:{} Closing iRODS data object.", __func__, __LINE__); + if (dstream_ptr) { + dstream_ptr->close(); + } - // remove the temporary part files - should we do this on failure? - for (int i = 0; i < max_part_number; ++i) { - std::remove(part_info_vector[i].part_filename.c_str()); + // delete the entry + part_shmem::replica_token_number_and_odstream_map.erase(upload_id); } // check to see if any threads failed if (upload_status_object.fail_flag) { - logging::error("{}: {}", __FUNCTION__, upload_status_object.error_string); + logging::error("{}: {}", __func__, upload_status_object.error_string); response.result(beast::http::status::internal_server_error); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } + // remove the temporary part files - on failure we do not clean up, since the client may resend the request + for (int i = 0; i < max_part_number; ++i) { + std::remove(part_info_vector[i].part_filename.c_str()); + } + + // clean up shmem - on failure we do not clean up, since the client may resend the request + { + std::lock_guard guard(part_shmem::multipart_global_state_mutex); + part_shmem::part_size_map.erase(upload_id); + } + // Now send the response // Example response: // @@ -449,9 +502,9 @@ boost::property_tree::write_xml(s, document, settings); string_body_response.body() = s.str(); - logging::debug("{}: response\n{}", __FUNCTION__, s.str()); + logging::debug("{}: response\n{}", __func__, s.str()); string_body_response.result(boost::beast::http::status::ok); - logging::debug("{}: returned {}", __FUNCTION__, string_body_response.reason()); + logging::debug("{}: returned [{}]", __func__, string_body_response.reason()); session_ptr->send(std::move(string_body_response)); return; } diff --git a/endpoints/s3/src/putobject.cpp b/endpoints/s3/src/putobject.cpp index 6c94579..97619c4 100644 --- a/endpoints/s3/src/putobject.cpp +++ b/endpoints/s3/src/putobject.cpp @@ -18,18 +18,57 @@ #include #include +#include #include #include #include #include #include +#include namespace asio = boost::asio; namespace beast 
= boost::beast; namespace fs = irods::experimental::filesystem; namespace logging = irods::http::logging; +namespace irods::s3::api::multipart_global_state +{ + // This is a part size map which is shared between threads. Whenever part sizes are known + // they are added to this map. When CompleteMultipartUpload or AbortMultipartUpload is executed, the part sizes + // for that upload_id are removed. + // The map looks like the following with the key of the first map being the upload_id and the + // key of the second map being the part number: + // { + // "1234abcd-1234-1234-1234-123456789abc": + // { 1: 4096000, + // 2: 4096000, + // 4: 4096000 }, + // "01234abc-0123-0123-0123-0123456789ab": + // { 1: 5192000, + // 3: 5192000 } + // } + std::unordered_map> part_size_map; + + // This map holds persistent data needed for each upload_id. This includes the replica_token and + // replica_number. In addition, it holds shared pointers for the connection, transport, and odstream + // for the first stream that is opened. These shared pointers are saved to this map so that the stream from the first + // open() of the iRODS data object can remain open throughout the lifecycle of the request. This + // stream will be closed in either CompleteMultipartUpload or AbortMultipartUpload. A simplified, standalone model of this lifecycle appears after the incremental_async_read class below. + std::unordered_map< + std::string, + std::tuple< + irods::experimental::io::replica_token, + irods::experimental::io::replica_number, + std::shared_ptr, + std::shared_ptr, + std::shared_ptr>> + replica_token_number_and_odstream_map; + + // mutex to protect part_size_map and replica_token_number_and_odstream_map + std::mutex multipart_global_state_mutex; +} // end namespace irods::s3::api::multipart_global_state + namespace { const std::regex upload_id_pattern("[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"); @@ -43,6 +82,7 @@ namespace parsing_done, parsing_error }; + } //namespace void manually_parse_chunked_body_write_to_irods_in_background( irods::http::session_pointer_type session_ptr, @@ -58,11 +98,10 @@ void manually_parse_chunked_body_write_to_irods_in_background( parsing_state current_state, size_t chunk_size, const std::string& parsing_buffer_string, + bool know_part_offset, + bool keep_dstream_open_flag, const std::string func); -/*std::make_shared( - parser, session_ptr, response, path, upload_part, upload_part_filename) - ->start();*/ class incremental_async_read : public std::enable_shared_from_this { irods::http::session_pointer_type session_ptr_; @@ -70,13 +109,17 @@ class incremental_async_read : public std::enable_shared_from_this> parser_; std::string irods_path_; bool upload_part_flag_; + bool part_offset_is_known_; + std::size_t part_offset_; + std::string upload_id_; std::string part_filename_; std::ofstream part_file_; - irods::experimental::io::odstream odstream_; std::vector buffer_; std::size_t total_bytes_read_{}; + bool keep_dstream_open_flag{false}; std::shared_ptr conn_; - irods::experimental::io::client::default_transport tp_; + std::shared_ptr tp_; + std::shared_ptr odstream_; public: incremental_async_read( @@ -85,6 +128,9 @@ beast::http::response& _response, std::string _irods_path, bool _upload_part_flag, + bool _know_part_offset_flag, + size_t _part_offset, + std::string _upload_id, std::string _part_filename, std::shared_ptr _conn) : session_ptr_{_session_ptr->shared_from_this()} @@ -92,17 +138,78 @@ resp_.set("Etag", _irods_path); resp_.keep_alive(parser_->get().keep_alive()); - if (upload_part_flag_) { - logging::trace("{}: Opening part file [{}] for writing.", __func__, 
part_filename_); + tp_ = std::make_shared(*conn_); + odstream_ = std::make_shared(); + + if (upload_part_flag_ && part_offset_is_known_) { + // we know the offset so seek and stream directly to iRODS + std::lock_guard guard(part_shmem::multipart_global_state_mutex); + + // if there is no replica token then just open the object without replica token and save the token + if (part_shmem::replica_token_number_and_odstream_map.find(upload_id_) == + part_shmem::replica_token_number_and_odstream_map.end()) + { + logging::trace( + "{}: Open new iRODS data object [{}] for writing and seeking to {}.", + __func__, + irods_path_, + part_offset_); + odstream_->open( + *tp_, + irods_path_, + irods::experimental::io::root_resource_name{irods::s3::get_resource()}, + std::ios::out | std::ios::trunc); + if (odstream_->is_open()) { + // the first stream that opens will stay open until CompleteMultipartUpload is called + keep_dstream_open_flag = true; + part_shmem::replica_token_number_and_odstream_map[upload_id_] = { + odstream_->replica_token(), odstream_->replica_number(), conn_, tp_, odstream_}; + } + } + else { + // get the replica token and pass it to open + logging::trace( + "{}: Open iRODS data object [{}] for writing and seeking to {}. Replica token={}", + __func__, + irods_path_, + part_offset_, + std::get<0>(part_shmem::replica_token_number_and_odstream_map[upload_id_]).value); + odstream_->open( + *tp_, + std::get<0>(part_shmem::replica_token_number_and_odstream_map[upload_id_]), // replica token + irods_path_, + std::get<1>(part_shmem::replica_token_number_and_odstream_map[upload_id_]), // replica number + std::ios::out | std::ios::in); + } + + if (!odstream_->is_open()) { + auto msg = fmt::format( + "{}:{} Failed to open iRODS object [{}]. Offset={}", + __func__, + __LINE__, + _irods_path, + part_offset_); + logging::error(msg); + THROW(SYS_INTERNAL_ERR, std::move(msg)); + } + odstream_->seekp(part_offset_); + } + else if (upload_part_flag_) { + logging::trace("{}: Open part file [{}] for writing.", __func__, part_filename_); part_file_.open(part_filename_); if (!part_file_) { auto msg = fmt::format("{}: Failed to open part file for writing [{}].", __func__, _part_filename); @@ -110,12 +217,12 @@ class incremental_async_read : public std::enable_shared_from_thisopen(*tp_, irods_path_, irods::experimental::io::root_resource_name{irods::s3::get_resource()}); + + if (!odstream_->is_open()) { + auto msg = fmt::format("{}:{} Failed to open iRODS object [{}].", __func__, __LINE__, _irods_path); logging::error(msg); THROW(SYS_INTERNAL_ERR, std::move(msg)); } @@ -142,7 +249,7 @@ class incremental_async_read : public std::enable_shared_from_this void { - logging::debug("{}: multipart upload: Number of bytes read from socket = [{}]", __func__, _bytes_transferred); + logging::trace("{}: multipart upload: Number of bytes read from socket = [{}]", __func__, _bytes_transferred); if (_ec && _ec != beast::http::error::need_buffer) { logging::error("{}: multipart upload: Error reading from socket: {}", __func__, _ec.message()); @@ -157,20 +264,20 @@ class incremental_async_read : public std::enable_shared_from_thisbuffer_.size() - self->parser_->get().body().size; - logging::debug( + logging::trace( "{}: multipart upload: [{}] bytes in buffer_body for part file [{}].", __func__, byte_count, self->part_filename_); self->total_bytes_read_ += byte_count; - logging::debug( + logging::trace( "{}: multipart upload: Total bytes [{}] read for part file [{}].", __func__, self->total_bytes_read_, self->part_filename_); - 
if (self->upload_part_flag_) { + if (self->upload_part_flag_ && !self->part_offset_is_known_) { if (!self->part_file_.write(self->buffer_.data(), byte_count)) { logging::error( "{}: multipart upload: Error writing [{}] bytes to part file [{}].", @@ -181,14 +288,14 @@ class incremental_async_read : public std::enable_shared_from_thissession_ptr_->send(std::move(self->resp_)); // Schedules an async write op. return; } - logging::debug( + logging::trace( "{}: multipart upload: Wrote [{}] bytes to part file [{}].", __func__, byte_count, self->part_filename_); } else { - if (!self->odstream_.write(self->buffer_.data(), byte_count)) { + if (!self->odstream_->write(self->buffer_.data(), byte_count)) { logging::error( "{}: multipart upload: Error writing [{}] bytes to iRODS data object [{}].", __func__, @@ -198,19 +305,20 @@ class incremental_async_read : public std::enable_shared_from_thissession_ptr_->send(std::move(self->resp_)); // Schedules an async write op. return; } - logging::debug( - "{}: multipart upload: Wrote [{}] bytes to iRODS file [{}].", + logging::trace( + "{}: multipart upload: Wrote [{}] bytes to iRODS data object [{}].", __func__, byte_count, self->irods_path_); } if (self->parser_->is_done()) { - if (self->upload_part_flag_) { + if (self->part_file_.is_open()) { self->part_file_.close(); } - else { - self->odstream_.close(); + if (self->odstream_->is_open() && !self->keep_dstream_open_flag) { + logging::trace("{}:{} Closing iRODS data object [{}].", __func__, __LINE__, self->irods_path_); + self->odstream_->close(); } logging::trace("{}: Request message has been processed [parser is done]", __func__); @@ -225,7 +333,7 @@ class incremental_async_read : public std::enable_shared_from_thisread_from_socket(); }); } // on_incremental_async_read -}; // struct incremental_async_read +}; // class incremental_async_read void irods::s3::actions::handle_putobject( irods::http::session_pointer_type session_ptr, @@ -233,6 +341,7 @@ void irods::s3::actions::handle_putobject( const boost::urls::url_view& url) { using json_pointer = nlohmann::json::json_pointer; + namespace part_shmem = irods::s3::api::multipart_global_state; beast::http::response response; // Authenticate auto irods_username = irods::s3::authentication::authenticates(empty_body_parser, url); if (!irods_username) { - logging::error("{}: Failed to authenticate.", __FUNCTION__); + logging::error("{}: Failed to authenticate.", __func__); response.result(beast::http::status::forbidden); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } @@ -259,7 +368,7 @@ void irods::s3::actions::handle_putobject( if (header != parser_message.end() && header->value() == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD") { special_chunked_header = true; } - logging::debug("{} special_chunk_header: {}", __FUNCTION__, special_chunked_header); + logging::debug("{} special_chunked_header: {}", __func__, special_chunked_header); // See if we have chunked set. If so turn off special chunked header flag as the // parser will handle it. 
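*Aside: the UploadPart handling above follows an open-or-reuse lifecycle — the first part whose offset is known opens the iRODS data object with `std::ios::trunc`, stores its replica token, replica number, connection, transport, and stream in `replica_token_number_and_odstream_map`, and leaves that stream open; every later part opens a short-lived stream against the same replica and seeks to its offset. The following is a minimal, standalone model of that lifecycle, with `std::fstream` standing in for `irods::experimental::io::odstream` and a bare map entry standing in for the replica-token tuple. `write_part`, `complete_upload`, and `first_stream_map` are hypothetical names for illustration, not part of this patch.*

```cpp
#include <cstdint>
#include <fstream>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

std::mutex state_mutex;
// First stream opened per upload_id; kept alive until complete_upload runs,
// playing the role of replica_token_number_and_odstream_map in this patch.
std::unordered_map<std::string, std::shared_ptr<std::fstream>> first_stream_map;

void write_part(
    const std::string& upload_id,
    const std::string& path,
    std::uint64_t part_offset,
    const std::string& part_data)
{
    std::shared_ptr<std::fstream> out;
    {
        std::lock_guard<std::mutex> guard{state_mutex};
        if (first_stream_map.find(upload_id) == first_stream_map.end()) {
            // First part to arrive: create/truncate the object and retain the
            // stream for the lifetime of the upload (the role of keep_dstream_open_flag).
            out = std::make_shared<std::fstream>(path, std::ios::out | std::ios::binary | std::ios::trunc);
            first_stream_map[upload_id] = out;
        }
        else {
            // Later parts: open the existing object without truncating. A part
            // offset is only known after the first open has happened, so the
            // file is guaranteed to exist here.
            out = std::make_shared<std::fstream>(path, std::ios::in | std::ios::out | std::ios::binary);
        }
    }
    out->seekp(static_cast<std::streamoff>(part_offset));
    out->write(part_data.data(), static_cast<std::streamsize>(part_data.size()));
    out->flush();
    // Streams for later parts close here when `out` goes out of scope; the
    // first stream stays open because the map still holds a reference.
}

void complete_upload(const std::string& upload_id)
{
    std::lock_guard<std::mutex> guard{state_mutex};
    first_stream_map.erase(upload_id); // destroys, flushes, and closes the first stream
}

int main()
{
    const std::string upload_id = "1234abcd-1234-1234-1234-123456789abc";
    write_part(upload_id, "object.bin", 0, "AAAA"); // part 1 at offset 0
    write_part(upload_id, "object.bin", 4, "BBBB"); // part 2 at offset 4 (the size of part 1)
    complete_upload(upload_id);

    std::ifstream in{"object.bin", std::ios::binary};
    std::cout << std::string{std::istreambuf_iterator<char>{in}, {}} << '\n'; // prints AAAABBBB
    return 0;
}
```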
@@ -276,9 +385,9 @@ void irods::s3::actions::handle_putobject( if (!chunked_flag) { header = parser_message.find("Content-Length"); if (header == parser_message.end()) { - logging::error("{} Neither Content-Length nor chunked mode were set.", __FUNCTION__); + logging::error("{} Neither Content-Length nor chunked mode were set.", __func__); response.result(boost::beast::http::status::bad_request); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } @@ -298,7 +407,7 @@ void irods::s3::actions::handle_putobject( return; } - logging::debug("{}: Sent 100-continue", __FUNCTION__); + logging::debug("{}: Sent 100-continue", __func__); } fs::path path; @@ -307,16 +416,18 @@ void irods::s3::actions::handle_putobject( path = irods::s3::finish_path(path, url.segments()); } else { - logging::error("{}: Failed to resolve bucket", __FUNCTION__); + logging::error("{}: Failed to resolve bucket", __func__); response.result(beast::http::status::not_found); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } - logging::debug("{}: Path [{}]", __FUNCTION__, path.string()); + logging::debug("{}: Path [{}]", __func__, path.string()); // check to see if this is a part upload bool upload_part = false; + bool know_part_offset = false; + uint64_t part_offset = 0; std::string part_number; std::string upload_id; std::string upload_part_filename; @@ -331,27 +442,154 @@ void irods::s3::actions::handle_putobject( // either partNumber or uploadId provided upload_part = true; if (part_number.empty()) { - logging::error("{}: UploadPart detected but partNumber was not provided.", __FUNCTION__); + logging::error("{}: UploadPart detected but partNumber was not provided.", __func__); response.result(beast::http::status::bad_request); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } else if (upload_id.empty()) { - logging::error("{}: UploadPart detected but upload_id was not provided.", __FUNCTION__); + logging::error("{}: UploadPart detected but upload_id was not provided.", __func__); response.result(beast::http::status::bad_request); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } else if (!std::regex_match(upload_id, upload_id_pattern)) { - logging::error("{}: Upload ID {} was not in expected format.", __FUNCTION__, upload_id); + logging::error("{}: Upload ID [{}] was not in expected format.", __func__, upload_id); response.result(beast::http::status::bad_request); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } + // parse the part_number + unsigned int part_number_int = 0; + try { + part_number_int = boost::lexical_cast(part_number.c_str()); + } + catch (const boost::bad_lexical_cast&) { + logging::error("{}: Upload ID [{}] Could not parse part_number [{}]", __func__, upload_id, part_number); + response.result(beast::http::status::bad_request); + logging::debug("{}: returned [{}]", __func__, response.reason()); + 
session_ptr->send(std::move(response)); + return; + } + + // see if we have enough information to stream this part directly to iRODS + uint64_t part_size = 0; + { + std::lock_guard guard(part_shmem::multipart_global_state_mutex); + + // if an entry for upload id does not exist go ahead and create it + if (part_shmem::part_size_map.find(upload_id) == part_shmem::part_size_map.end()) { + part_shmem::part_size_map[upload_id] = std::unordered_map(); + } + try { + // add this part size to part_size_map + if (chunked_flag || special_chunked_header) { + // chunked - get part size from x-amz-decoded-content-length + // if this isn't provided then just continue without storing the part size + auto header = parser_message.find("x-amz-decoded-content-length"); + if (header != parser_message.end()) { + part_size = boost::lexical_cast(parser_message["x-amz-decoded-content-length"]); + + // Make sure someone hasn't previously uploaded this part with a different part size. + // If so we can't handle that and have to reject the call. + if (part_shmem::part_size_map[upload_id].find(part_number_int) != + part_shmem::part_size_map[upload_id].end()) { + auto old_part_size = part_shmem::part_size_map[upload_id][part_number_int]; + if (old_part_size != part_size) { + // reject this + logging::error( + "{}: Upload ID [{}] - part_number [{}] was uploaded a second time with a different " + "part size. Old part size = [{}]. New part size = [{}]. " + "Rejecting this request.", + __func__, + upload_id, + part_number_int, + old_part_size, + part_size); + response.result(beast::http::status::bad_request); + logging::debug("{}: returned [{}]", __func__, response.reason()); + session_ptr->send(std::move(response)); + return; + } + } + + part_shmem::part_size_map[upload_id][part_number_int] = part_size; + } + } + else { + // not chunked, get part size from content_length + part_size = boost::lexical_cast(parser_message[beast::http::field::content_length]); + + // Make sure someone hasn't previously uploaded this part with a different part size. + // If so we can't handle that and have to reject the call. + if (part_shmem::part_size_map[upload_id].find(part_number_int) != + part_shmem::part_size_map[upload_id].end()) { + auto old_part_size = part_shmem::part_size_map[upload_id][part_number_int]; + if (old_part_size != part_size) { + // reject this + logging::error( + "{}: Upload ID [{}] - part_number [{}] was uploaded a second time with a different " + "part " + "size. Old part size = [{}]. New part size = [{}]. " + "Rejecting this request", + __func__, + upload_id, + part_number_int, + old_part_size, + part_size); + response.result(beast::http::status::bad_request); + logging::debug("{}: returned [{}]", __func__, response.reason()); + session_ptr->send(std::move(response)); + return; + } + } + + part_shmem::part_size_map[upload_id][part_number_int] = part_size; + } + + // see if we know all of the previous part_sizes + know_part_offset = true; + for (unsigned int i = 1; i <= part_number_int - 1; ++i) { + if (part_shmem::part_size_map[upload_id].find(i) != part_shmem::part_size_map[upload_id].end()) { + part_offset += part_shmem::part_size_map[upload_id][i]; + } + else { + know_part_offset = false; + break; + } + } + } + catch (const boost::bad_lexical_cast&) { + // The part size provided was not correct. We will attempt to continue without storing the part + // size in global memory. 
This will mean all subsequent parts will have to be written to a local + // cache file and flushed on CompleteMultipartUpload, which will impact performance. + // + // Also note that the boost::beast::http parser will likely just reject this request so we may never get + // to this point. + logging::warn( + "{}:{} {}: part_number = [{}] could not parse part size as uint64_t, " + "continuing without storing part size in memory", + __FILE__, + __LINE__, + __func__, + part_number_int); + } + } + + logging::debug( + "{}:{} {}: part_number = {}, part_size = {}, know_part_offset = {}, part_offset = {}", + __FILE__, + __LINE__, + __func__, + part_number_int, + part_size, + know_part_offset, + part_offset); + // get the base location for the part files const nlohmann::json& config = irods::http::globals::configuration(); std::string part_file_location = @@ -359,7 +597,7 @@ void irods::s3::actions::handle_putobject( // the current part file full path upload_part_filename = part_file_location + "/irods_s3_api_" + upload_id + "." + part_number; - logging::debug("{}: UploadPart detected. partNumber={} uploadId={}", __FUNCTION__, part_number, upload_id); + logging::debug("{}: UploadPart detected. partNumber={} uploadId={}", __func__, part_number, upload_id); } // Make sure the parent collection exists. @@ -369,7 +607,7 @@ void irods::s3::actions::handle_putobject( } uint64_t read_buffer_size = irods::s3::get_put_object_buffer_size_in_bytes(); - logging::debug("{}: read_buffer_size={}", __FUNCTION__, read_buffer_size); + logging::debug("{}: read_buffer_size={}", __func__, read_buffer_size); response.set("Etag", path.c_str()); response.set("Connection", "close"); @@ -402,10 +640,11 @@ void irods::s3::actions::handle_putobject( } if (special_chunked_header) { + bool keep_dstream_open_flag = false; using irods_default_transport = irods::experimental::io::client::default_transport; // create an output file stream to iRODS - wrap all structs in shared pointers - // since these objects will persist than the current routine + // since these objects will persist longer than the current routine auto tp = std::make_shared(*conn); auto d = std::make_shared(); @@ -413,22 +652,71 @@ // since this will persist longer than the current routine std::shared_ptr ofs = std::make_shared(); - if (upload_part) { + if (upload_part && know_part_offset) { + { + std::lock_guard guard(part_shmem::multipart_global_state_mutex); + + // if there is no replica token then just open the object without replica token and save the token + if (part_shmem::replica_token_number_and_odstream_map.find(upload_id) == + part_shmem::replica_token_number_and_odstream_map.end()) + { + logging::trace( + "{}: Open new iRODS data object [{}] for writing and seeking to {}.", + __func__, + path.string(), + part_offset); + d->open( + *tp, + path, + irods::experimental::io::root_resource_name{irods::s3::get_resource()}, + std::ios::out | std::ios::trunc); + if (d->is_open()) { + keep_dstream_open_flag = true; + part_shmem::replica_token_number_and_odstream_map[upload_id] = { + d->replica_token(), d->replica_number(), conn, tp, d}; + } + } + else { + // get the replica token and pass it to open + logging::trace( + "{}: Open iRODS data object [{}] for writing and seeking to {}.", + __func__, + path.string(), + part_offset); + d->open( + *tp, + std::get<0>(part_shmem::replica_token_number_and_odstream_map[upload_id]), // replica token + path, + std::get<1>(part_shmem::replica_token_number_and_odstream_map[upload_id]), 
// replica number + std::ios::out | std::ios::in); + } + } + if (!d->is_open()) { + logging::error("{}: Failed to open dstream to iRODS", __func__); + response.result(beast::http::status::internal_server_error); + logging::debug("{}: returned [{}]", __func__, response.reason()); + session_ptr->send(std::move(response)); + return; + } + d->seekp(part_offset); + } + else if (upload_part) { + logging::debug("{}: Open part file [{}] for writing.", __func__, upload_part_filename); ofs->open(upload_part_filename, std::ofstream::out); if (!ofs->is_open()) { - logging::error("{}: Failed to open stream for writing part", __FUNCTION__); + logging::error("{}: Failed to open stream for writing part", __func__); response.result(beast::http::status::internal_server_error); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } } else { - d->open(*tp, std::move(path), irods::experimental::io::root_resource_name{irods::s3::get_resource()}); + d->open(*tp, path, irods::experimental::io::root_resource_name{irods::s3::get_resource()}); if (!d->is_open()) { - logging::error("{}: Failed to open dstream to iRODS", __FUNCTION__); + logging::error("{}: Failed to open dstream to iRODS", __func__); response.result(beast::http::status::internal_server_error); - logging::debug("{}: returned {}", __FUNCTION__, response.reason()); + logging::debug("{}: returned [{}]", __func__, response.reason()); session_ptr->send(std::move(response)); return; } @@ -461,11 +749,23 @@ void irods::s3::actions::handle_putobject( current_state, chunk_size, parsing_buffer_string, - __FUNCTION__); + know_part_offset, + keep_dstream_open_flag, + __func__); } else { + logging::debug("{}: upload_part={}", __func__, upload_part); std::make_shared( - parser, session_ptr, response, path, upload_part, upload_part_filename, conn) + parser, + session_ptr, + response, + path, + upload_part, + know_part_offset, + part_offset, + upload_id, + upload_part_filename, + conn) ->start(); } } // handle_putobject @@ -483,6 +783,8 @@ void manually_parse_chunked_body_write_to_irods_in_background( parsing_state current_state, size_t chunk_size, const std::string& parsing_buffer_string_, + bool know_part_offset, + bool keep_dstream_open_flag, const std::string func) { irods::http::globals::background_task([session_ptr, @@ -497,6 +799,8 @@ void manually_parse_chunked_body_write_to_irods_in_background( current_state, chunk_size, parsing_buffer_string = std::move(parsing_buffer_string_), + know_part_offset, + keep_dstream_open_flag, func]() mutable { boost::beast::error_code ec; auto& parser_message = parser->get(); @@ -520,7 +824,7 @@ void manually_parse_chunked_body_write_to_irods_in_background( if (ec) { logging::error("{}: Error when parsing file - {}", func, ec.what()); response.result(beast::http::status::internal_server_error); - logging::debug("{}: returned {}", func, response.reason()); + logging::debug("{}: returned [{}]", func, response.reason()); session_ptr->send(std::move(response)); return; } @@ -650,7 +954,7 @@ void manually_parse_chunked_body_write_to_irods_in_background( std::string body = parsing_buffer_string.substr(0, chunk_size); try { - if (upload_part) { + if (upload_part && !know_part_offset) { ofs->write(body.c_str(), body.length()); } else { @@ -660,7 +964,7 @@ void manually_parse_chunked_body_write_to_irods_in_background( catch (std::exception& e) { logging::error("{}: Exception when writing to file - {}", 
func, e.what()); response.result(beast::http::status::internal_server_error); - logging::debug("{}: returned {}", func, response.reason()); + logging::debug("{}: returned [{}]", func, response.reason()); session_ptr->send(std::move(response)); return; } @@ -717,27 +1021,33 @@ // if we are done return ok if (current_state == parsing_state::parsing_done) { - if (upload_part) { + // copy these shared pointers so they are destroyed in the correct order at scope exit + auto conn_ptr = conn; + auto transport_ptr = tp; + auto dstream_ptr = d; + + if (ofs->is_open()) { ofs->close(); } - else { + if (!keep_dstream_open_flag && d->is_open()) { + logging::trace("{}:{} Closing iRODS data object.", __func__, __LINE__); d->close(); } response.result(beast::http::status::ok); - logging::debug("{}: returned {}:{}", func, response.reason(), __LINE__); + logging::debug("{}: returned [{}]:{}", func, response.reason(), __LINE__); session_ptr->send(std::move(response)); return; } else if (current_state == parsing_state::parsing_error) { - if (upload_part) { + if (ofs->is_open()) { ofs->close(); } - else { + if (d->is_open()) { d->close(); } logging::error("{}: Error parsing chunked body", func); response.result(boost::beast::http::status::bad_request); - logging::debug("{}: returned {}", func, response.reason()); + logging::debug("{}: returned [{}]", func, response.reason()); session_ptr->send(std::move(response)); return; } @@ -757,6 +1067,8 @@ current_state, chunk_size, parsing_buffer_string, + know_part_offset, + keep_dstream_open_flag, func); }); } // manually_parse_chunked_body_write_to_irods_in_background diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml index aad114b..eb959f9 100644 --- a/tests/docker/docker-compose.yml +++ b/tests/docker/docker-compose.yml @@ -1,9 +1,9 @@ -version: '3' +name: irods-s3-api services: irods: build: - context: . + context: . dockerfile: irods.ubuntu20.Dockerfile ports: - "1247" start_period: 20s interval: 10s timeout: 10s - retries: 20 + retries: 20 irods-s3-api: build: context: ../.. 
- dockerfile: irods_runner.Dockerfile + dockerfile: irods_runner.Dockerfile volumes: - ./config.json:/config.json:ro ports: @@ -34,5 +34,4 @@ services: - irods - irods-s3-api volumes: - - ../../:/irods_client_s3_cpp - + - ../../:/irods_client_s3_cpp diff --git a/tests/putobject_test.py b/tests/putobject_test.py index 42290e0..61f41f1 100644 --- a/tests/putobject_test.py +++ b/tests/putobject_test.py @@ -64,6 +64,11 @@ def test_botocore_put_in_bucket_root_large_file(self): assert_command(f'iget {self.bucket_irods_path}/{put_filename} {get_filename}') assert_command(f'diff -q {put_filename} {get_filename}') + # perform upload a second time to make sure original file was closed properly + self.boto3_client.upload_file(put_filename, self.bucket_name, put_filename) + assert_command(f'iget -f {self.bucket_irods_path}/{put_filename} {get_filename}') + assert_command(f'diff -q {put_filename} {get_filename}') + finally: os.remove(put_filename) os.remove(get_filename) @@ -122,6 +127,14 @@ def test_aws_put_in_bucket_root_large_file(self): assert_command(f'iget {self.bucket_irods_path}/{put_filename} {get_filename}') assert_command(f'diff -q {put_filename} {get_filename}') + # perform upload a second time to make sure original file was closed properly + assert_command(f'aws --profile s3_api_alice --endpoint-url {self.s3_api_url} ' + f's3 cp {put_filename} s3://{self.bucket_name}/{put_filename}', + 'STDOUT_SINGLELINE', + f'upload: ./{put_filename} to s3://{self.bucket_name}/{put_filename}') + assert_command(f'iget -f {self.bucket_irods_path}/{put_filename} {get_filename}') + assert_command(f'diff -q {put_filename} {get_filename}') + finally: os.remove(put_filename) os.remove(get_filename) @@ -181,6 +194,13 @@ def test_mc_put_large_file_in_bucket_root(self): assert_command(f'iget {self.bucket_irods_path}/{put_filename} {get_filename}') assert_command(f'diff -q {put_filename} {get_filename}') + # perform upload a second time to make sure original file was closed properly + assert_command(f'mc cp {put_filename} s3-api-alice/{self.bucket_name}/{put_filename}', + 'STDOUT_SINGLELINE', + 'large_file') + assert_command(f'iget -f {self.bucket_irods_path}/{put_filename} {get_filename}') + assert_command(f'diff -q {put_filename} {get_filename}') + finally: os.remove(put_filename) os.remove(get_filename)
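*Aside: design points 1 and 2 of the commit message reduce to a simple rule — the offset of part N is the sum of the sizes of parts 1 through N-1, and it is only known once all of those sizes have been recorded. The following is a minimal standalone sketch of that computation; `compute_part_offset` and `part_size_map_type` are hypothetical names for illustration, and the patch performs this walk inline in handle_putobject while holding multipart_global_state_mutex.*

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <unordered_map>

// Assumed shape of part_size_map: upload_id -> (part number -> part size).
using part_size_map_type = std::unordered_map<std::string, std::unordered_map<unsigned int, std::uint64_t>>;

// Returns the byte offset of part_number if the sizes of all lower-numbered
// parts are known; otherwise returns std::nullopt and the caller falls back
// to writing the part to a local part file.
std::optional<std::uint64_t> compute_part_offset(
    const part_size_map_type& part_size_map,
    const std::string& upload_id,
    unsigned int part_number)
{
    const auto upload_iter = part_size_map.find(upload_id);
    if (upload_iter == part_size_map.end()) {
        return std::nullopt;
    }

    std::uint64_t offset = 0;
    for (unsigned int i = 1; i < part_number; ++i) {
        const auto part_iter = upload_iter->second.find(i);
        if (part_iter == upload_iter->second.end()) {
            return std::nullopt; // a lower-numbered part has not been started yet
        }
        offset += part_iter->second;
    }
    return offset;
}

int main()
{
    const part_size_map_type part_size_map{
        {"1234abcd-1234-1234-1234-123456789abc", {{1, 4096000}, {2, 4096000}, {4, 4096000}}}};

    // Parts 1 and 2 are known, so part 3 starts at byte 8192000 and can be
    // streamed directly to iRODS with a seek to that offset.
    if (const auto offset = compute_part_offset(part_size_map, "1234abcd-1234-1234-1234-123456789abc", 3)) {
        std::cout << "part 3 offset: " << *offset << '\n';
    }

    // Part 5 cannot be placed because part 3 has never been seen; its bytes
    // would be written to a local part file instead.
    if (!compute_part_offset(part_size_map, "1234abcd-1234-1234-1234-123456789abc", 5)) {
        std::cout << "part 5 offset unknown\n";
    }
    return 0;
}
```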