From 9d6eb1509cdcf09e7bec8bc61988dd62f5682278 Mon Sep 17 00:00:00 2001 From: Chris Lalancette Date: Thu, 12 Aug 2021 00:27:58 +0000 Subject: [PATCH] Update the LoanManager to do internal locking. The original motivation to do this is to please the clang thread safety analysis, which can't quite figure out the locking regime with an external mutex. Instead, move major functionality into the LoanManager class, and then mark the item list as private. One thing to note is that the logic of __rmw_take_loaned_message_internal is slightly changed here. Before it would emplace an Item onto the LoanManager's list, then look through the data_reader to see if it could take an item. If it could, it would just fill that Item in and return. If it couldn't, it would pop that item out of the list. This commit changes it so that we instantiate a temporary Item. If we can take an item, we then push that Item into the LoanManager list. We never pop it. This should be equivalent (and slightly more performant), but is a change from what we had before. Signed-off-by: Chris Lalancette --- rmw_fastrtps_shared_cpp/src/rmw_take.cpp | 58 ++++++++++++++++-------- 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index ac37033d3..44747129e 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include "rmw/allocators.h" #include "rmw/error_handling.h" @@ -401,8 +402,32 @@ struct LoanManager { } + void add_item(std::unique_ptr item) + { + std::lock_guard guard(mtx); + items.push_back(std::move(item)); + } + + std::unique_ptr erase_item(void * loaned_message) + { + std::unique_ptr ret{nullptr}; + + std::lock_guard guard(mtx); + for (auto it = items.begin(); it != items.end(); ++it) { + if (loaned_message == (*it)->data_seq.buffer()[0]) { + ret = std::move(*it); + items.erase(it); + break; + } + } + + return ret; + } + +private: std::mutex mtx; - eprosima::fastrtps::ResourceLimitedVector items RCPPUTILS_TSA_GUARDED_BY(mtx); + using ItemVector = eprosima::fastrtps::ResourceLimitedVector>; + ItemVector items RCPPUTILS_TSA_GUARDED_BY(mtx); }; void @@ -440,13 +465,8 @@ __rmw_take_loaned_message_internal( RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); auto info = static_cast(subscription->data); - auto loan_mgr = info->loan_manager_; - std::unique_lock guard(loan_mgr->mtx); - auto item = loan_mgr->items.emplace_back(); - if (nullptr == item) { - RMW_SET_ERROR_MSG("Out of resources for loaned message info"); - return RMW_RET_ERROR; - } + + auto item = std::make_unique(); while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(item->data_seq, item->info_seq, 1)) { if (item->info_seq[0].valid_data) { @@ -456,6 +476,9 @@ __rmw_take_loaned_message_internal( *loaned_message = item->data_seq.buffer()[0]; *taken = true; info->listener_->update_has_data(info->data_reader_); + + info->loan_manager_->add_item(std::move(item)); + return RMW_RET_OK; } @@ -464,7 +487,6 @@ __rmw_take_loaned_message_internal( } // No data available, return loan information. - loan_mgr->items.pop_back(); *taken = false; info->listener_->update_has_data(info->data_reader_); return RMW_RET_OK; @@ -487,17 +509,15 @@ __rmw_return_loaned_message_from_subscription( RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT); auto info = static_cast(subscription->data); - auto loan_mgr = info->loan_manager_; - std::lock_guard guard(loan_mgr->mtx); - for (auto it = loan_mgr->items.begin(); it != loan_mgr->items.end(); ++it) { - if (loaned_message == it->data_seq.buffer()[0]) { - if (!info->data_reader_->return_loan(it->data_seq, it->info_seq)) { - RMW_SET_ERROR_MSG("Error returning loan"); - return RMW_RET_ERROR; - } - loan_mgr->items.erase(it); - return RMW_RET_OK; + std::unique_ptr item; + item = info->loan_manager_->erase_item(loaned_message); + if (item != nullptr) { + if (!info->data_reader_->return_loan(item->data_seq, item->info_seq)) { + RMW_SET_ERROR_MSG("Error returning loan"); + return RMW_RET_ERROR; } + + return RMW_RET_OK; } RMW_SET_ERROR_MSG("Trying to return message not loaned by this subscription");