Skip to content

Commit

Permalink
Refs #3043. Avoid dynamic creation of FastBuffer objects.
Browse files Browse the repository at this point in the history
  • Loading branch information
MiguelCompany committed Jun 18, 2018
1 parent 7875cce commit 0f46c54
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 121 deletions.
80 changes: 29 additions & 51 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,15 @@
#define RMW_FASTRTPS_CPP__CUSTOM_CLIENT_INFO_HPP_

#include <atomic>
#include <list>
#include <memory>
#include <utility>

#include "fastcdr/FastBuffer.h"

#include "fastrtps/subscriber/SampleInfo.h"
#include "fastrtps/subscriber/Subscriber.h"
#include "fastrtps/subscriber/SubscriberListener.h"
#include "fastrtps/participant/Participant.h"
#include "fastrtps/publisher/Publisher.h"

using SampleIdentity = eprosima::fastrtps::rtps::SampleIdentity;

class ClientListener;

typedef struct CustomClientInfo
Expand All @@ -42,12 +39,6 @@ typedef struct CustomClientInfo
const char * typesupport_identifier_;
} CustomClientInfo;

typedef struct CustomClientResponse
{
eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
std::unique_ptr<eprosima::fastcdr::FastBuffer> buffer_;
} CustomClientResponse;

class ClientListener : public eprosima::fastrtps::SubscriberListener
{
public:
Expand All @@ -59,59 +50,47 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
{
assert(sub);

CustomClientResponse response;
// Todo(sloretz) eliminate heap allocation pending eprosima/Fast-CDR#19
response.buffer_.reset(new eprosima::fastcdr::FastBuffer());
eprosima::fastrtps::SampleInfo_t sinfo;

if (sub->takeNextData(response.buffer_.get(), &sinfo)) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
response.sample_identity_ = sinfo.related_sample_identity;

if (response.sample_identity_.writer_guid() == info_->writer_guid_) {
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
list.emplace_back(std::move(response));
// the change to list_has_data_ needs to be mutually exclusive with
// rmw_wait() which checks hasData() and decides if wait() needs to
// be called
list_has_data_.store(true);
clock.unlock();
conditionVariable_->notify_one();
} else {
list.emplace_back(std::move(response));
list_has_data_.store(true);
}
}
}
(void) sub;
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
// the change to list_has_data_ needs to be mutually exclusive with
// rmw_wait() which checks hasData() and decides if wait() needs to
// be called
list_has_data_.store(true);
clock.unlock();
conditionVariable_->notify_one();
} else {
list_has_data_.store(true);
}
}

bool
getResponse(CustomClientResponse & response)
getResponse(void * data, SampleIdentity & identity)
{
std::lock_guard<std::mutex> lock(internalMutex_);

auto pop_response = [this](CustomClientResponse & response) -> bool
auto pop_response = [this](void * data, SampleIdentity & identity) -> bool
{
if (!list.empty()) {
response = std::move(list.front());
list.pop_front();
list_has_data_.store(!list.empty());
return true;
eprosima::fastrtps::SampleInfo_t sinfo;
bool ret_val = false;
auto subs = info_->response_subscriber_;
while (!ret_val && (subs->getUnreadCount() > 0) && (subs->takeNextData(data, &sinfo))) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
identity = sinfo.related_sample_identity;
ret_val = (identity.writer_guid() == info_->writer_guid_);
}
}
return false;
list_has_data_.store(subs->getUnreadCount() > 0);
return ret_val;
};

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
return pop_response(response);
return pop_response(data, identity);
}
return pop_response(response);
return pop_response(data, identity);
}

void
Expand Down Expand Up @@ -139,7 +118,6 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
private:
CustomClientInfo * info_;
std::mutex internalMutex_;
std::list<CustomClientResponse> list;
std::atomic_bool list_has_data_;
std::mutex * conditionMutex_;
std::condition_variable * conditionVariable_;
Expand Down
87 changes: 32 additions & 55 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_service_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
#define RMW_FASTRTPS_CPP__CUSTOM_SERVICE_INFO_HPP_

#include <atomic>
#include <list>

#include "fastcdr/FastBuffer.h"

#include "fastrtps/participant/Participant.h"
#include "fastrtps/publisher/Publisher.h"
Expand All @@ -27,6 +24,8 @@
#include "fastrtps/subscriber/SubscriberListener.h"
#include "fastrtps/subscriber/SampleInfo.h"

using SampleIdentity = eprosima::fastrtps::rtps::SampleIdentity;

class ServiceListener;

typedef struct CustomServiceInfo
Expand All @@ -40,15 +39,6 @@ typedef struct CustomServiceInfo
const char * typesupport_identifier_;
} CustomServiceInfo;

typedef struct CustomServiceRequest
{
eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
eprosima::fastcdr::FastBuffer * buffer_;

CustomServiceRequest()
: buffer_(nullptr) {}
} CustomServiceRequest;

