diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index b4e42dd2b1..66b0f10bb3 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -65,6 +65,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/parameter_map.cpp src/rclcpp/parameter_service.cpp src/rclcpp/publisher_base.cpp + src/rclcpp/qos_event.cpp src/rclcpp/service.cpp src/rclcpp/signal_handler.cpp src/rclcpp/subscription_base.cpp diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index c53bffbd7d..57319b2c23 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -21,6 +21,7 @@ #include #include "rclcpp/client.hpp" +#include "rclcpp/publisher.hpp" #include "rclcpp/service.hpp" #include "rclcpp/subscription.hpp" #include "rclcpp/timer.hpp" @@ -92,6 +93,10 @@ class CallbackGroup protected: RCLCPP_DISABLE_COPY(CallbackGroup) + RCLCPP_PUBLIC + void + add_publisher(const rclcpp::PublisherBase::SharedPtr publisher_ptr); + RCLCPP_PUBLIC void add_subscription(const rclcpp::SubscriptionBase::SharedPtr subscription_ptr); diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index f136f33653..e21b00687b 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -31,6 +31,8 @@ create_publisher( rclcpp::node_interfaces::NodeTopicsInterface * node_topics, const std::string & topic_name, const rmw_qos_profile_t & qos_profile, + const PublisherEventCallbacks & event_callbacks, + rclcpp::callback_group::CallbackGroup::SharedPtr group, bool use_intra_process_comms, std::shared_ptr allocator) { @@ -39,10 +41,12 @@ create_publisher( auto pub = node_topics->create_publisher( topic_name, - rclcpp::create_publisher_factory(allocator), + rclcpp::create_publisher_factory(event_callbacks, allocator), publisher_options, use_intra_process_comms); - node_topics->add_publisher(pub); + + node_topics->add_publisher(pub, group); + return std::dynamic_pointer_cast(pub); } diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index a4ea581092..e765d376e2 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -38,6 +38,7 @@ create_subscription( const std::string & topic_name, CallbackT && callback, const rmw_qos_profile_t & qos_profile, + const SubscriptionEventCallbacks & event_callbacks, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, bool use_intra_process_comms, @@ -52,7 +53,10 @@ create_subscription( auto factory = rclcpp::create_subscription_factory ( - std::forward(callback), msg_mem_strat, allocator); + std::forward(callback), + event_callbacks, + msg_mem_strat, + allocator); auto sub = node_topics->create_subscription( topic_name, diff --git a/rclcpp/include/rclcpp/exceptions.hpp b/rclcpp/include/rclcpp/exceptions.hpp index 4744afe662..c65df54f1f 100644 --- a/rclcpp/include/rclcpp/exceptions.hpp +++ b/rclcpp/include/rclcpp/exceptions.hpp @@ -110,13 +110,15 @@ class InvalidServiceNameError : public NameValidationError * \throws std::runtime_error if the rcl_get_error_state returns 0 * \throws RCLErrorBase some child class exception based on ret */ +/* *INDENT-OFF* */ // Uncrustify cannot yet understand [[noreturn]] properly RCLCPP_PUBLIC void -throw_from_rcl_error( +throw_from_rcl_error [[noreturn]] ( rcl_ret_t ret, const std::string & prefix = "", const rcl_error_state_t * error_state = nullptr, void (* reset_error)() = rcl_reset_error); +/* *INDENT-ON* */ class RCLErrorBase { diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index 416890ed6c..70db12237a 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -51,6 +51,7 @@ class RCLCPP_PUBLIC MemoryStrategy virtual size_t number_of_ready_subscriptions() const = 0; virtual size_t number_of_ready_services() const = 0; virtual size_t number_of_ready_clients() const = 0; + virtual size_t number_of_ready_events() const = 0; virtual size_t number_of_ready_timers() const = 0; virtual size_t number_of_guard_conditions() const = 0; virtual size_t number_of_waitables() const = 0; diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index ff464f6d59..c987071592 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -1126,6 +1126,19 @@ class Node : public std::enable_shared_from_this const NodeOptions & get_node_options() const; + /// Manually assert that this Node is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE). + /** + * If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE, the creator + * of this node may manually call `assert_liveliness` at some point in time to signal to the rest + * of the system that this Node is still alive. + * + * \return `true` if the liveliness was asserted successfully, otherwise `false` + */ + RCLCPP_PUBLIC + RCUTILS_WARN_UNUSED + bool + assert_liveliness() const; + protected: /// Construct a sub-node, which will extend the namespace of all entities created with it. /** diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index 78fdd8de95..a3eb298ccc 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -97,6 +97,8 @@ Node::create_publisher( this->node_topics_.get(), extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), qos_profile, + options.event_callbacks, + options.callback_group, use_intra_process, allocator); } @@ -181,6 +183,7 @@ Node::create_subscription( extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), std::forward(callback), qos_profile, + options.event_callbacks, options.callback_group, options.ignore_local_publications, use_intra_process, diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp index b7470269f4..77379c2858 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base.hpp @@ -86,6 +86,11 @@ class NodeBase : public NodeBaseInterface std::shared_ptr get_shared_rcl_node_handle() const; + RCLCPP_PUBLIC + virtual + bool + assert_liveliness() const; + RCLCPP_PUBLIC virtual rclcpp::callback_group::CallbackGroup::SharedPtr diff --git a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp index eb376c0353..a2228fb617 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp @@ -102,6 +102,12 @@ class NodeBaseInterface std::shared_ptr get_shared_rcl_node_handle() const = 0; + /// Manually assert that this Node is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE). + RCLCPP_PUBLIC + virtual + bool + assert_liveliness() const = 0; + /// Create and return a callback group. RCLCPP_PUBLIC virtual diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp index 31cf62e54c..40beb3aacb 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp @@ -58,7 +58,8 @@ class NodeTopics : public NodeTopicsInterface virtual void add_publisher( - rclcpp::PublisherBase::SharedPtr publisher); + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group); RCLCPP_PUBLIC virtual diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp index b3d46ff334..d888e3ccd2 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp @@ -57,7 +57,8 @@ class NodeTopicsInterface virtual void add_publisher( - rclcpp::PublisherBase::SharedPtr publisher) = 0; + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) = 0; RCLCPP_PUBLIC virtual diff --git a/rclcpp/include/rclcpp/parameter_client.hpp b/rclcpp/include/rclcpp/parameter_client.hpp index 879f261836..6b68a793fa 100644 --- a/rclcpp/include/rclcpp/parameter_client.hpp +++ b/rclcpp/include/rclcpp/parameter_client.hpp @@ -127,6 +127,7 @@ class AsyncParametersClient "parameter_events", std::forward(callback), rmw_qos_profile_default, + SubscriptionEventCallbacks(), nullptr, // group, false, // ignore_local_publications, false, // use_intra_process_comms_, diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 0cdca8cbd6..9fc199ddc6 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -34,7 +34,6 @@ #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/intra_process_manager.hpp" #include "rclcpp/macros.hpp" -#include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/publisher_base.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -59,6 +58,7 @@ class Publisher : public PublisherBase rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rcl_publisher_options_t & publisher_options, + const PublisherEventCallbacks & event_callbacks, const std::shared_ptr & allocator) : PublisherBase( node_base, @@ -68,6 +68,15 @@ class Publisher : public PublisherBase message_allocator_(allocator) { allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get()); + + if (event_callbacks.deadline_callback) { + this->add_event_handler(event_callbacks.deadline_callback, + RCL_PUBLISHER_OFFERED_DEADLINE_MISSED); + } + if (event_callbacks.liveliness_callback) { + this->add_event_handler(event_callbacks.liveliness_callback, + RCL_PUBLISHER_LIVELINESS_LOST); + } } virtual ~Publisher() diff --git a/rclcpp/include/rclcpp/publisher_base.hpp b/rclcpp/include/rclcpp/publisher_base.hpp index b836ccd90e..afb68302f8 100644 --- a/rclcpp/include/rclcpp/publisher_base.hpp +++ b/rclcpp/include/rclcpp/publisher_base.hpp @@ -23,11 +23,13 @@ #include #include #include +#include #include "rcl/publisher.h" #include "rclcpp/macros.hpp" #include "rclcpp/mapped_ring_buffer.hpp" +#include "rclcpp/qos_event.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -112,6 +114,12 @@ class PublisherBase const rcl_publisher_t * get_publisher_handle() const; + /// Get all the QoS event handlers associated with this publisher. + /** \return The vector of QoS event handlers. */ + RCLCPP_PUBLIC + const std::vector> & + get_event_handlers() const; + /// Get subscription count /** \return The number of subscriptions. */ RCLCPP_PUBLIC @@ -124,6 +132,19 @@ class PublisherBase size_t get_intra_process_subscription_count() const; + /// Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC). + /** + * If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC, the creator + * of this publisher may manually call `assert_liveliness` at some point in time to signal to the + * rest of the system that this Node is still alive. + * + * \return `true` if the liveliness was asserted successfully, otherwise `false` + */ + RCLCPP_PUBLIC + RCUTILS_WARN_UNUSED + bool + assert_liveliness() const; + /// Get the actual QoS settings, after the defaults have been determined. /** * The actual configuration applied when using RMW_QOS_POLICY_*_SYSTEM_DEFAULT @@ -175,11 +196,27 @@ class PublisherBase const rcl_publisher_options_t & intra_process_options); protected: + template + void + add_event_handler( + const EventCallbackT & callback, + const rcl_publisher_event_type_t event_type) + { + auto handler = std::make_shared>( + callback, + rcl_publisher_event_init, + &publisher_handle_, + event_type); + event_handlers_.emplace_back(handler); + } + std::shared_ptr rcl_node_handle_; rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher(); rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher(); + std::vector> event_handlers_; + using IntraProcessManagerWeakPtr = std::weak_ptr; bool intra_process_is_enabled_; diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index aadb138dcd..ce2196a15f 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -57,13 +57,15 @@ struct PublisherFactory /// Return a PublisherFactory with functions setup for creating a PublisherT. template PublisherFactory -create_publisher_factory(std::shared_ptr allocator) +create_publisher_factory( + const PublisherEventCallbacks & event_callbacks, + std::shared_ptr allocator) { PublisherFactory factory; // factory function that creates a MessageT specific PublisherT factory.create_typed_publisher = - [allocator]( + [event_callbacks, allocator]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, rcl_publisher_options_t & publisher_options) -> std::shared_ptr @@ -71,7 +73,12 @@ create_publisher_factory(std::shared_ptr allocator) auto message_alloc = std::make_shared(*allocator.get()); publisher_options.allocator = allocator::get_rcl_allocator(*message_alloc.get()); - return std::make_shared(node_base, topic_name, publisher_options, message_alloc); + return std::make_shared( + node_base, + topic_name, + publisher_options, + event_callbacks, + message_alloc); }; // return the factory now that it is populated diff --git a/rclcpp/include/rclcpp/publisher_options.hpp b/rclcpp/include/rclcpp/publisher_options.hpp index 86ce4435ef..acdec02718 100644 --- a/rclcpp/include/rclcpp/publisher_options.hpp +++ b/rclcpp/include/rclcpp/publisher_options.hpp @@ -19,7 +19,9 @@ #include #include +#include "rclcpp/callback_group.hpp" #include "rclcpp/intra_process_setting.hpp" +#include "rclcpp/qos_event.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -29,10 +31,12 @@ namespace rclcpp template struct PublisherOptionsWithAllocator { + PublisherEventCallbacks event_callbacks; /// The quality of service profile to pass on to the rmw implementation. rmw_qos_profile_t qos_profile = rmw_qos_profile_default; + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group; /// Optional custom allocator. - std::shared_ptr allocator = nullptr; + std::shared_ptr allocator; /// Setting to explicitly set intraprocess communications. IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault; }; diff --git a/rclcpp/include/rclcpp/qos_event.hpp b/rclcpp/include/rclcpp/qos_event.hpp new file mode 100644 index 0000000000..d14af2bbe9 --- /dev/null +++ b/rclcpp/include/rclcpp/qos_event.hpp @@ -0,0 +1,126 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__QOS_EVENT_HPP_ +#define RCLCPP__QOS_EVENT_HPP_ + +#include + +#include "rcl/error_handling.h" + +#include "rcutils/logging_macros.h" + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/function_traits.hpp" +#include "rclcpp/waitable.hpp" + +namespace rclcpp +{ + +using QOSDeadlineRequestedInfo = rmw_requested_deadline_missed_status_t; +using QOSDeadlineOfferedInfo = rmw_offered_deadline_missed_status_t; +using QOSLivelinessChangedInfo = rmw_liveliness_changed_status_t; +using QOSLivelinessLostInfo = rmw_liveliness_lost_status_t; + +using QOSDeadlineRequestedCallbackType = std::function; +using QOSDeadlineOfferedCallbackType = std::function; +using QOSLivelinessChangedCallbackType = std::function; +using QOSLivelinessLostCallbackType = std::function; + +/// Contains callbacks for various types of events a Publisher can receive from the middleware. +struct PublisherEventCallbacks +{ + QOSDeadlineOfferedCallbackType deadline_callback; + QOSLivelinessLostCallbackType liveliness_callback; +}; + +/// Contains callbacks for non-message events that a Subscription can receive from the middleware. +struct SubscriptionEventCallbacks +{ + QOSDeadlineRequestedCallbackType deadline_callback; + QOSLivelinessChangedCallbackType liveliness_callback; +}; + +class QOSEventHandlerBase : public Waitable +{ +public: + RCLCPP_PUBLIC + virtual ~QOSEventHandlerBase(); + + /// Get the number of ready events + RCLCPP_PUBLIC + size_t + get_number_of_ready_events() override; + + /// Add the Waitable to a wait set. + RCLCPP_PUBLIC + bool + add_to_wait_set(rcl_wait_set_t * wait_set) override; + + /// Check if the Waitable is ready. + RCLCPP_PUBLIC + bool + is_ready(rcl_wait_set_t * wait_set) override; + +protected: + rcl_event_t event_handle_; + size_t wait_set_event_index_; +}; + +template +class QOSEventHandler : public QOSEventHandlerBase +{ +public: + template + QOSEventHandler( + const EventCallbackT & callback, + InitFuncT init_func, + ParentHandleT parent_handle, + EventTypeEnum event_type) + : event_callback_(callback) + { + event_handle_ = rcl_get_zero_initialized_event(); + rcl_ret_t ret = init_func(&event_handle_, parent_handle, event_type); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "could not create event"); + } + } + + /// Execute any entities of the Waitable that are ready. + void + execute() override + { + EventCallbackInfoT callback_info; + + rcl_ret_t ret = rcl_take_event(&event_handle_, &callback_info); + if (ret != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't take event info: %s", rcl_get_error_string().str); + return; + } + + event_callback_(callback_info); + } + +private: + using EventCallbackInfoT = typename std::remove_reference::template argument_type<0>>::type; + + EventCallbackT event_callback_; +}; + +} // namespace rclcpp + +#endif // RCLCPP__QOS_EVENT_HPP_ diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index ca1d2fba3c..8bf54656bc 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -430,6 +430,15 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy return number_of_services; } + size_t number_of_ready_events() const + { + size_t number_of_events = 0; + for (auto waitable : waitable_handles_) { + number_of_events += waitable->get_number_of_ready_events(); + } + return number_of_events; + } + size_t number_of_ready_clients() const { size_t number_of_clients = client_handles_.size(); diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index bda8db693f..348fcb52c8 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -42,6 +42,7 @@ #include "rclcpp/subscription_traits.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" +#include "rclcpp/waitable.hpp" namespace rclcpp { @@ -85,6 +86,7 @@ class Subscription : public SubscriptionBase const std::string & topic_name, const rcl_subscription_options_t & subscription_options, AnySubscriptionCallback callback, + const SubscriptionEventCallbacks & event_callbacks, typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = message_memory_strategy::MessageMemoryStrategy::create_default()) @@ -96,7 +98,16 @@ class Subscription : public SubscriptionBase rclcpp::subscription_traits::is_serialized_subscription_argument::value), any_callback_(callback), message_memory_strategy_(memory_strategy) - {} + { + if (event_callbacks.deadline_callback) { + this->add_event_handler(event_callbacks.deadline_callback, + RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED); + } + if (event_callbacks.liveliness_callback) { + this->add_event_handler(event_callbacks.liveliness_callback, + RCL_SUBSCRIPTION_LIVELINESS_CHANGED); + } + } /// Support dynamically setting the message memory strategy. /** diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 73b0d48c38..8f133184e2 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -17,6 +17,7 @@ #include #include +#include #include "rcl/subscription.h" @@ -26,6 +27,7 @@ #include "rclcpp/any_subscription_callback.hpp" #include "rclcpp/macros.hpp" +#include "rclcpp/qos_event.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -90,6 +92,12 @@ class SubscriptionBase virtual const std::shared_ptr get_intra_process_subscription_handle() const; + /// Get all the QoS event handlers associated with this subscription. + /** \return The vector of QoS event handlers. */ + RCLCPP_PUBLIC + const std::vector> & + get_event_handlers() const; + /// Borrow a new message. /** \return Shared pointer to the fresh message. */ virtual std::shared_ptr @@ -145,9 +153,25 @@ class SubscriptionBase const rcl_subscription_options_t & intra_process_options); protected: - std::shared_ptr intra_process_subscription_handle_; - std::shared_ptr subscription_handle_; + template + void + add_event_handler( + const EventCallbackT & callback, + const rcl_subscription_event_type_t event_type) + { + auto handler = std::make_shared>( + callback, + rcl_subscription_event_init, + get_subscription_handle().get(), + event_type); + event_handlers_.emplace_back(handler); + } + std::shared_ptr node_handle_; + std::shared_ptr subscription_handle_; + std::shared_ptr intra_process_subscription_handle_; + + std::vector> event_handlers_; bool use_intra_process_; IntraProcessManagerWeakPtr weak_ipm_; diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index b479483244..7fe6bdef3b 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -75,6 +75,7 @@ template< SubscriptionFactory create_subscription_factory( CallbackT && callback, + const SubscriptionEventCallbacks & event_callbacks, typename rclcpp::message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, Alloc>::SharedPtr msg_mem_strat, @@ -91,7 +92,7 @@ create_subscription_factory( // factory function that creates a MessageT specific SubscriptionT factory.create_typed_subscription = - [allocator, msg_mem_strat, any_subscription_callback, message_alloc]( + [allocator, msg_mem_strat, any_subscription_callback, event_callbacks, message_alloc]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, rcl_subscription_options_t & subscription_options @@ -109,6 +110,7 @@ create_subscription_factory( topic_name, subscription_options, any_subscription_callback, + event_callbacks, msg_mem_strat); auto sub_base_ptr = std::dynamic_pointer_cast(sub); return sub_base_ptr; diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index 38c34b950d..a86612e25b 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -19,7 +19,9 @@ #include #include +#include "rclcpp/callback_group.hpp" #include "rclcpp/intra_process_setting.hpp" +#include "rclcpp/qos_event.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -29,14 +31,15 @@ namespace rclcpp template struct SubscriptionOptionsWithAllocator { + SubscriptionEventCallbacks event_callbacks; /// The quality of service profile to pass on to the rmw implementation. rmw_qos_profile_t qos_profile = rmw_qos_profile_default; /// True to ignore local publications. bool ignore_local_publications = false; /// The callback group for this subscription. NULL to use the default callback group. - rclcpp::callback_group::CallbackGroup::SharedPtr callback_group = nullptr; + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group; /// Optional custom allocator. - std::shared_ptr allocator = nullptr; + std::shared_ptr allocator; /// Setting to explicitly set intraprocess communications. IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault; }; diff --git a/rclcpp/include/rclcpp/waitable.hpp b/rclcpp/include/rclcpp/waitable.hpp index 4f48bbc238..401b88e541 100644 --- a/rclcpp/include/rclcpp/waitable.hpp +++ b/rclcpp/include/rclcpp/waitable.hpp @@ -61,6 +61,17 @@ class Waitable size_t get_number_of_ready_clients(); + /// Get the number of ready events + /** + * Returns a value of 0 by default. + * This should be overridden if the Waitable contains one or more events. + * \return The number of events associated with the Waitable. + */ + RCLCPP_PUBLIC + virtual + size_t + get_number_of_ready_events(); + /// Get the number of ready services /** * Returns a value of 0 by default. @@ -88,6 +99,7 @@ class Waitable /** * \param[in] wait_set A handle to the wait set to add the Waitable to. * \return `true` if the Waitable is added successfully, `false` otherwise. + * \throws rclcpp::execptions::RCLError from rcl_wait_set_add_*() */ RCLCPP_PUBLIC virtual diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 24cd18e421..0caa728af2 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -60,7 +60,11 @@ Executor::Executor(const ExecutorArgs & args) // Store the context for later use. context_ = args.context; - ret = rcl_wait_set_init(&wait_set_, 0, 2, 0, 0, 0, context_->get_rcl_context().get(), allocator); + ret = rcl_wait_set_init( + &wait_set_, + 0, 2, 0, 0, 0, 0, + context_->get_rcl_context().get(), + allocator); if (RCL_RET_OK != ret) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", @@ -435,7 +439,8 @@ Executor::wait_for_work(std::chrono::nanoseconds timeout) rcl_ret_t ret = rcl_wait_set_resize( &wait_set_, memory_strategy_->number_of_ready_subscriptions(), memory_strategy_->number_of_guard_conditions(), memory_strategy_->number_of_ready_timers(), - memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services()); + memory_strategy_->number_of_ready_clients(), memory_strategy_->number_of_ready_services(), + memory_strategy_->number_of_ready_events()); if (RCL_RET_OK != ret) { throw std::runtime_error( std::string("Couldn't resize the wait set : ") + rcl_get_error_string().str); diff --git a/rclcpp/src/rclcpp/graph_listener.cpp b/rclcpp/src/rclcpp/graph_listener.cpp index 1a181898c4..b097564333 100644 --- a/rclcpp/src/rclcpp/graph_listener.cpp +++ b/rclcpp/src/rclcpp/graph_listener.cpp @@ -79,6 +79,7 @@ GraphListener::start_if_not_started() 0, // number_of_timers 0, // number_of_clients 0, // number_of_services + 0, // number_of_events this->parent_context_->get_rcl_context().get(), rcl_get_default_allocator()); if (RCL_RET_OK != ret) { @@ -145,7 +146,7 @@ GraphListener::run_loop() const size_t node_graph_interfaces_size = node_graph_interfaces_.size(); // Add 2 for the interrupt and shutdown guard conditions if (wait_set_.size_of_guard_conditions < (node_graph_interfaces_size + 2)) { - ret = rcl_wait_set_resize(&wait_set_, 0, node_graph_interfaces_size + 2, 0, 0, 0); + ret = rcl_wait_set_resize(&wait_set_, 0, node_graph_interfaces_size + 2, 0, 0, 0, 0); if (RCL_RET_OK != ret) { throw_from_rcl_error(ret, "failed to resize wait set"); } diff --git a/rclcpp/src/rclcpp/node.cpp b/rclcpp/src/rclcpp/node.cpp index a10f51bec8..8313932621 100644 --- a/rclcpp/src/rclcpp/node.cpp +++ b/rclcpp/src/rclcpp/node.cpp @@ -470,3 +470,9 @@ Node::get_node_options() const { return this->node_options_; } + +bool +Node::assert_liveliness() const +{ + return node_base_->assert_liveliness(); +} diff --git a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp index 845dcd7b6d..a9198ccd73 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_base.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_base.cpp @@ -197,6 +197,12 @@ NodeBase::get_shared_rcl_node_handle() const return node_handle_; } +bool +NodeBase::assert_liveliness() const +{ + return RCL_RET_OK == rcl_node_assert_liveliness(get_rcl_node_handle()); +} + rclcpp::callback_group::CallbackGroup::SharedPtr NodeBase::create_callback_group(rclcpp::callback_group::CallbackGroupType group_type) { diff --git a/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp b/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp index 9ee2836393..fea2e87842 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp @@ -75,6 +75,8 @@ NodeParameters::NodeParameters( node_topics.get(), "parameter_events", parameter_event_qos_profile, + PublisherEventCallbacks(), + nullptr, use_intra_process, allocator); } diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index bbb54b2b4a..e513464c1f 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -60,11 +60,22 @@ NodeTopics::create_publisher( void NodeTopics::add_publisher( - rclcpp::PublisherBase::SharedPtr publisher) + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) { - // The publisher is not added to a callback group or anthing like that for now. - // It may be stored within the NodeTopics class or the NodeBase class in the future. - (void)publisher; + // Assign to a group. + if (callback_group) { + if (!node_base_->callback_group_in_node(callback_group)) { + throw std::runtime_error("Cannot create publisher, callback group not in node."); + } + } else { + callback_group = node_base_->get_default_callback_group(); + } + + for (auto & publisher_event : publisher->get_event_handlers()) { + callback_group->add_waitable(publisher_event); + } + // Notify the executor that a new publisher was created using the parent Node. { auto notify_guard_condition_lock = node_base_->acquire_notify_guard_condition_lock(); @@ -111,9 +122,13 @@ NodeTopics::add_subscription( // TODO(jacquelinekay): use custom exception throw std::runtime_error("Cannot create subscription, callback group not in node."); } - callback_group->add_subscription(subscription); } else { - node_base_->get_default_callback_group()->add_subscription(subscription); + callback_group = node_base_->get_default_callback_group(); + } + + callback_group->add_subscription(subscription); + for (auto & subscription_event : subscription->get_event_handlers()) { + callback_group->add_waitable(subscription_event); } // Notify the executor that a new subscription was created using the parent Node. diff --git a/rclcpp/src/rclcpp/publisher_base.cpp b/rclcpp/src/rclcpp/publisher_base.cpp index 09997bac4e..2321150b31 100644 --- a/rclcpp/src/rclcpp/publisher_base.cpp +++ b/rclcpp/src/rclcpp/publisher_base.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include "rcl_interfaces/msg/intra_process_message.hpp" #include "rcutils/logging_macros.h" @@ -30,10 +31,10 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" #include "rclcpp/exceptions.hpp" +#include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/intra_process_manager.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/node.hpp" -#include "rclcpp/expand_topic_or_service_name.hpp" using rclcpp::PublisherBase; @@ -80,6 +81,9 @@ PublisherBase::PublisherBase( PublisherBase::~PublisherBase() { + // must fini the events before fini-ing the publisher + event_handlers_.clear(); + if (rcl_publisher_fini(&intra_process_publisher_handle_, rcl_node_handle_.get()) != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", @@ -153,6 +157,12 @@ PublisherBase::get_publisher_handle() const return &publisher_handle_; } +const std::vector> & +PublisherBase::get_event_handlers() const +{ + return event_handlers_; +} + size_t PublisherBase::get_subscription_count() const { @@ -207,6 +217,12 @@ PublisherBase::get_actual_qos() const return *qos; } +bool +PublisherBase::assert_liveliness() const +{ + return RCL_RET_OK == rcl_publisher_assert_liveliness(&publisher_handle_); +} + bool PublisherBase::operator==(const rmw_gid_t & gid) const { diff --git a/rclcpp/src/rclcpp/qos_event.cpp b/rclcpp/src/rclcpp/qos_event.cpp new file mode 100644 index 0000000000..75072f799a --- /dev/null +++ b/rclcpp/src/rclcpp/qos_event.cpp @@ -0,0 +1,55 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/qos_event.hpp" + +namespace rclcpp +{ + +QOSEventHandlerBase::~QOSEventHandlerBase() +{ + if (rcl_event_fini(&event_handle_) != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Error in destruction of rcl event handle: %s", rcl_get_error_string().str); + rcl_reset_error(); + } +} + +/// Get the number of ready events. +size_t +QOSEventHandlerBase::get_number_of_ready_events() +{ + return 1; +} + +/// Add the Waitable to a wait set. +bool +QOSEventHandlerBase::add_to_wait_set(rcl_wait_set_t * wait_set) +{ + rcl_ret_t ret = rcl_wait_set_add_event(wait_set, &event_handle_, &wait_set_event_index_); + if (RCL_RET_OK != ret) { + exceptions::throw_from_rcl_error(ret, "Couldn't add event to wait set"); + } + return true; +} + +/// Check if the Waitable is ready. +bool +QOSEventHandlerBase::is_ready(rcl_wait_set_t * wait_set) +{ + return wait_set->events[wait_set_event_index_] == &event_handle_; +} + +} // namespace rclcpp diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 23ed7e5e0f..9b02141988 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" @@ -121,6 +122,12 @@ SubscriptionBase::get_intra_process_subscription_handle() const return intra_process_subscription_handle_; } +const std::vector> & +SubscriptionBase::get_event_handlers() const +{ + return event_handlers_; +} + const rosidl_message_type_support_t & SubscriptionBase::get_message_type_support_handle() const { diff --git a/rclcpp/src/rclcpp/time_source.cpp b/rclcpp/src/rclcpp/time_source.cpp index f54cea4d5a..ca2ad27da2 100644 --- a/rclcpp/src/rclcpp/time_source.cpp +++ b/rclcpp/src/rclcpp/time_source.cpp @@ -223,6 +223,7 @@ void TimeSource::create_clock_sub() topic_name, std::move(cb), rmw_qos_profile_default, + rclcpp::SubscriptionEventCallbacks(), group, false, false, diff --git a/rclcpp/src/rclcpp/waitable.cpp b/rclcpp/src/rclcpp/waitable.cpp index a29b80de1d..542b10a016 100644 --- a/rclcpp/src/rclcpp/waitable.cpp +++ b/rclcpp/src/rclcpp/waitable.cpp @@ -34,6 +34,12 @@ Waitable::get_number_of_ready_clients() return 0u; } +size_t +Waitable::get_number_of_ready_events() +{ + return 0u; +} + size_t Waitable::get_number_of_ready_services() { diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp index 290b93d5b4..dc19f3c5b2 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp @@ -23,10 +23,12 @@ #include "rclcpp/contexts/default_context.hpp" #include "rclcpp/intra_process_manager.hpp" +#include "rclcpp/event.hpp" #include "rclcpp/parameter.hpp" #include "rclcpp/create_publisher.hpp" #include "rclcpp/create_service.hpp" #include "rclcpp/create_subscription.hpp" +#include "rclcpp/subscription_options.hpp" #include "rclcpp/type_support_decl.hpp" #include "lifecycle_publisher.hpp" @@ -65,6 +67,8 @@ LifecycleNode::create_publisher( this->node_topics_.get(), topic_name, qos_profile, + rclcpp::PublisherEventCallbacks(), + nullptr, use_intra_process_comms_, allocator); } @@ -99,6 +103,7 @@ LifecycleNode::create_subscription( topic_name, std::forward(callback), qos_profile, + rclcpp::SubscriptionEventCallbacks(), group, ignore_local_publications, use_intra_process_comms_, @@ -128,6 +133,7 @@ LifecycleNode::create_subscription( topic_name, std::forward(callback), qos, + rclcpp::SubscriptionEventCallbacks(), group, ignore_local_publications, msg_mem_strat, diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp index 1a1a3edb5d..4b05977f3e 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_publisher.hpp @@ -61,9 +61,10 @@ class LifecyclePublisher : public LifecyclePublisherInterface, rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rcl_publisher_options_t & publisher_options, + const rclcpp::PublisherEventCallbacks event_callbacks, std::shared_ptr allocator) : rclcpp::Publisher( - node_base, topic, publisher_options, allocator), + node_base, topic, publisher_options, event_callbacks, allocator), enabled_(false), logger_(rclcpp::get_logger("LifecyclePublisher")) {