diff --git a/rcl/include/rcl/graph.h b/rcl/include/rcl/graph.h index 50d6fb526..17a74a6ea 100644 --- a/rcl/include/rcl/graph.h +++ b/rcl/include/rcl/graph.h @@ -26,6 +26,7 @@ extern "C" #include #include +#include "rcutils/time.h" #include "rcutils/types.h" #include "rosidl_runtime_c/service_type_support_struct.h" @@ -581,6 +582,95 @@ rcl_count_subscribers( const char * topic_name, size_t * count); +/// Wait for there to be a specified number of publishers on a given topic. +/** + * The `node` parameter must point to a valid node. + * + * The `allocator` parameter must point to a valid allocator. + * + * The `topic_name` parameter must not be `NULL`, and must not be an empty string. + * It should also follow the topic name rules. + * + * This function blocks and will return when the number of publishers for `topic_name` + * is greater than or equal to the `count` parameter, or the specificed `timeout` is reached. + * + * The `timeout` parameter is in nanoseconds. + * A negative value disables the timeout (i.e. this function to blocks until the number of + * publishers is greater than or equals to `count`). + * + * The `success` parameter must point to a valid bool. + * The `success` parameter is the output for this function and will be set. + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | Yes + * Thread-Safe | No + * Uses Atomics | No + * Lock-Free | Maybe [1] + * [1] implementation may need to protect the data structure with a lock + * + * \param[in] node the handle to the node being used to query the ROS graph + * \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events + * \param[in] topic_name the name of the topic in question + * \param[in] count number of publishers to wait for + * \param[in] timeout maximum duration to wait for publishers + * \param[out] success `true` if the number of publishers is equal to or greater than count, or + * `false` if a timeout occurred waiting for publishers. + * \return #RCL_RET_OK if there was no errors, or + * \return #RCL_RET_NODE_INVALID if the node is invalid, or + * \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or + * \return #RCL_RET_TIMEOUT if a timeout occurs before the number of publishers is detected, or + * \return #RCL_RET_ERROR if an unspecified error occurred. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rcl_ret_t +rcl_wait_for_publishers( + const rcl_node_t * node, + rcl_allocator_t * allocator, + const char * topic_name, + const size_t count, + rcutils_duration_value_t timeout, + bool * success); + +/// Wait for there to be a specified number of subscribers on a given topic. +/** + * \see rcl_wait_for_publishers + * + *
+ * Attribute | Adherence + * ------------------ | ------------- + * Allocates Memory | Yes + * Thread-Safe | No + * Uses Atomics | No + * Lock-Free | Maybe [1] + * [1] implementation may need to protect the data structure with a lock + * + * \param[in] node the handle to the node being used to query the ROS graph + * \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events + * \param[in] topic_name the name of the topic in question + * \param[in] count number of subscribers to wait for + * \param[in] timeout maximum duration to wait for subscribers + * \param[out] success `true` if the number of subscribers is equal to or greater than count, or + * `false` if a timeout occurred waiting for subscribers. + * \return #RCL_RET_OK if there was no errors, or + * \return #RCL_RET_NODE_INVALID if the node is invalid, or + * \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or + * \return #RCL_RET_TIMEOUT if a timeout occurs before the number of subscribers is detected, or + * \return #RCL_RET_ERROR if an unspecified error occurred. + */ +RCL_PUBLIC +RCL_WARN_UNUSED +rcl_ret_t +rcl_wait_for_subscribers( + const rcl_node_t * node, + rcl_allocator_t * allocator, + const char * topic_name, + const size_t count, + rcutils_duration_value_t timeout, + bool * success); + /// Return a list of all publishers to a topic. /** * The `node` parameter must point to a valid node. diff --git a/rcl/src/rcl/graph.c b/rcl/src/rcl/graph.c index 225b938d6..42279c665 100644 --- a/rcl/src/rcl/graph.c +++ b/rcl/src/rcl/graph.c @@ -20,8 +20,12 @@ extern "C" #include "rcl/graph.h" #include "rcl/error_handling.h" +#include "rcl/guard_condition.h" +#include "rcl/wait.h" #include "rcutils/allocator.h" +#include "rcutils/error_handling.h" #include "rcutils/macros.h" +#include "rcutils/time.h" #include "rcutils/types.h" #include "rmw/error_handling.h" #include "rmw/get_node_info_and_types.h" @@ -452,6 +456,158 @@ rcl_count_subscribers( return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret); } +typedef rcl_ret_t (* count_entities_func_t)( + const rcl_node_t * node, + const char * topic_name, + size_t * count); + +rcl_ret_t +_rcl_wait_for_entities( + const rcl_node_t * node, + rcl_allocator_t * allocator, + const char * topic_name, + const size_t expected_count, + rcutils_duration_value_t timeout, + bool * success, + count_entities_func_t count_entities_func) +{ + if (!rcl_node_is_valid(node)) { + return RCL_RET_NODE_INVALID; + } + RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT); + RCL_CHECK_ARGUMENT_FOR_NULL(topic_name, RCL_RET_INVALID_ARGUMENT); + RCL_CHECK_ARGUMENT_FOR_NULL(success, RCL_RET_INVALID_ARGUMENT); + + *success = false; + + // We can avoid waiting if there are already the expected number of publishers + size_t count = 0u; + rcl_ret_t count_ret = count_entities_func(node, topic_name, &count); + if (count_ret != RCL_RET_OK) { + // Error message already set + return count_ret; + } + if (expected_count <= count) { + *success = true; + return RCL_RET_OK; + } + + // Create a wait set and add the node graph guard condition to it + rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set(); + rcl_ret_t ret = rcl_wait_set_init( + &wait_set, 0, 1, 0, 0, 0, 0, node->context, *allocator); + if (ret != RCL_RET_OK) { + // Error message already set + return ret; + } + + const rcl_guard_condition_t * guard_condition = rcl_node_get_graph_guard_condition(node); + if (!guard_condition) { + // Error message already set + return RCL_RET_ERROR; + } + + // Add it to the wait set + ret = rcl_wait_set_add_guard_condition(&wait_set, guard_condition, NULL); + if (ret != RCL_RET_OK) { + // Error message already set + return ret; + } + + // Get current time + rcutils_time_point_value_t start; + rcutils_ret_t time_ret = rcutils_steady_time_now(&start); + if (time_ret != RCUTILS_RET_OK) { + rcutils_error_string_t error = rcutils_get_error_string(); + rcutils_reset_error(); + RCL_SET_ERROR_MSG(error.str); + return RCL_RET_ERROR; + } + + // Wait for expected count or timeout + while (true) { + ret = rcl_wait(&wait_set, timeout); + if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) { + // Error message already set + return ret; + } + + // Check count + count_ret = count_entities_func(node, topic_name, &count); + if (count_ret != RCL_RET_OK) { + // Error already set + return count_ret; + } + if (expected_count <= count) { + *success = true; + break; + } + + // If we're not waiting indefinitely, compute time remaining + if (timeout >= 0) { + rcutils_time_point_value_t now; + time_ret = rcutils_steady_time_now(&now); + if (time_ret != RCUTILS_RET_OK) { + rcutils_error_string_t error = rcutils_get_error_string(); + rcutils_reset_error(); + RCL_SET_ERROR_MSG(error.str); + return RCL_RET_ERROR; + } + timeout = timeout - (now - start); + if (timeout <= 0) { + return RCL_RET_TIMEOUT; + } + } + + // Clear wait set for next iteration + ret = rcl_wait_set_clear(&wait_set); + if (ret != RCL_RET_OK) { + // Error message already set + return ret; + } + } + + return RCL_RET_OK; +} + +rcl_ret_t +rcl_wait_for_publishers( + const rcl_node_t * node, + rcl_allocator_t * allocator, + const char * topic_name, + const size_t expected_count, + rcutils_duration_value_t timeout, + bool * success) +{ + return _rcl_wait_for_entities( + node, + allocator, + topic_name, + expected_count, + timeout, + success, + rcl_count_publishers); +} + +rcl_ret_t +rcl_wait_for_subscribers( + const rcl_node_t * node, + rcl_allocator_t * allocator, + const char * topic_name, + const size_t expected_count, + rcutils_duration_value_t timeout, + bool * success) +{ + return _rcl_wait_for_entities( + node, + allocator, + topic_name, + expected_count, + timeout, + success, + rcl_count_subscribers); +} + typedef rmw_ret_t (* get_topic_endpoint_info_func_t)( const rmw_node_t * node, rcutils_allocator_t * allocator, diff --git a/rcl/test/rcl/test_graph.cpp b/rcl/test/rcl/test_graph.cpp index bb49a1859..d6025e4fe 100644 --- a/rcl/test/rcl/test_graph.cpp +++ b/rcl/test/rcl/test_graph.cpp @@ -706,6 +706,96 @@ TEST_F( rcl_reset_error(); } +/* Test the rcl_wait_for_publishers function. + */ +TEST_F( + CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION), + test_rcl_wait_for_publishers +) { + rcl_ret_t ret; + rcl_node_t zero_node = rcl_get_zero_initialized_node(); + rcl_allocator_t zero_allocator = static_cast( + rcutils_get_zero_initialized_allocator()); + rcl_allocator_t allocator = rcl_get_default_allocator(); + const char * topic_name = "/topic_test_rcl_wait_for_publishers"; + bool success = false; + + // Invalid node + ret = rcl_wait_for_publishers(nullptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret); + rcl_reset_error(); + ret = rcl_wait_for_publishers(&zero_node, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret); + rcl_reset_error(); + ret = rcl_wait_for_publishers(this->old_node_ptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid allocator + ret = rcl_wait_for_publishers(this->node_ptr, nullptr, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + ret = rcl_wait_for_publishers(this->node_ptr, &zero_allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid topic name + ret = rcl_wait_for_publishers(this->node_ptr, &allocator, nullptr, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid output arg + ret = rcl_wait_for_publishers(this->node_ptr, &allocator, topic_name, 1u, 100, nullptr); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Valid call (expect timeout since there are no publishers) + ret = rcl_wait_for_publishers(this->node_ptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_TIMEOUT, ret) << rcl_get_error_string().str; + rcl_reset_error(); +} + +/* Test the rcl_wait_for_subscribers function. + */ +TEST_F( + CLASSNAME(TestGraphFixture, RMW_IMPLEMENTATION), + test_rcl_wait_for_subscribers +) { + rcl_ret_t ret; + rcl_node_t zero_node = rcl_get_zero_initialized_node(); + rcl_allocator_t zero_allocator = static_cast( + rcutils_get_zero_initialized_allocator()); + rcl_allocator_t allocator = rcl_get_default_allocator(); + const char * topic_name = "/topic_test_rcl_wait_for_subscribers"; + bool success = false; + + // Invalid node + ret = rcl_wait_for_subscribers(nullptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret); + rcl_reset_error(); + ret = rcl_wait_for_subscribers(&zero_node, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret); + rcl_reset_error(); + ret = rcl_wait_for_subscribers(this->old_node_ptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_NODE_INVALID, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid allocator + ret = rcl_wait_for_subscribers(this->node_ptr, nullptr, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + ret = rcl_wait_for_subscribers(this->node_ptr, &zero_allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid topic name + ret = rcl_wait_for_subscribers(this->node_ptr, &allocator, nullptr, 1u, 100, &success); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Invalid output arg + ret = rcl_wait_for_subscribers(this->node_ptr, &allocator, topic_name, 1u, 100, nullptr); + EXPECT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str; + rcl_reset_error(); + // Valid call (expect timeout since there are no subscribers) + ret = rcl_wait_for_subscribers(this->node_ptr, &allocator, topic_name, 1u, 100, &success); + EXPECT_EQ(RCL_RET_TIMEOUT, ret) << rcl_get_error_string().str; + rcl_reset_error(); +} + void check_graph_state( const rcl_node_t * node_ptr,