Skip to content

Commit

Permalink
Loan messages implementation (ros2#523)
Browse files Browse the repository at this point in the history
* Added is_plain_ attribute to base TypeSupport.
* Added new methods to base TypeSupport.
* Implementation of rmw_borrow_loaned_message.
* Implementation of rmw_return_loaned_message_from_publisher.
* Enable loan messages on publishers of plain types.
* Implementation for taking loaned messages.
* Enable loan messages on subscriptions of plain types.

Signed-off-by: Miguel Company <[email protected]>
Co-authored-by: Michel Hidalgo <[email protected]>
  • Loading branch information
MiguelCompany and hidmic committed Jul 9, 2021
1 parent 7bb3563 commit a8a9cf1
Show file tree
Hide file tree
Showing 27 changed files with 383 additions and 88 deletions.
7 changes: 6 additions & 1 deletion rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@

#include "type_support_common.hpp"

using DataSharingKind = eprosima::fastdds::dds::DataSharingKind;

rmw_publisher_t *
rmw_fastrtps_cpp::create_publisher(
const CustomParticipantInfo * participant_info,
Expand Down Expand Up @@ -248,6 +250,8 @@ rmw_fastrtps_cpp::create_publisher(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

// Get QoS from RMW
Expand Down Expand Up @@ -291,7 +295,8 @@ rmw_fastrtps_cpp::create_publisher(
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->can_loan_messages = false;
bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind();
rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain();
rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;

Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ rmw_create_client(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -359,6 +361,8 @@ rmw_create_client(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
8 changes: 2 additions & 6 deletions rmw_fastrtps_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@ rmw_publish_loaned_message(
void * ros_message,
rmw_publisher_allocation_t * allocation)
{
(void) publisher;
(void) ros_message;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_publish_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message(
eprosima_fastrtps_identifier, publisher, ros_message, allocation);
}
} // extern "C"
16 changes: 4 additions & 12 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,17 @@ rmw_borrow_loaned_message(
const rosidl_message_type_support_t * type_support,
void ** ros_message)
{
(void) publisher;
(void) type_support;
(void) ros_message;

RMW_SET_ERROR_MSG("rmw_borrow_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message(
eprosima_fastrtps_identifier, publisher, type_support, ros_message);
}

rmw_ret_t
rmw_return_loaned_message_from_publisher(
const rmw_publisher_t * publisher,
void * loaned_message)
{
(void) publisher;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_publisher not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher(
eprosima_fastrtps_identifier, publisher, loaned_message);
}

rmw_ret_t
Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ rmw_create_service(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -362,6 +364,8 @@ rmw_create_service(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
30 changes: 9 additions & 21 deletions rmw_fastrtps_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,9 @@ rmw_take_loaned_message(
bool * taken,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_take_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, nullptr);
}

rmw_ret_t
Expand All @@ -108,27 +104,19 @@ rmw_take_loaned_message_with_info(
rmw_message_info_t * message_info,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) message_info;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_take_loaned_message_with_info not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, message_info);
}

rmw_ret_t
rmw_return_loaned_message_from_subscription(
const rmw_subscription_t * subscription,
void * loaned_message)
{
(void) subscription;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_subscription not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_subscription(
eprosima_fastrtps_identifier, subscription, loaned_message);
}

rmw_ret_t
Expand Down
5 changes: 4 additions & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp"
#include "rmw_fastrtps_shared_cpp/qos.hpp"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
#include "rmw_fastrtps_shared_cpp/subscription.hpp"
#include "rmw_fastrtps_shared_cpp/utils.hpp"

#include "rmw_fastrtps_cpp/identifier.hpp"
Expand Down Expand Up @@ -242,6 +243,8 @@ create_subscription(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -324,7 +327,7 @@ create_subscription(
return nullptr;
}
rmw_subscription->options = *subscription_options;
rmw_subscription->can_loan_messages = false;
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);

topic.should_be_deleted = false;
cleanup_rmw_subscription.cancel();
Expand Down
9 changes: 8 additions & 1 deletion rmw_fastrtps_cpp/src/type_support_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,22 @@ TypeSupport::TypeSupport()
{
m_isGetKeyDefined = false;
max_size_bound_ = false;
is_plain_ = false;
}

void TypeSupport::set_members(const message_type_support_callbacks_t * members)
{
members_ = members;

// Fully bound by default
#ifdef ROSIDL_TYPESUPPORT_FASTRTPS_HAS_PLAIN_TYPES
char bounds_info;
auto data_size = static_cast<uint32_t>(members->max_serialized_size(bounds_info));
max_size_bound_ = 0 != (bounds_info & ROSIDL_TYPESUPPORT_FASTRTPS_BOUNDED_TYPE);
is_plain_ = bounds_info == ROSIDL_TYPESUPPORT_FASTRTPS_PLAIN_TYPE;
#else
max_size_bound_ = true;
auto data_size = static_cast<uint32_t>(members->max_serialized_size(max_size_bound_));
#endif

// A fully bound message of size 0 is an empty message
if (max_size_bound_ && (data_size == 0) ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(
ss << "dds_::" << message_name << "_";
this->setName(ss.str().c_str());

// Fully bound by default
// Fully bound and plain by default
this->max_size_bound_ = true;
this->is_plain_ = true;
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ RequestTypeSupport<ServiceMembersType, MessageMembersType>::RequestTypeSupport(
ss << "dds_::" << service_name << "_Request_";
this->setName(ss.str().c_str());

// Fully bound by default
// Fully bound and plain by default
this->max_size_bound_ = true;
this->is_plain_ = true;
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
Expand Down Expand Up @@ -79,8 +80,9 @@ ResponseTypeSupport<ServiceMembersType, MessageMembersType>::ResponseTypeSupport
ss << "dds_::" << service_name << "_Response_";
this->setName(ss.str().c_str());

// Fully bound by default
// Fully bound and plain by default
this->max_size_bound_ = true;
this->is_plain_ = true;
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ TypeSupport<MembersType>::TypeSupport(const void * ros_type_support)
{
m_isGetKeyDefined = false;
max_size_bound_ = false;
is_plain_ = false;
}

// C++ specialization
Expand Down Expand Up @@ -829,9 +830,15 @@ size_t TypeSupport<MembersType>::calculateMaxSerializedSize(
size_t array_size = 1;
if (member->is_array_) {
array_size = member->array_size_;

// Whether it is unbounded.
if (0u == array_size) {
this->max_size_bound_ = false;
}

// Whether it is a sequence.
if (0 == array_size || member->is_upper_bound_) {
this->max_size_bound_ = false;
this->is_plain_ = false;
current_alignment += padding +
eprosima::fastcdr::Cdr::alignment(current_alignment, padding);
}
Expand Down Expand Up @@ -866,6 +873,7 @@ size_t TypeSupport<MembersType>::calculateMaxSerializedSize(
case ::rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING:
{
this->max_size_bound_ = false;
this->is_plain_ = false;
size_t character_size =
(member->type_id_ == rosidl_typesupport_introspection_cpp::ROS_TYPE_WSTRING) ? 4 : 1;
for (size_t index = 0; index < array_size; ++index) {
Expand Down
6 changes: 5 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "type_support_common.hpp"
#include "type_support_registry.hpp"

using DataSharingKind = eprosima::fastdds::dds::DataSharingKind;
using TypeSupportProxy = rmw_fastrtps_dynamic_cpp::TypeSupportProxy;

rmw_publisher_t *
Expand Down Expand Up @@ -263,6 +264,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

// Get QoS from RMW
Expand Down Expand Up @@ -306,7 +309,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->can_loan_messages = false;
bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind();
rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain();
rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;

Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ rmw_create_client(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -390,6 +392,8 @@ rmw_create_client(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
8 changes: 2 additions & 6 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,8 @@ rmw_publish_loaned_message(
void * ros_message,
rmw_publisher_allocation_t * allocation)
{
(void) publisher;
(void) ros_message;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_publish_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message(
eprosima_fastrtps_identifier, publisher, ros_message, allocation);
}

rmw_ret_t
Expand Down
16 changes: 4 additions & 12 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,17 @@ rmw_borrow_loaned_message(
const rosidl_message_type_support_t * type_support,
void ** ros_message)
{
(void) publisher;
(void) type_support;
(void) ros_message;

RMW_SET_ERROR_MSG("rmw_borrow_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message(
eprosima_fastrtps_identifier, publisher, type_support, ros_message);
}

rmw_ret_t
rmw_return_loaned_message_from_publisher(
const rmw_publisher_t * publisher,
void * loaned_message)
{
(void) publisher;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_publisher is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher(
eprosima_fastrtps_identifier, publisher, loaned_message);
}

using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport;
Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ rmw_create_service(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -393,6 +395,8 @@ rmw_create_service(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
Loading

0 comments on commit a8a9cf1

Please sign in to comment.