class ServiceListener : public eprosima::fastrtps::SubscriberListener
{
public:
Expand All @@ -63,57 +53,45 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
{
assert(sub);

CustomServiceRequest request;
request.buffer_ = new eprosima::fastcdr::FastBuffer();
eprosima::fastrtps::SampleInfo_t sinfo;

if (sub->takeNextData(request.buffer_, &sinfo)) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
request.sample_identity_ = sinfo.sample_identity;

std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
list.push_back(request);
// the change to list_has_data_ needs to be mutually exclusive with
// rmw_wait() which checks hasData() and decides if wait() needs to
// be called
list_has_data_.store(true);
clock.unlock();
conditionVariable_->notify_one();
} else {
list.push_back(request);
list_has_data_.store(true);
}
}
(void) sub;
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
// the change to list_has_data_ needs to be mutually exclusive with
// rmw_wait() which checks hasData() and decides if wait() needs to
// be called
list_has_data_.store(true);
clock.unlock();
conditionVariable_->notify_one();
} else {
list_has_data_.store(true);
}
}

CustomServiceRequest
getRequest()
bool
getRequest(void * data, SampleIdentity & identity)
{
std::lock_guard<std::mutex> lock(internalMutex_);
CustomServiceRequest request;

auto pop_request = [this](void * data, SampleIdentity & identity) -> bool
{
eprosima::fastrtps::SampleInfo_t sinfo;
bool ret_val = false;
auto subs = info_->request_subscriber_;
while (!ret_val && (subs->getUnreadCount() > 0) && (subs->takeNextData(data, &sinfo))) {
identity = sinfo.sample_identity;
ret_val = (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind);
}
list_has_data_.store(subs->getUnreadCount() > 0);
return ret_val;
};

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
if (!list.empty()) {
request = list.front();
list.pop_front();
list_has_data_.store(!list.empty());
}
} else {
if (!list.empty()) {
request = list.front();
list.pop_front();
list_has_data_.store(!list.empty());
}
return pop_request(data, identity);
}

return request;
return pop_request(data, identity);
}

void
Expand Down Expand Up @@ -141,7 +119,6 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
private:
CustomServiceInfo * info_;
std::mutex internalMutex_;
std::list<CustomServiceRequest> list;
std::atomic_bool list_has_data_;
std::mutex * conditionMutex_;
std::condition_variable * conditionVariable_;
Expand Down
18 changes: 8 additions & 10 deletions rmw_fastrtps_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,19 @@ rmw_take_request(
auto info = static_cast<CustomServiceInfo *>(service->data);
assert(info);

CustomServiceRequest request = info->listener_->getRequest();
eprosima::fastcdr::Cdr deser(*request.buffer_, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (request.buffer_ != nullptr) {
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastrtps::rtps::SampleIdentity sample_identity;
if (info->listener_->getRequest(&buffer, sample_identity)) {
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
_deserialize_ros_message(deser, ros_request, info->request_type_support_,
info->typesupport_identifier_);

// Get header
memcpy(request_header->writer_guid, &request.sample_identity_.writer_guid(),
memcpy(request_header->writer_guid, &sample_identity.writer_guid(),
sizeof(eprosima::fastrtps::rtps::GUID_t));
request_header->sequence_number = ((int64_t)request.sample_identity_.sequence_number().high) <<
32 | request.sample_identity_.sequence_number().low;

delete request.buffer_;
request_header->sequence_number = ((int64_t)sample_identity.sequence_number().high) <<
32 | sample_identity.sequence_number().low;

*taken = true;
}
Expand Down
12 changes: 7 additions & 5 deletions rmw_fastrtps_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <cassert>

#include "fastcdr/Cdr.h"
#include "fastcdr/FastBuffer.h"

#include "fastrtps/subscriber/Subscriber.h"

Expand Down Expand Up @@ -50,18 +51,19 @@ rmw_take_response(
auto info = static_cast<CustomClientInfo *>(client->data);
assert(info);

CustomClientResponse response;
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastrtps::rtps::SampleIdentity sample_identity;

if (info->listener_->getResponse(response)) {
if (info->listener_->getResponse(&buffer, sample_identity)) {
eprosima::fastcdr::Cdr deser(
*response.buffer_,
buffer,
eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
_deserialize_ros_message(
deser, ros_response, info->response_type_support_, info->typesupport_identifier_);

request_header->sequence_number = ((int64_t)response.sample_identity_.sequence_number().high) <<
32 | response.sample_identity_.sequence_number().low;
request_header->sequence_number = ((int64_t)sample_identity.sequence_number().high) <<
32 | sample_identity.sequence_number().low;

*taken = true;
}
Expand Down

0 comments on commit 0f46c54

Please sign in to comment.