Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Galactic] Loan messages implementation #547

Merged
merged 5 commits into from
Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@

#include "type_support_common.hpp"

using DataSharingKind = eprosima::fastdds::dds::DataSharingKind;

rmw_publisher_t *
rmw_fastrtps_cpp::create_publisher(
const CustomParticipantInfo * participant_info,
Expand Down Expand Up @@ -248,6 +250,8 @@ rmw_fastrtps_cpp::create_publisher(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

// Get QoS from RMW
Expand Down Expand Up @@ -291,7 +295,8 @@ rmw_fastrtps_cpp::create_publisher(
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->can_loan_messages = false;
bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind();
rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain();
rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;

Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ rmw_create_client(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -359,6 +361,8 @@ rmw_create_client(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
8 changes: 2 additions & 6 deletions rmw_fastrtps_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,7 @@ rmw_publish_loaned_message(
void * ros_message,
rmw_publisher_allocation_t * allocation)
{
(void) publisher;
(void) ros_message;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_publish_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message(
eprosima_fastrtps_identifier, publisher, ros_message, allocation);
}
} // extern "C"
16 changes: 4 additions & 12 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,25 +169,17 @@ rmw_borrow_loaned_message(
const rosidl_message_type_support_t * type_support,
void ** ros_message)
{
(void) publisher;
(void) type_support;
(void) ros_message;

RMW_SET_ERROR_MSG("rmw_borrow_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message(
eprosima_fastrtps_identifier, publisher, type_support, ros_message);
}

rmw_ret_t
rmw_return_loaned_message_from_publisher(
const rmw_publisher_t * publisher,
void * loaned_message)
{
(void) publisher;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_publisher not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher(
eprosima_fastrtps_identifier, publisher, loaned_message);
}

rmw_ret_t
Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ rmw_create_service(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -362,6 +364,8 @@ rmw_create_service(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
30 changes: 9 additions & 21 deletions rmw_fastrtps_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,9 @@ rmw_take_loaned_message(
bool * taken,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_take_loaned_message not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, nullptr);
}

rmw_ret_t
Expand All @@ -108,27 +104,19 @@ rmw_take_loaned_message_with_info(
rmw_message_info_t * message_info,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) message_info;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_take_loaned_message_with_info not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, message_info);
}

rmw_ret_t
rmw_return_loaned_message_from_subscription(
const rmw_subscription_t * subscription,
void * loaned_message)
{
(void) subscription;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_subscription not implemented for rmw_fastrtps_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_subscription(
eprosima_fastrtps_identifier, subscription, loaned_message);
}

rmw_ret_t
Expand Down
5 changes: 4 additions & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp"
#include "rmw_fastrtps_shared_cpp/qos.hpp"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
#include "rmw_fastrtps_shared_cpp/subscription.hpp"
#include "rmw_fastrtps_shared_cpp/utils.hpp"

#include "rmw_fastrtps_cpp/identifier.hpp"
Expand Down Expand Up @@ -242,6 +243,8 @@ create_subscription(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -324,7 +327,7 @@ create_subscription(
return nullptr;
}
rmw_subscription->options = *subscription_options;
rmw_subscription->can_loan_messages = false;
rmw_fastrtps_shared_cpp::__init_subscription_for_loans(rmw_subscription);

topic.should_be_deleted = false;
cleanup_rmw_subscription.cancel();
Expand Down
2 changes: 2 additions & 0 deletions rmw_fastrtps_cpp/src/type_support_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ void TypeSupport::set_members(const message_type_support_callbacks_t * members)

// Total size is encapsulation size + data size
m_typeSize = 4 + data_size;
// Account for RTPS submessage alignment
m_typeSize = (m_typeSize + 3) & ~3;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MiguelCompany mind to explain this change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hidmic
TL/DR: Account for RTPS sub-message alignment

The DDSI-RTPS spec requires each submessage to be aligned on a 4-byte boundary. This means that, when using PREALLOCATED memory policy, we could receive a serialized payload bigger than m_typeSize, which would thus be discarded. When using the rmw's default PREALLOCATED_WITH_REALLOC memory policy, an unnecessary reallocation could happen.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added some comments on f40ff2d

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. How is this not a problem on master? Upstream Fast-DDS differences?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hidmic You are right! It is a problem on master. I've created #550 with the proper fix.

}

size_t TypeSupport::getEstimatedSerializedSize(const void * ros_message, const void * impl) const
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(
} else {
this->m_typeSize++;
}
// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}

} // namespace rmw_fastrtps_dynamic_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ RequestTypeSupport<ServiceMembersType, MessageMembersType>::RequestTypeSupport(
} else {
this->m_typeSize++;
}
// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}

template<typename ServiceMembersType, typename MessageMembersType>
Expand Down Expand Up @@ -88,6 +90,8 @@ ResponseTypeSupport<ServiceMembersType, MessageMembersType>::ResponseTypeSupport
} else {
this->m_typeSize++;
}
// Account for RTPS submessage alignment
this->m_typeSize = (this->m_typeSize + 3) & ~3;
}

} // namespace rmw_fastrtps_dynamic_cpp
Expand Down
6 changes: 5 additions & 1 deletion rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "type_support_common.hpp"
#include "type_support_registry.hpp"

