diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index fdd92eb61..b70c47ef2 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -92,6 +92,8 @@ rmw_create_client( info = new CustomClientInfo(); info->participant_ = participant; info->typesupport_identifier_ = type_support->typesupport_identifier; + info->request_publisher_matched_count_ = 0; + info->response_subscriber_matched_count_ = 0; const service_type_support_callbacks_t * service_members; const message_type_support_callbacks_t * request_members; @@ -178,8 +180,9 @@ rmw_create_client( RMW_SET_ERROR_MSG("failed to get datawriter qos"); goto fail; } + info->pub_listener_ = new ClientPubListener(info); info->request_publisher_ = - Domain::createPublisher(participant, publisherParam, nullptr); + Domain::createPublisher(participant, publisherParam, info->pub_listener_); if (!info->request_publisher_) { RMW_SET_ERROR_MSG("create_publisher() could not create publisher"); goto fail; @@ -215,6 +218,10 @@ rmw_create_client( Domain::removeSubscriber(info->response_subscriber_); } + if (info->pub_listener_ != nullptr) { + delete info->pub_listener_; + } + if (info->listener_ != nullptr) { delete info->listener_; } diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index babb2876e..5950a0fed 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -27,10 +27,12 @@ #include "fastrtps/subscriber/SubscriberListener.h" #include "fastrtps/participant/Participant.h" #include "fastrtps/publisher/Publisher.h" +#include "fastrtps/publisher/PublisherListener.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" class ClientListener; +class ClientPubListener; typedef struct CustomClientInfo { @@ -42,6 +44,9 @@ typedef struct CustomClientInfo eprosima::fastrtps::rtps::GUID_t writer_guid_; eprosima::fastrtps::Participant * participant_; const char * typesupport_identifier_; + ClientPubListener * pub_listener_; + uint32_t response_subscriber_matched_count_; + uint32_t request_publisher_matched_count_; } CustomClientInfo; typedef struct CustomClientResponse @@ -141,6 +146,21 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener return list_has_data_.load(); } + void onSubscriptionMatched( + eprosima::fastrtps::Subscriber * sub, + eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) + { + if (info_ == nullptr || sub == nullptr) { + return; + } + + if (matchingInfo.status == eprosima::fastrtps::rtps::MATCHED_MATCHING) { + info_->response_subscriber_matched_count_++; + } else { + info_->response_subscriber_matched_count_--; + } + } + private: CustomClientInfo * info_; std::mutex internalMutex_; @@ -150,4 +170,31 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener std::condition_variable * conditionVariable_; }; +class ClientPubListener : public eprosima::fastrtps::PublisherListener +{ +public: + explicit ClientPubListener(CustomClientInfo * info) + : info_(info) + { + } + + void onPublicationMatched( + eprosima::fastrtps::Publisher * pub, + eprosima::fastrtps::rtps::MatchingInfo & matchingInfo) + { + if (info_ == nullptr || pub == nullptr) { + return; + } + + if (matchingInfo.status == eprosima::fastrtps::rtps::MATCHED_MATCHING) { + info_->request_publisher_matched_count_++; + } else { + info_->request_publisher_matched_count_--; + } + } + +private: + CustomClientInfo * info_; +}; + #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp index c4a7f89a3..89976d472 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp @@ -56,6 +56,9 @@ __rmw_destroy_client( if (info->request_publisher_ != nullptr) { Domain::removePublisher(info->request_publisher_); } + if (info->pub_listener_ != nullptr) { + delete info->pub_listener_; + } if (info->listener_ != nullptr) { delete info->listener_; } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service_server_is_available.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service_server_is_available.cpp index 281509259..4960ebdbd 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service_server_is_available.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service_server_is_available.cpp @@ -104,6 +104,15 @@ __rmw_service_server_is_available( return RMW_RET_OK; } + if (0 == client_info->request_publisher_matched_count_) { + // not ready + return RMW_RET_OK; + } + if (0 == client_info->response_subscriber_matched_count_) { + // not ready + return RMW_RET_OK; + } + // all conditions met, there is a service server available *is_available = true; return RMW_RET_OK;