Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement callbacks for liveliness and deadline QoS events #695

Merged
merged 10 commits into from
May 3, 2019
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/parameter_map.cpp
src/rclcpp/parameter_service.cpp
src/rclcpp/publisher.cpp
src/rclcpp/qos_event.cpp
src/rclcpp/service.cpp
src/rclcpp/signal_handler.cpp
src/rclcpp/subscription.cpp
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/callback_group.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <vector>

#include "rclcpp/client.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/timer.hpp"
Expand Down Expand Up @@ -92,6 +93,10 @@ class CallbackGroup
protected:
RCLCPP_DISABLE_COPY(CallbackGroup)

RCLCPP_PUBLIC
void
add_publisher(const rclcpp::PublisherBase::SharedPtr publisher_ptr);

RCLCPP_PUBLIC
void
add_subscription(const rclcpp::SubscriptionBase::SharedPtr subscription_ptr);
Expand Down
8 changes: 6 additions & 2 deletions rclcpp/include/rclcpp/create_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ create_publisher(
rclcpp::node_interfaces::NodeTopicsInterface * node_topics,
const std::string & topic_name,
const rmw_qos_profile_t & qos_profile,
const PublisherEventCallbacks & event_callbacks,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool use_intra_process_comms,
std::shared_ptr<AllocatorT> allocator)
{
Expand All @@ -39,10 +41,12 @@ create_publisher(

auto pub = node_topics->create_publisher(
topic_name,
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(allocator),
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(event_callbacks, allocator),
publisher_options,
use_intra_process_comms);
node_topics->add_publisher(pub);

node_topics->add_publisher(pub, group);

return std::dynamic_pointer_cast<PublisherT>(pub);
}

Expand Down
6 changes: 5 additions & 1 deletion rclcpp/include/rclcpp/create_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ create_subscription(
const std::string & topic_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
const SubscriptionEventCallbacks & event_callbacks,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
bool use_intra_process_comms,
Expand All @@ -52,7 +53,10 @@ create_subscription(

auto factory = rclcpp::create_subscription_factory
<MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>(
std::forward<CallbackT>(callback), msg_mem_strat, allocator);
std::forward<CallbackT>(callback),
event_callbacks,
msg_mem_strat,
allocator);

auto sub = node_topics->create_subscription(
topic_name,
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class InvalidServiceNameError : public NameValidationError
* \throws RCLErrorBase some child class exception based on ret
*/
RCLCPP_PUBLIC
[[noreturn]]
void
throw_from_rcl_error(
rcl_ret_t ret,
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class RCLCPP_PUBLIC MemoryStrategy
virtual size_t number_of_ready_subscriptions() const = 0;
virtual size_t number_of_ready_services() const = 0;
virtual size_t number_of_ready_clients() const = 0;
virtual size_t number_of_ready_events() const = 0;
virtual size_t number_of_ready_timers() const = 0;
virtual size_t number_of_guard_conditions() const = 0;
virtual size_t number_of_waitables() const = 0;
Expand Down
13 changes: 13 additions & 0 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,19 @@ class Node : public std::enable_shared_from_this<Node>
const NodeOptions &
get_node_options() const;

/// Manually assert that this Node is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE).
/**
* If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE, the creator
* of this node may manually call `assert_liveliness` at some point in time to signal to the rest
* of the system that this Node is still alive.
*
* \return `true` if the liveliness was asserted successfully, otherwise `false`
*/
RCLCPP_PUBLIC
RCUTILS_WARN_UNUSED
bool
assert_liveliness() const;

protected:
/// Construct a sub-node, which will extend the namespace of all entities created with it.
/**
Expand Down
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ Node::create_publisher(
this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
qos_profile,
options.event_callbacks,
options.callback_group,
use_intra_process,
allocator);
}
Expand Down Expand Up @@ -181,6 +183,7 @@ Node::create_subscription(
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
std::forward<CallbackT>(callback),
qos_profile,
options.event_callbacks,
options.callback_group,
options.ignore_local_publications,
use_intra_process,
Expand Down
5 changes: 5 additions & 0 deletions rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ class NodeBase : public NodeBaseInterface
std::shared_ptr<const rcl_node_t>
get_shared_rcl_node_handle() const;

RCLCPP_PUBLIC
virtual
bool
assert_liveliness() const;

RCLCPP_PUBLIC
virtual
rclcpp::callback_group::CallbackGroup::SharedPtr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,12 @@ class NodeBaseInterface
std::shared_ptr<const rcl_node_t>
get_shared_rcl_node_handle() const = 0;

/// Manually assert that this Node is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE).
RCLCPP_PUBLIC
virtual
bool
assert_liveliness() const = 0;

/// Create and return a callback group.
RCLCPP_PUBLIC
virtual
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/node_interfaces/node_topics.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ class NodeTopics : public NodeTopicsInterface
virtual
void
add_publisher(
rclcpp::PublisherBase::SharedPtr publisher);
rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group);

RCLCPP_PUBLIC
virtual
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class NodeTopicsInterface
virtual
void
add_publisher(
rclcpp::PublisherBase::SharedPtr publisher) = 0;
rclcpp::PublisherBase::SharedPtr publisher,
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) = 0;

RCLCPP_PUBLIC
virtual
Expand Down
1 change: 1 addition & 0 deletions rclcpp/include/rclcpp/parameter_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ class AsyncParametersClient
"parameter_events",
std::forward<CallbackT>(callback),
rmw_qos_profile_default,
SubscriptionEventCallbacks(),
nullptr, // group,
false, // ignore_local_publications,
false, // use_intra_process_comms_,
Expand Down
54 changes: 52 additions & 2 deletions rclcpp/include/rclcpp/publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "rcl/error_handling.h"
#include "rcl/publisher.h"
Expand All @@ -31,17 +32,21 @@

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/allocator/allocator_deleter.hpp"
#include "rclcpp/exceptions.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
{

// Forward declaration is used for friend statement.
namespace node_interfaces
{
// Forward declaration avoids including node_base_interface.hpp
// which causes circular inclusion from callback_group.hpp
class NodeBaseInterface;
// Forward declaration is used for friend statement.
class NodeTopicsInterface;
}

Expand Down Expand Up @@ -116,6 +121,12 @@ class PublisherBase
const rcl_publisher_t *
get_publisher_handle() const;

/// Get all the QoS event handlers associated with this publisher.
/** \return The vector of QoS event handlers. */
RCLCPP_PUBLIC
const std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> &
get_event_handlers() const;

/// Get subscription count
/** \return The number of subscriptions. */
RCLCPP_PUBLIC
Expand All @@ -128,6 +139,19 @@ class PublisherBase
size_t
get_intra_process_subscription_count() const;

/// Manually assert that this Publisher is alive (for RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC).
/**
* If the rmw Liveliness policy is set to RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC, the creator
* of this publisher may manually call `assert_liveliness` at some point in time to signal to the
* rest of the system that this Node is still alive.
*
* \return `true` if the liveliness was asserted successfully, otherwise `false`
*/
RCLCPP_PUBLIC
RCUTILS_WARN_UNUSED
bool
assert_liveliness() const;

/// Get the actual QoS settings, after the defaults have been determined.
/**
* The actual configuration applied when using RMW_QOS_POLICY_*_SYSTEM_DEFAULT
Expand Down Expand Up @@ -176,11 +200,27 @@ class PublisherBase
const rcl_publisher_options_t & intra_process_options);

protected:
template<typename EventCallbackT>
void
add_event_handler(
const EventCallbackT & callback,
const rcl_publisher_event_type_t event_type)
{
auto handler = std::make_shared<QOSEventHandler<EventCallbackT>>(
callback,
rcl_publisher_event_init,
&publisher_handle_,
event_type);
event_handlers_.emplace_back(handler);
}

std::shared_ptr<rcl_node_t> rcl_node_handle_;

rcl_publisher_t publisher_handle_ = rcl_get_zero_initialized_publisher();
rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher();

std::vector<std::shared_ptr<rclcpp::QOSEventHandlerBase>> event_handlers_;

using IntraProcessManagerWeakPtr =
std::weak_ptr<rclcpp::intra_process_manager::IntraProcessManager>;
bool intra_process_is_enabled_;
Expand Down Expand Up @@ -208,6 +248,7 @@ class Publisher : public PublisherBase
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic,
const rcl_publisher_options_t & publisher_options,
const PublisherEventCallbacks & event_callbacks,
const std::shared_ptr<MessageAlloc> & allocator)
: PublisherBase(
node_base,
Expand All @@ -217,6 +258,15 @@ class Publisher : public PublisherBase
message_allocator_(allocator)
{
allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());

if (event_callbacks.deadline_callback) {
this->add_event_handler(event_callbacks.deadline_callback,
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
}
if (event_callbacks.liveliness_callback) {
this->add_event_handler(event_callbacks.liveliness_callback,
RCL_PUBLISHER_LIVELINESS_LOST);
}
}

virtual ~Publisher()
Expand Down
13 changes: 10 additions & 3 deletions rclcpp/include/rclcpp/publisher_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,28 @@ struct PublisherFactory
/// Return a PublisherFactory with functions setup for creating a PublisherT<MessageT, Alloc>.
template<typename MessageT, typename Alloc, typename PublisherT>
PublisherFactory
create_publisher_factory(std::shared_ptr<Alloc> allocator)
create_publisher_factory(
const PublisherEventCallbacks & event_callbacks,
std::shared_ptr<Alloc> allocator)
{
PublisherFactory factory;

// factory function that creates a MessageT specific PublisherT
factory.create_typed_publisher =
[allocator](
[event_callbacks, allocator](
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
rcl_publisher_options_t & publisher_options) -> std::shared_ptr<PublisherT>
{
auto message_alloc = std::make_shared<typename PublisherT::MessageAlloc>(*allocator.get());
publisher_options.allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc.get());

return std::make_shared<PublisherT>(node_base, topic_name, publisher_options, message_alloc);
return std::make_shared<PublisherT>(
node_base,
topic_name,
publisher_options,
event_callbacks,
message_alloc);
};

// function to add a publisher to the intra process manager
Expand Down
6 changes: 5 additions & 1 deletion rclcpp/include/rclcpp/publisher_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
#include <string>
#include <vector>

#include "rclcpp/callback_group.hpp"
#include "rclcpp/intra_process_setting.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/visibility_control.hpp"

namespace rclcpp
Expand All @@ -29,10 +31,12 @@ namespace rclcpp
template<typename Allocator>
struct PublisherOptionsWithAllocator
{
PublisherEventCallbacks event_callbacks;
/// The quality of service profile to pass on to the rmw implementation.
rmw_qos_profile_t qos_profile = rmw_qos_profile_default;
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group;
/// Optional custom allocator.
std::shared_ptr<Allocator> allocator = nullptr;
std::shared_ptr<Allocator> allocator;
/// Setting to explicitly set intraprocess communications.
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault;
};
Expand Down
Loading