using DataSharingKind = eprosima::fastdds::dds::DataSharingKind;
using TypeSupportProxy = rmw_fastrtps_dynamic_cpp::TypeSupportProxy;

rmw_publisher_t *
Expand Down Expand Up @@ -263,6 +264,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

// Get QoS from RMW
Expand Down Expand Up @@ -306,7 +309,8 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->can_loan_messages = false;
bool has_data_sharing = DataSharingKind::OFF != writer_qos.data_sharing().kind();
rmw_publisher->can_loan_messages = has_data_sharing && info->type_support_->is_plain();
rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;

Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ rmw_create_client(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -390,6 +392,8 @@ rmw_create_client(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
8 changes: 2 additions & 6 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,8 @@ rmw_publish_loaned_message(
void * ros_message,
rmw_publisher_allocation_t * allocation)
{
(void) publisher;
(void) ros_message;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_publish_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_publish_loaned_message(
eprosima_fastrtps_identifier, publisher, ros_message, allocation);
}

rmw_ret_t
Expand Down
16 changes: 4 additions & 12 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,25 +170,17 @@ rmw_borrow_loaned_message(
const rosidl_message_type_support_t * type_support,
void ** ros_message)
{
(void) publisher;
(void) type_support;
(void) ros_message;

RMW_SET_ERROR_MSG("rmw_borrow_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_borrow_loaned_message(
eprosima_fastrtps_identifier, publisher, type_support, ros_message);
}

rmw_ret_t
rmw_return_loaned_message_from_publisher(
const rmw_publisher_t * publisher,
void * loaned_message)
{
(void) publisher;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_publisher is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_publisher(
eprosima_fastrtps_identifier, publisher, loaned_message);
}

using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport;
Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ rmw_create_service(
if (!participant_info->leave_middleware_default_qos) {
reader_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

reader_qos.data_sharing().off();
}

if (!get_datareader_qos(*qos_policies, reader_qos)) {
Expand Down Expand Up @@ -393,6 +395,8 @@ rmw_create_service(

writer_qos.endpoint().history_memory_policy =
eprosima::fastrtps::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE;

writer_qos.data_sharing().off();
}

if (!get_datawriter_qos(*qos_policies, writer_qos)) {
Expand Down
31 changes: 9 additions & 22 deletions rmw_fastrtps_dynamic_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,9 @@ rmw_take_loaned_message(
bool * taken,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) allocation;

RMW_SET_ERROR_MSG("rmw_take_loaned_message is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, nullptr);
}

rmw_ret_t
Expand All @@ -108,28 +104,19 @@ rmw_take_loaned_message_with_info(
rmw_message_info_t * message_info,
rmw_subscription_allocation_t * allocation)
{
(void) subscription;
(void) loaned_message;
(void) taken;
(void) message_info;
(void) allocation;

RMW_SET_ERROR_MSG(
"rmw_take_loaned_message_with_info is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
static_cast<void>(allocation);
RMW_CHECK_ARGUMENT_FOR_NULL(message_info, RMW_RET_INVALID_ARGUMENT);
return rmw_fastrtps_shared_cpp::__rmw_take_loaned_message_internal(
eprosima_fastrtps_identifier, subscription, loaned_message, taken, message_info);
}

rmw_ret_t
rmw_return_loaned_message_from_subscription(
const rmw_subscription_t * subscription,
void * loaned_message)
{
(void) subscription;
(void) loaned_message;

RMW_SET_ERROR_MSG(
"rmw_return_loaned_message_from_subscription is not implemented for rmw_fastrtps_dynamic_cpp");
return RMW_RET_UNSUPPORTED;
return rmw_fastrtps_shared_cpp::__rmw_return_loaned_message_from_subscription(
eprosima_fastrtps_identifier, subscription, loaned_message);
}

rmw_ret_t
Expand Down
Loading