Skip to content

Commit

Permalink
Merge pull request ros2#34 from mauropasse/mauro/irobot-humble-ignore…
Browse files Browse the repository at this point in the history
…_local_endpoints

Ignore local endpoints: RMW_FASTRTPS
  • Loading branch information
alsora authored Nov 24, 2023
2 parents 4f8206b + 9b75fe2 commit 3c264d3
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 6 deletions.
17 changes: 17 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,23 @@ rmw_publisher_count_matched_subscriptions(
publisher, subscription_count);
}

rmw_ret_t
rmw_publisher_count_non_local_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * non_local_subscription_count)
{
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
publisher,
publisher->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(non_local_subscription_count, RMW_RET_INVALID_ARGUMENT);

return rmw_fastrtps_shared_cpp::__rmw_publisher_count_non_local_matched_subscriptions(
publisher, non_local_subscription_count);
}

rmw_ret_t
rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher)
{
Expand Down
17 changes: 17 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,23 @@ rmw_publisher_count_matched_subscriptions(
publisher, subscription_count);
}

rmw_ret_t
rmw_publisher_count_non_local_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * non_local_subscription_count)
{
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
publisher,
publisher->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(non_local_subscription_count, RMW_RET_INVALID_ARGUMENT);

return rmw_fastrtps_shared_cpp::__rmw_publisher_count_non_local_matched_subscriptions(
publisher, non_local_subscription_count);
}

rmw_ret_t
rmw_publisher_assert_liveliness(const rmw_publisher_t * publisher)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,22 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_publication_matched(
eprosima::fastdds::dds::DataWriter * /* writer */,
eprosima::fastdds::dds::DataWriter * writer,
const eprosima::fastdds::dds::PublicationMatchedStatus & info) final
{
std::lock_guard<std::mutex> lock(discovery_m_);
eprosima::fastrtps::rtps::GUID_t subscription_guid =
eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle);
if (info.current_count_change == 1) {
subscriptions_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
subscriptions_.insert(subscription_guid);
if (writer->guid().guidPrefix != subscription_guid.guidPrefix) {
non_local_subscriptions_.insert(subscription_guid);
}
} else if (info.current_count_change == -1) {
subscriptions_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
subscriptions_.erase(subscription_guid);
if (writer->guid().guidPrefix != subscription_guid.guidPrefix) {
non_local_subscriptions_.erase(subscription_guid);
}
}
}

Expand Down Expand Up @@ -105,6 +113,12 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
return subscriptions_.size();
}

size_t non_local_subscription_count()
{
std::lock_guard<std::mutex> lock(discovery_m_);
return non_local_subscriptions_.size();
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
eprosima::fastdds::dds::StatusCondition & get_statuscondition() const final;

Expand All @@ -129,6 +143,9 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_
RCPPUTILS_TSA_GUARDED_BY(discovery_m_);

std::set<eprosima::fastrtps::rtps::GUID_t> non_local_subscriptions_
RCPPUTILS_TSA_PT_GUARDED_BY(discovery_m_);

bool deadline_changes_
RCPPUTILS_TSA_GUARDED_BY(on_new_event_m_);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,12 @@ __rmw_publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * subscription_count);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_publisher_count_non_local_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * non_local_subscription_count);

RMW_FASTRTPS_SHARED_CPP_PUBLIC
rmw_ret_t
__rmw_publisher_get_actual_qos(
Expand Down
12 changes: 12 additions & 0 deletions rmw_fastrtps_shared_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ __rmw_publisher_count_matched_subscriptions(
return RMW_RET_OK;
}

rmw_ret_t
__rmw_publisher_count_non_local_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * non_local_subscription_count)
{
auto info = static_cast<CustomPublisherInfo *>(publisher->data);

*non_local_subscription_count = info->listener_->non_local_subscription_count();

return RMW_RET_OK;
}

rmw_ret_t
__rmw_publisher_assert_liveliness(
const char * identifier,
Expand Down
18 changes: 15 additions & 3 deletions rmw_fastrtps_shared_cpp/src/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,20 @@ create_datareader(
eprosima::fastdds::dds::DataReader ** data_reader
)
{
eprosima::fastdds::dds::DataReaderQos updated_qos = datareader_qos;
using PropertyPolicyHelper = eprosima::fastrtps::rtps::PropertyPolicyHelper;

eprosima::fastdds::dds::DataReaderQos input_qos = datareader_qos;
if (subscription_options->ignore_local_publications) {
if (nullptr ==
PropertyPolicyHelper::find_property(
input_qos.properties(),
"fastdds.match_local_endpoints"))
{
input_qos.properties().properties().emplace_back("fastdds.match_local_endpoints", "false");
}
}

eprosima::fastdds::dds::DataReaderQos updated_qos = input_qos;
switch (subscription_options->require_unique_network_flow_endpoints) {
default:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_SYSTEM_DEFAULT:
Expand All @@ -180,7 +193,6 @@ create_datareader(
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_OPTIONALLY_REQUIRED:
case RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_STRICTLY_REQUIRED:
// Ensure we request unique network flow endpoints
using PropertyPolicyHelper = eprosima::fastrtps::rtps::PropertyPolicyHelper;
if (nullptr ==
PropertyPolicyHelper::find_property(
updated_qos.properties(),
Expand All @@ -203,7 +215,7 @@ create_datareader(
{
*data_reader = subscriber->create_datareader(
des_topic,
datareader_qos,
input_qos,
listener,
eprosima::fastdds::dds::StatusMask::subscription_matched());
}
Expand Down

0 comments on commit 3c264d3

Please sign in to comment.