diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp index dbb4f3d20..b085a80db 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp @@ -16,11 +16,6 @@ #define RMW_FASTRTPS_CPP__CUSTOM_CLIENT_INFO_HPP_ #include -#include -#include -#include - -#include "fastcdr/FastBuffer.h" #include "fastrtps/subscriber/SampleInfo.h" #include "fastrtps/subscriber/Subscriber.h" @@ -28,6 +23,8 @@ #include "fastrtps/participant/Participant.h" #include "fastrtps/publisher/Publisher.h" +using SampleIdentity = eprosima::fastrtps::rtps::SampleIdentity; + class ClientListener; typedef struct CustomClientInfo @@ -42,12 +39,6 @@ typedef struct CustomClientInfo const char * typesupport_identifier_; } CustomClientInfo; -typedef struct CustomClientResponse -{ - eprosima::fastrtps::rtps::SampleIdentity sample_identity_; - std::unique_ptr buffer_; -} CustomClientResponse; - class ClientListener : public eprosima::fastrtps::SubscriberListener { public: @@ -59,59 +50,47 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener void onNewDataMessage(eprosima::fastrtps::Subscriber * sub) { - assert(sub); - - CustomClientResponse response; - // Todo(sloretz) eliminate heap allocation pending eprosima/Fast-CDR#19 - response.buffer_.reset(new eprosima::fastcdr::FastBuffer()); - eprosima::fastrtps::SampleInfo_t sinfo; - - if (sub->takeNextData(response.buffer_.get(), &sinfo)) { - if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) { - response.sample_identity_ = sinfo.related_sample_identity; - - if (response.sample_identity_.writer_guid() == info_->writer_guid_) { - std::lock_guard lock(internalMutex_); - - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - list.emplace_back(std::move(response)); - // the change to list_has_data_ needs to be mutually exclusive with - // rmw_wait() which checks hasData() and decides if wait() needs to - // be called - list_has_data_.store(true); - clock.unlock(); - conditionVariable_->notify_one(); - } else { - list.emplace_back(std::move(response)); - list_has_data_.store(true); - } - } - } + (void) sub; + std::lock_guard lock(internalMutex_); + + if (conditionMutex_ != nullptr) { + std::unique_lock clock(*conditionMutex_); + // the change to list_has_data_ needs to be mutually exclusive with + // rmw_wait() which checks hasData() and decides if wait() needs to + // be called + list_has_data_.store(true); + clock.unlock(); + conditionVariable_->notify_one(); + } else { + list_has_data_.store(true); } } bool - getResponse(CustomClientResponse & response) + getResponse(void * data, SampleIdentity & identity) { std::lock_guard lock(internalMutex_); - auto pop_response = [this](CustomClientResponse & response) -> bool + auto pop_response = [this](void * data, SampleIdentity & identity) -> bool { - if (!list.empty()) { - response = std::move(list.front()); - list.pop_front(); - list_has_data_.store(!list.empty()); - return true; + eprosima::fastrtps::SampleInfo_t sinfo; + bool ret_val = false; + auto subs = info_->response_subscriber_; + while (!ret_val && (subs->getUnreadCount() > 0) && (subs->takeNextData(data, &sinfo))) { + if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) { + identity = sinfo.related_sample_identity; + ret_val = (identity.writer_guid() == info_->writer_guid_); + } } - return false; + list_has_data_.store(subs->getUnreadCount() > 0); + return ret_val; }; if (conditionMutex_ != nullptr) { std::unique_lock clock(*conditionMutex_); - return pop_response(response); + return pop_response(data, identity); } - return pop_response(response); + return pop_response(data, identity); } void @@ -139,7 +118,6 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener private: CustomClientInfo * info_; std::mutex internalMutex_; - std::list list; std::atomic_bool list_has_data_; std::mutex * conditionMutex_; std::condition_variable * conditionVariable_; diff --git a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_service_info.hpp b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_service_info.hpp index 0ca083c46..6fdb4bdd7 100644 --- a/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_service_info.hpp @@ -16,9 +16,6 @@ #define RMW_FASTRTPS_CPP__CUSTOM_SERVICE_INFO_HPP_ #include -#include - -#include "fastcdr/FastBuffer.h" #include "fastrtps/participant/Participant.h" #include "fastrtps/publisher/Publisher.h" @@ -27,6 +24,8 @@ #include "fastrtps/subscriber/SubscriberListener.h" #include "fastrtps/subscriber/SampleInfo.h" +using SampleIdentity = eprosima::fastrtps::rtps::SampleIdentity; + class ServiceListener; typedef struct CustomServiceInfo @@ -40,15 +39,6 @@ typedef struct CustomServiceInfo const char * typesupport_identifier_; } CustomServiceInfo; -typedef struct CustomServiceRequest -{ - eprosima::fastrtps::rtps::SampleIdentity sample_identity_; - eprosima::fastcdr::FastBuffer * buffer_; - - CustomServiceRequest() - : buffer_(nullptr) {} -} CustomServiceRequest; - class ServiceListener : public eprosima::fastrtps::SubscriberListener { public: @@ -63,57 +53,45 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener void onNewDataMessage(eprosima::fastrtps::Subscriber * sub) { - assert(sub); - - CustomServiceRequest request; - request.buffer_ = new eprosima::fastcdr::FastBuffer(); - eprosima::fastrtps::SampleInfo_t sinfo; - - if (sub->takeNextData(request.buffer_, &sinfo)) { - if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) { - request.sample_identity_ = sinfo.sample_identity; - - std::lock_guard lock(internalMutex_); - - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - list.push_back(request); - // the change to list_has_data_ needs to be mutually exclusive with - // rmw_wait() which checks hasData() and decides if wait() needs to - // be called - list_has_data_.store(true); - clock.unlock(); - conditionVariable_->notify_one(); - } else { - list.push_back(request); - list_has_data_.store(true); - } - } + (void) sub; + std::lock_guard lock(internalMutex_); + + if (conditionMutex_ != nullptr) { + std::unique_lock clock(*conditionMutex_); + // the change to list_has_data_ needs to be mutually exclusive with + // rmw_wait() which checks hasData() and decides if wait() needs to + // be called + list_has_data_.store(true); + clock.unlock(); + conditionVariable_->notify_one(); + } else { + list_has_data_.store(true); } } - CustomServiceRequest - getRequest() + bool + getRequest(void * data, SampleIdentity & identity) { std::lock_guard lock(internalMutex_); - CustomServiceRequest request; + + auto pop_request = [this](void * data, SampleIdentity & identity) -> bool + { + eprosima::fastrtps::SampleInfo_t sinfo; + bool ret_val = false; + auto subs = info_->request_subscriber_; + while (!ret_val && (subs->getUnreadCount() > 0) && (subs->takeNextData(data, &sinfo))) { + identity = sinfo.sample_identity; + ret_val = (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind); + } + list_has_data_.store(subs->getUnreadCount() > 0); + return ret_val; + }; if (conditionMutex_ != nullptr) { std::unique_lock clock(*conditionMutex_); - if (!list.empty()) { - request = list.front(); - list.pop_front(); - list_has_data_.store(!list.empty()); - } - } else { - if (!list.empty()) { - request = list.front(); - list.pop_front(); - list_has_data_.store(!list.empty()); - } + return pop_request(data, identity); } - - return request; + return pop_request(data, identity); } void @@ -141,7 +119,6 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener private: CustomServiceInfo * info_; std::mutex internalMutex_; - std::list list; std::atomic_bool list_has_data_; std::mutex * conditionMutex_; std::condition_variable * conditionVariable_; diff --git a/rmw_fastrtps_cpp/src/rmw_request.cpp b/rmw_fastrtps_cpp/src/rmw_request.cpp index c0a0c1282..263d47e39 100644 --- a/rmw_fastrtps_cpp/src/rmw_request.cpp +++ b/rmw_fastrtps_cpp/src/rmw_request.cpp @@ -95,21 +95,19 @@ rmw_take_request( auto info = static_cast(service->data); assert(info); - CustomServiceRequest request = info->listener_->getRequest(); - eprosima::fastcdr::Cdr deser(*request.buffer_, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, - eprosima::fastcdr::Cdr::DDS_CDR); - - if (request.buffer_ != nullptr) { + eprosima::fastcdr::FastBuffer buffer; + eprosima::fastrtps::rtps::SampleIdentity sample_identity; + if (info->listener_->getRequest(&buffer, sample_identity)) { + eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, + eprosima::fastcdr::Cdr::DDS_CDR); _deserialize_ros_message(deser, ros_request, info->request_type_support_, info->typesupport_identifier_); // Get header - memcpy(request_header->writer_guid, &request.sample_identity_.writer_guid(), + memcpy(request_header->writer_guid, &sample_identity.writer_guid(), sizeof(eprosima::fastrtps::rtps::GUID_t)); - request_header->sequence_number = ((int64_t)request.sample_identity_.sequence_number().high) << - 32 | request.sample_identity_.sequence_number().low; - - delete request.buffer_; + request_header->sequence_number = ((int64_t)sample_identity.sequence_number().high) << + 32 | sample_identity.sequence_number().low; *taken = true; } diff --git a/rmw_fastrtps_cpp/src/rmw_response.cpp b/rmw_fastrtps_cpp/src/rmw_response.cpp index d8142f818..7c8e7f4ce 100644 --- a/rmw_fastrtps_cpp/src/rmw_response.cpp +++ b/rmw_fastrtps_cpp/src/rmw_response.cpp @@ -15,6 +15,7 @@ #include #include "fastcdr/Cdr.h" +#include "fastcdr/FastBuffer.h" #include "fastrtps/subscriber/Subscriber.h" @@ -50,18 +51,19 @@ rmw_take_response( auto info = static_cast(client->data); assert(info); - CustomClientResponse response; + eprosima::fastcdr::FastBuffer buffer; + eprosima::fastrtps::rtps::SampleIdentity sample_identity; - if (info->listener_->getResponse(response)) { + if (info->listener_->getResponse(&buffer, sample_identity)) { eprosima::fastcdr::Cdr deser( - *response.buffer_, + buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR); _deserialize_ros_message( deser, ros_response, info->response_type_support_, info->typesupport_identifier_); - request_header->sequence_number = ((int64_t)response.sample_identity_.sequence_number().high) << - 32 | response.sample_identity_.sequence_number().low; + request_header->sequence_number = ((int64_t)sample_identity.sequence_number().high) << + 32 | sample_identity.sequence_number().low; *taken = true; }