diff --git a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp index ac37033d3..44747129e 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_take.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_take.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include +#include #include "rmw/allocators.h" #include "rmw/error_handling.h" @@ -401,8 +402,32 @@ struct LoanManager { } + void add_item(std::unique_ptr item) + { + std::lock_guard guard(mtx); + items.push_back(std::move(item)); + } + + std::unique_ptr erase_item(void * loaned_message) + { + std::unique_ptr ret{nullptr}; + + std::lock_guard guard(mtx); + for (auto it = items.begin(); it != items.end(); ++it) { + if (loaned_message == (*it)->data_seq.buffer()[0]) { + ret = std::move(*it); + items.erase(it); + break; + } + } + + return ret; + } + +private: std::mutex mtx; - eprosima::fastrtps::ResourceLimitedVector items RCPPUTILS_TSA_GUARDED_BY(mtx); + using ItemVector = eprosima::fastrtps::ResourceLimitedVector>; + ItemVector items RCPPUTILS_TSA_GUARDED_BY(mtx); }; void @@ -440,13 +465,8 @@ __rmw_take_loaned_message_internal( RMW_CHECK_ARGUMENT_FOR_NULL(taken, RMW_RET_INVALID_ARGUMENT); auto info = static_cast(subscription->data); - auto loan_mgr = info->loan_manager_; - std::unique_lock guard(loan_mgr->mtx); - auto item = loan_mgr->items.emplace_back(); - if (nullptr == item) { - RMW_SET_ERROR_MSG("Out of resources for loaned message info"); - return RMW_RET_ERROR; - } + + auto item = std::make_unique(); while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(item->data_seq, item->info_seq, 1)) { if (item->info_seq[0].valid_data) { @@ -456,6 +476,9 @@ __rmw_take_loaned_message_internal( *loaned_message = item->data_seq.buffer()[0]; *taken = true; info->listener_->update_has_data(info->data_reader_); + + info->loan_manager_->add_item(std::move(item)); + return RMW_RET_OK; } @@ -464,7 +487,6 @@ __rmw_take_loaned_message_internal( } // No data available, return loan information. - loan_mgr->items.pop_back(); *taken = false; info->listener_->update_has_data(info->data_reader_); return RMW_RET_OK; @@ -487,17 +509,15 @@ __rmw_return_loaned_message_from_subscription( RMW_CHECK_ARGUMENT_FOR_NULL(loaned_message, RMW_RET_INVALID_ARGUMENT); auto info = static_cast(subscription->data); - auto loan_mgr = info->loan_manager_; - std::lock_guard guard(loan_mgr->mtx); - for (auto it = loan_mgr->items.begin(); it != loan_mgr->items.end(); ++it) { - if (loaned_message == it->data_seq.buffer()[0]) { - if (!info->data_reader_->return_loan(it->data_seq, it->info_seq)) { - RMW_SET_ERROR_MSG("Error returning loan"); - return RMW_RET_ERROR; - } - loan_mgr->items.erase(it); - return RMW_RET_OK; + std::unique_ptr item; + item = info->loan_manager_->erase_item(loaned_message); + if (item != nullptr) { + if (!info->data_reader_->return_loan(item->data_seq, item->info_seq)) { + RMW_SET_ERROR_MSG("Error returning loan"); + return RMW_RET_ERROR; } + + return RMW_RET_OK; } RMW_SET_ERROR_MSG("Trying to return message not loaned by this subscription");